Appearance
定时任务实现原理详解
基于 OpenCode 源码分析,版本截止 2026-03-28
目录
概述
OpenCode 的定时任务系统包含两套独立的实现:
| 系统 | 文件位置 | 用途 | 调度方式 |
|---|---|---|---|
| Scheduler | packages/opencode/src/scheduler/ | 轻量级间隔调度 | setInterval 间隔执行 |
| TaskGenerate | packages/opencode/src/task/generate.ts | Flash 任务自动化代码生成 | HTTP API 触发 + FIFO 队列 |
1. 轻量级调度器 (Scheduler)
用于周期性后台任务,简单的 setInterval 实现,支持实例级和全局级两种作用域隔离。
2. 任务生成系统 (TaskGenerate)
用于对接 Flash 任务管理系统,自动执行 AI 代码生成。包含完整的任务队列、状态机、进度追踪和错误处理机制。
整体架构总览
mermaid
graph TB
subgraph "外部系统"
F[Flash 任务平台]
L[Lighthouse 监控]
G[Git 远程仓库]
end
subgraph "Scheduler 轻量级调度器"
SC[Scheduler 命名空间]
IM[Instance 实例隔离]
ST[State 状态管理]
end
subgraph "TaskGenerate 任务生成系统"
TG[TaskGenerate 命名空间]
QM[Queue 队列管理]
TM[TaskInfo 状态存储]
FL[Flash 集成层]
SE[Session 会话管理]
BU[Bus 事件总线]
CL[CodeUpload 代码上传]
LG[Logger 日志记录]
end
SC --> IM
IM --> ST
TG --> QM
TG --> TM
TG --> FL
FL --> F
TG --> SE
SE --> BU
TG --> CL
TG --> LG
LG --> L
F --> G
G --> SE
SE --> G
style SC fill:#e1f5fe
style TG fill:#fff3e0两大系统的对比
mermaid
graph LR
subgraph "Scheduler 适用场景"
S1[定时健康检查]
S2[缓存预热]
S3[心跳上报]
end
subgraph "TaskGenerate 适用场景"
T1[Flash 需求自动化]
T2[PRD 转代码]
T3[自动提 PR]
end
S1 --> SE1["setInterval(ms)"]
S2 --> SE2["实例/全局 作用域"]
S3 --> SE3["unref() 不阻塞退出"]
T1 --> TE1["HTTP API 触发"]
T2 --> TE2["FIFO 分支队列"]
T3 --> TE3["Git Worktree 隔离"]轻量级调度器 Scheduler
核心类型定义
文件: packages/opencode/src/scheduler/index.ts
typescript
// 任务定义
export type Task = {
id: string // 唯一标识
interval: number // 执行间隔(毫秒)
run: () => Promise<void> // 执行函数
scope?: "instance" | "global" // 作用域
}
// 内部数据结构
type Timer = ReturnType<typeof setInterval>
type Entry = {
tasks: Map<string, Task> // 任务映射
timers: Map<string, Timer> // 定时器映射
}作用域机制
mermaid
graph TB
subgraph "实例作用域 instance"
I1[项目 A 调度器]
I2[项目 B 调度器]
T1[task-A-1]
T2[task-B-1]
end
subgraph "全局作用域 global"
G1[全局健康检查]
G2[全局心跳上报]
end
I1 --> T1
I2 --> T2
G1 -.->|每个实例共享| G2
style I1 fill:#e8f5e9
style I2 fill:#e8f5e9
style G1 fill:#fce4ec
style G2 fill:#fce4ec实例作用域:每个项目实例独立的调度器,任务只在该实例内生效。 全局作用域:跨所有实例共享,通常用于监控类任务。
核心注册流程
mermaid
sequenceDiagram
participant 调用方
participant Scheduler
participant State
participant Timer
调用方->>Scheduler: register(task)
Scheduler->>State: 获取状态
State-->>Scheduler: entry
alt 已有同名定时器
alt instance 作用域
Scheduler->>Timer: clearInterval(old)
end
alt global 作用域
Scheduler-->>调用方: return (防止重复)
end
end
Scheduler->>Timer: setInterval(run, interval)
Scheduler->>Timer: timer.unref()
Scheduler->>调用方: 立即执行 run() 一次
Timer->>Timer: 每 interval ms 执行关键实现细节
1. 防止重复注册
typescript
// 全局作用域:已注册则跳过
if (task.scope === "global" && entry.timers.has(task.id)) {
return
}
// 实例作用域:先清除旧的再注册
if (entry.timers.has(task.id)) {
clearInterval(entry.timers.get(task.id)!)
}2. 自动清理 (dispose)
typescript
const state = Instance.state(
() => create(),
async (entry) => {
// 清除所有定时器
for (const timer of entry.timers.values()) {
clearInterval(timer)
}
// 清理任务映射
entry.tasks.clear()
}
)3. 进程退出不阻塞 (unref)
typescript
const timer = setInterval(task.run, task.interval)
timer.unref() // 允许进程在没有其他工作时退出使用示例
typescript
import { Scheduler } from "./scheduler"
// 注册实例级任务
Scheduler.register({
id: "health-check",
interval: 30 * 1000, // 30秒
scope: "instance",
run: async () => {
console.log("健康检查...")
}
})
// 注册全局任务
Scheduler.register({
id: "global-telemetry",
interval: 60 * 1000,
scope: "global",
run: async () => {
await reportMetrics()
}
})任务生成系统 TaskGenerate
核心数据流
mermaid
graph LR
A[POST /generate] --> B[创建任务]
B --> C{队列状态?}
C -->|空闲| D[立即执行]
C -->|忙碌| E[加入队列]
E --> F[等待]
F --> G[前一个完成]
G --> D
D --> H[准备工作区]
H --> I[获取任务信息]
I --> J[创建会话]
J --> K[AI 代码生成]
K --> L[收集结果]
L --> M[上报 Lighthouse]
H -.->|git worktree| N[(远程仓库)]
J -.->|SessionPrompt| O[(AI 模型)]
M -.->|异步| P[Lighthouse]
style E fill:#fff3e0
style G fill:#e8f5e9任务状态机
mermaid
stateDiagram-v2
[*] --> queued: 入队
queued --> pending: 准备开始
pending --> preparing: 创建工作区
preparing --> fetching: 获取任务详情
fetching --> generating: AI 生成代码
generating --> completed: 生成成功
generating --> failed: 执行出错
generating --> timeout: 超时
completed --> [*]
failed --> [*]
timeout --> [*]
queued --> aborted: 用户取消
preparing --> aborted: 用户取消
fetching --> aborted: 用户取消
generating --> aborted: 用户取消
aborted --> [*]
note right of completed: diff 已收集<br/>文件已上传
note right of failed: 错误信息已记录<br/>可恢复错误会重试核心类型定义
文件: packages/opencode/src/task/generate.ts
typescript
// 任务状态枚举
const TaskStatus = z.enum([
"queued", // 排队中
"pending", // 等待执行
"preparing", // 准备中(创建工作区)
"fetching", // 获取任务信息
"generating", // AI 代码生成中
"completed", // 完成
"failed", // 失败
"aborted", // 中止
"timeout" // 超时
])
// 任务信息结构
const TaskInfo = z.object({
id: z.string(),
taskId: z.number(), // Flash 任务 ID
appId: z.string(),
sessionId: z.string().optional(),
status: TaskStatus,
workspace: z.object({
directory: z.string(), // 工作区目录
branch: z.string(), // 分支名
repoDir: z.string(), // 仓库路径
}).optional(),
progress: z.object({
startedAt: z.number(),
updatedAt: z.number(),
step: z.string().optional(),
message: z.string().optional(),
currentTool: z.string().optional(),
stats: z.object({
toolsExecuted: z.number().optional(),
filesModified: z.number().optional(),
filesRead: z.number().optional(),
}).optional(),
}).optional(),
result: z.object({
duration: z.number(),
startedAt: z.number(),
completedAt: z.number(),
summary: z.string().optional(),
filesChanged: z.array(z.object({
path: z.string(),
changeType: z.string()
})),
diffs: z.record(z.string(), z.string()),
}).optional(),
error: z.object({
code: z.string(),
message: z.string(),
detail: z.string().optional(),
failedAt: z.string().optional(),
recoverable: z.boolean().optional(),
}).optional(),
queue: z.object({
position: z.number(),
queuedAt: z.number(),
}).optional(),
createdAt: z.number(),
})队列管理机制
mermaid
graph TB
subgraph "Queue 队列管理 (per appId + branch)"
Q1[QueueItem<br/>id: task-1]
Q2[QueueItem<br/>id: task-2]
Q3[QueueItem<br/>id: task-3]
subgraph "BranchQueue"
R[running: task-1]
P[pending: [task-2, task-3]]
end
end
style R fill:#c8e6c9
style P fill:#fff3e0关键特性:
- FIFO 队列:先入先出,保证任务顺序
- 分支隔离:每个 (appId, branch) 组合有独立队列
- 单并发:每个分支同时只运行一个任务
typescript
type QueueItem = { id: string; input: GenerateInput }
type BranchQueue = {
running: string | null // 当前运行的任务
pending: QueueItem[] // 等待队列
}
// 队列操作
queues.get(key)!.pending.push({ id, input }) // 入队
processNextInQueue(key) // 执行下一个Flash 集成
工作区准备流程
mermaid
graph TB
A[appTaskPrepare] --> B{工作区存在?}
B -->|不存在| C[git clone 仓库]
B -->|存在| D[git fetch 更新]
C --> E[git worktree add]
D --> E
E --> F[git merge 同步最新]
F --> G{合并成功?}
G -->|失败| H[报告错误]
G -->|成功| I[验证分支存在]
I --> J[返回工作区信息]Flash API 调用链
mermaid
sequenceDiagram
participant TaskGenerate
participant FlashAPI as Flash API
participant Git
participant Session
TaskGenerate->>FlashAPI: taskInfo(taskId)
FlashAPI-->>TaskGenerate: 任务基本信息
TaskGenerate->>FlashAPI: detail(storyId, taskId)
FlashAPI-->>TaskGenerate: PRD + 验收标准 + 图片
TaskGenerate->>Git: appTaskPrepare()
Git-->>TaskGenerate: 工作区目录
TaskGenerate->>Session: create()
Session-->>TaskGenerate: sessionId
TaskGenerate->>FlashAPI: 订阅任务进度事件
FlashAPI-->>TaskGenerate: tool_start / tool_complete关键设计模式
1. 实例隔离模式 (Instance State)
mermaid
graph TB
subgraph "Instance A"
SA1[State]
TA1[task-1]
end
subgraph "Instance B"
SA2[State]
TA2[task-2]
end
SA1 -.->|隔离| TA1
SA2 -.->|隔离| TA2每个项目实例有独立的 State 存储,调度器和任务互不影响。
2. 事件驱动进度追踪
mermaid
graph LR
A[AI Agent] -->|tool.start| B[Bus]
B -->|更新进度| C[TaskInfo]
A -->|tool.complete| B
B -->|记录统计| C
C -->|定期| D[Lighthouse]通过 Bus.subscribe 监听工具执行事件,实时更新任务进度。
3. Git Worktree 隔离
mermaid
graph TB
subgraph "主仓库"
M[(main)]
M1[(feature-1)]
M2[(feature-2)]
end
W1[Worktree 1<br/>task-1]
W2[Worktree 2<br/>task-2]
M --> W1
M --> W2
W1 -.->|独立目录| D1[/tmp/opencode-task-1]
W2 -.->|独立目录| D2[/tmp/opencode-task-2]
style D1 fill:#e3f2fd
style D2 fill:#e3f2fd每个任务在独立的 git worktree 中执行,避免分支冲突。
4. 双日志策略
mermaid
graph LR
A[任务执行] --> B[本地日志]
A --> C[远程上报]
B --> D["~/.opencode/task-logs/{id}.json"]
C --> E[Lighthouse]
style D fill:#fff8e1
style E fill:#e8f5e9本地日志:完整任务记录,用于调试和问题排查。 远程上报:关键指标上报,用于监控和告警。
5. 自动清理 (Disposable State)
typescript
// State.create 第二个参数是清理函数
const state = Instance.state(
() => create(),
async (entry) => {
// 实例销毁时自动清理
for (const timer of entry.timers.values()) {
clearInterval(timer)
}
entry.tasks.clear()
entry.timers.clear()
}
)关键源码文件索引
| 组件 | 文件路径 | 职责 |
|---|---|---|
| Scheduler 核心 | packages/opencode/src/scheduler/index.ts | 轻量级间隔调度器 |
| TaskGenerate 核心 | packages/opencode/src/task/generate.ts | 任务生成主逻辑 |
| 任务日志 | packages/opencode/src/task/logger.ts | 本地 + Lighthouse 日志 |
| 代码上传 | packages/opencode/src/task/code-upload.ts | 生成结果上传 |
| Flash 集成 | packages/opencode/src/flash/story-task.ts | Flash API 调用 |
| HTTP 路由 | packages/opencode/src/server/routes/task-generate.ts | API 端点定义 |
| 实例管理 | packages/opencode/src/project/instance.ts | 实例隔离 |
| 状态管理 | packages/opencode/src/project/state.ts | 可销毁状态 |
| 事件总线 | packages/opencode/src/bus/index.ts | 发布订阅 |
| 会话管理 | packages/opencode/src/session/index.ts | AI 会话 |
| 调度器测试 | packages/opencode/test/scheduler.test.ts | 单元测试 |
| 生成任务测试 | packages/opencode/test/server/task-generate.test.ts | API 测试 |
总结
| 特性 | Scheduler | TaskGenerate |
|---|---|---|
| 调度方式 | 间隔执行 (setInterval) | HTTP API 触发 |
| 作用域 | instance / global | 全局队列 |
| 触发方式 | 启动时注册 | 外部 API 调用 |
| 并发控制 | 无限制(各实例独立) | FIFO 队列(按分支) |
| 错误恢复 | 无 | 可恢复错误重试 |
| 进度追踪 | 无 | 实时 Bus 事件 |
| 适用场景 | 健康检查、心跳 | 自动化代码生成 |
文档生成时间: 2026-03-28