Appearance
05 — Agent Loop 实现详解
概述
本章是对 04-REPL 与 Query 循环 的深度补充,聚焦 query.ts 中 queryLoop() 函数的实现细节。如果说 05 章解释了"Agent Loop 做什么",本章解释"Agent Loop 怎么做"——从状态结构、单次迭代的完整生命周期、到流式工具执行和所有 continue 分支的实现机理。
建议先阅读 04-REPL 与 Query 循环 中的概念介绍,再回到本章看实现。
文件地图
| 文件 | 职责 |
|---|---|
query.ts | Agent Loop 主体:query() 入口 + queryLoop() 无限循环 |
query/config.ts | QueryConfig — 不可变的运行时配置快照 |
query/deps.ts | QueryDeps — 可注入的 I/O 依赖(模型调用、压缩、UUID) |
query/stopHooks.ts | Stop Hook 执行与结果处理 |
query/tokenBudget.ts | Token 预算追踪与续行决策 |
services/tools/StreamingToolExecutor.ts | 流式工具并发调度器 |
services/tools/toolOrchestration.ts | 非流式工具编排(分区 + 串并行) |
入口:query() → queryLoop()
query() 是对外暴露的 AsyncGenerator 函数,但它只是一个薄包装:
query(params) → yield* queryLoop(params) → 通知命令完成 → return TerminalqueryLoop() 才是真正的 agent loop。分离的原因是:yield* 会透传 queryLoop 的所有 yield 和 return 值,而 query() 在正常退出时可以做清理工作(通知消费的命令已完成)。抛出异常或被 .return() 关闭时,清理代码不会执行——这是刻意为之的。
状态模型
QueryParams — 不可变输入
调用方传入的参数,在整个 agent loop 生命周期中不变:
typescript
type QueryParams = {
messages: Message[] // 初始消息历史
systemPrompt: SystemPrompt // 系统提示词
userContext / systemContext // 上下文注入
canUseTool: CanUseToolFn // 权限判断回调
toolUseContext: ToolUseContext // 工具执行上下文
fallbackModel?: string // 备用模型
querySource: QuerySource // 来源标记 (repl_main_thread / agent:xxx / sdk)
maxTurns?: number // 最大轮次限制
taskBudget?: { total: number }// API 任务预算
}State — 可变的跨迭代状态
每次 while(true) 迭代开始时从 state 解构读取,continue 时通过 state = { ... } 整体替换(而非逐字段赋值)。这种"全替换"风格让每个 continue 站点的状态变更一目了然:
typescript
type State = {
messages: Message[] // 当前消息列表(会随迭代增长或压缩后重置)
toolUseContext: ToolUseContext // 工具上下文(每迭代可能更新)
autoCompactTracking: AutoCompactTrackingState // 自动压缩追踪
maxOutputTokensRecoveryCount: number // 输出 token 超限恢复次数
hasAttemptedReactiveCompact: boolean // 是否已尝试反应式压缩
maxOutputTokensOverride: number | undefined // 输出 token 上限覆盖值
pendingToolUseSummary: Promise<...> | undefined // 上一轮的工具摘要(异步)
stopHookActive: boolean | undefined // Stop Hook 是否激活
turnCount: number // 当前轮次计数
transition: Continue | undefined // 上一次 continue 的原因
}QueryConfig — 不可变的运行时快照
在 queryLoop 入口处调用 buildQueryConfig() 一次性快照:
typescript
type QueryConfig = {
sessionId: SessionId
gates: {
streamingToolExecution: boolean // 是否启用流式工具执行
emitToolUseSummaries: boolean // 是否生成工具使用摘要
isAnt: boolean // 内部用户标记
fastModeEnabled: boolean // 快速模式
}
}注意:feature() 编译期标志(bun:bundle)不在 QueryConfig 中——它们必须内联在使用处才能被 tree-shaking 消除。
单次迭代的完整生命周期
每次 while(true) 循环体是一个完整的 API 调用轮次。下面是各阶段的详细流程:
┌─────────────────────────────────────────────────────────────┐
│ while(true) 循环体 │
│ │
│ ① 解构状态 + 初始化 │
│ ② 消息预处理管线 (budget → snip → microcompact → collapse) │
│ ③ 自动压缩 (autocompact) │
│ ④ System Prompt 组装 │
│ ⑤ 阻塞限制检查 │
│ ⑥ API 流式调用 + 流式工具执行 │
│ ⑦ 中断检查 (abort) │
│ ⑧ 错误恢复决策 (413 / max_output_tokens) │
│ ⑨ Stop Hooks + Token Budget │
│ ⑩ 工具结果收集 + 附件注入 │
│ ⑪ 状态更新 → continue / return │
└─────────────────────────────────────────────────────────────┘① 解构状态 + 初始化
typescript
let { toolUseContext } = state
const { messages, autoCompactTracking, turnCount, ... } = statetoolUseContext 用 let 因为迭代内会被更新(queryTracking、messages 绑定)。其余字段用 const——它们只在 continue 站点整体替换。
初始化 queryTracking:首轮生成新的 chainId,后续轮次递增 depth。这让分析系统能追踪一个用户请求引发的完整 agent 链。
② 消息预处理管线
消息在发送给 API 前经过四级预处理,按顺序执行:
原始消息
│
├─ applyToolResultBudget() ← 工具结果大小预算(截断过大的结果)
│
├─ snipCompactIfNeeded() ← 历史裁剪(feature: HISTORY_SNIP)
│
├─ microcompact() ← 微压缩(折叠旧的工具结果详情)
│
├─ applyCollapsesIfNeeded() ← 上下文折叠(feature: CONTEXT_COLLAPSE)
│
└─ 预处理后的消息为什么先 snip 再 microcompact 再 collapse 最后 autocompact? 每一步都可能减少 token 数。如果前面的步骤已经把 token 数降到阈值以下,后面更昂贵的步骤(如 autocompact 需要 API 调用)就可以跳过。
③ 自动压缩 (autocompact)
如果消息 token 数超过阈值,调用 autoCompactIfNeeded()。这是一次完整的 API 调用(用 Claude 生成对话摘要)。压缩成功后:
- 将
messagesForQuery替换为压缩后的消息 - yield 压缩后的消息给 REPL 渲染
- 重置
autoCompactTracking - 如果有
taskBudget,记录压缩前的 token 用量到taskBudgetRemaining
④ System Prompt 组装
typescript
const fullSystemPrompt = asSystemPrompt(
appendSystemContext(systemPrompt, systemContext)
)静态部分(角色、规则)+ 动态部分(环境、记忆)在 systemPrompt 中已经组合好。systemContext 追加系统级动态信息。用户上下文(CLAUDE.md、日期、Git)在后续 prependUserContext() 阶段注入到第一条 User Message 中。
⑤ 阻塞限制检查
在发起 API 调用前,检查 token 数是否已到硬阻塞限制。这只在以下条件都满足时生效:
- 没有刚完成压缩
- 不是 compact/session_memory 来源(防止压缩代理死锁)
- 没有启用 reactive compact + auto compact(否则会抢占真正的 API 413 恢复路径)
- 没有启用 context collapse
超限则 yield 错误消息并 return { reason: 'blocking_limit' }。
⑥ API 流式调用 + 流式工具执行
这是每次迭代的核心。代码结构是一个双层循环:
typescript
// 外层:fallback 重试循环(最多 2 次:原始模型 + 备用模型)
while (attemptWithFallback) {
attemptWithFallback = false
try {
// 内层:流式消费循环
for await (const message of deps.callModel({ ... })) {
// 处理每个 streaming chunk
}
} catch (innerError) {
if (innerError instanceof FallbackTriggeredError) {
// 切换到备用模型,清空已收集的消息,重新创建 StreamingToolExecutor
currentModel = fallbackModel
attemptWithFallback = true
continue
}
throw innerError
}
}流式消费循环内部做的事:
- Fallback 清理:如果发生了流式 fallback(streaming 中途切换模型),清空之前的消息并发出 tombstone
- Backfill 可观察输入:某些工具的
backfillObservableInput会补充衍生字段(如文件路径展开),仅添加新字段时克隆消息 - Withhold 可恢复错误:413、max_output_tokens、media size error 都被暂扣(不 yield),后续恢复决策再处理
- 收集 tool_use 块:每收到一个
tool_use,设needsFollowUp = true - 流式工具调度:如果启用
StreamingToolExecutor,立即addTool()入队。只读工具并行启动,写入工具等待前面的完成
⑦ 中断检查 (abort)
流式调用结束后,第一件事是检查 abortController.signal.aborted:
- 如果被中断,消费
StreamingToolExecutor剩余结果(它会为排队中的工具生成合成 tool_result) - 如果是
submit-interrupt(用户提交了新消息),跳过"用户中断"提示 - 清理 Computer Use 资源(feature: CHICAGO_MCP)
return { reason: 'aborted_streaming' }
⑧ 错误恢复决策
当 needsFollowUp === false(模型没有返回 tool_use)时,进入退出判断。这里是 05 章描述的 6 种恢复路径的具体实现:
路径 A:413 → Context Collapse Drain
typescript
if (isWithheld413 && state.transition?.reason !== 'collapse_drain_retry') {
const drained = contextCollapse.recoverFromOverflow(messagesForQuery, querySource)
if (drained.committed > 0) {
state = { ...next, transition: { reason: 'collapse_drain_retry' } }
continue // ← 回到 while(true) 顶部
}
}关键细节:通过检查 state.transition?.reason 防止反复 drain——如果上一轮已经 drain 过还是 413,就不再尝试,让后面的 reactive compact 处理。
路径 B:413 / Media Error → Reactive Compact
typescript
if ((isWithheld413 || isWithheldMedia) && reactiveCompact) {
const compacted = await reactiveCompact.tryReactiveCompact({
hasAttempted: hasAttemptedReactiveCompact, // 一次性保险
...
})
if (compacted) {
state = { ...next, hasAttemptedReactiveCompact: true, transition: { reason: 'reactive_compact_retry' } }
continue
}
// 无法恢复 → yield 暂扣的错误消息 → return
}hasAttemptedReactiveCompact 保证只尝试一次。如果压缩后仍然 413(例如大图片在保护尾部),直接返回错误。
路径 C:Max Output Tokens → 升级到 64k
typescript
if (maxOutputTokensOverride === undefined && capEnabled) {
state = { ...next, maxOutputTokensOverride: ESCALATED_MAX_TOKENS, transition: { reason: 'max_output_tokens_escalate' } }
continue // 用更大的 max_tokens 重试同一请求
}无需注入恢复消息——这是纯参数调整的重试。
路径 D:Max Output Tokens → 多轮恢复
typescript
if (maxOutputTokensRecoveryCount < 3) {
const recoveryMessage = createUserMessage({
content: 'Output token limit hit. Resume directly — no apology, no recap...',
isMeta: true,
})
state = { messages: [...messages, ...assistantMessages, recoveryMessage], maxOutputTokensRecoveryCount: count + 1, ... }
continue
}
// 超过 3 次 → yield 暂扣的错误消息路径 E:Model Fallback(在流式循环中处理)
在 ⑥ 的 catch 块中完成,不走 continue 路径。切换模型后重新进入 while (attemptWithFallback) 内层循环。
路径 F:Stop Hook Blocking
见 ⑨。
⑨ Stop Hooks + Token Budget
如果模型正常回复(没有 API 错误),执行 Stop Hooks:
typescript
const stopHookResult = yield* handleStopHooks(...)
if (stopHookResult.preventContinuation) return { reason: 'stop_hook_prevented' }
if (stopHookResult.blockingErrors.length > 0) {
state = { messages: [..., ...blockingErrors], stopHookActive: true, transition: { reason: 'stop_hook_blocking' } }
continue // 路径 F:带着 Hook 的错误消息重新调用 API
}handleStopHooks 内部:
- 执行用户配置的 Stop Hooks(并行)
- 保存
cacheSafeParams供/btw和 SDK side_question 使用 - 触发后台任务:Prompt Suggestion、Extract Memories、Auto Dream
- 清理 Computer Use 资源
- 如果是 Teammate,还执行 TaskCompleted 和 TeammateIdle Hooks
Token Budget 检查(feature: TOKEN_BUDGET)在 Stop Hooks 之后:
typescript
const decision = checkTokenBudget(budgetTracker, ...)
if (decision.action === 'continue') {
// 注入续行提示消息,继续循环
state = { ..., transition: { reason: 'token_budget_continuation' } }
continue
}checkTokenBudget 的判断逻辑:
- 用量 < 90% 预算 →
continue,注入进度提示 - 连续 3+ 轮增量 < 500 tokens →
stop(diminishing returns) - 用量 >= 90% →
stop
⑩ 工具结果收集 + 附件注入
当 needsFollowUp === true(模型返回了 tool_use),跳过 ⑧⑨,进入工具执行阶段:
typescript
// 收集流式执行器的剩余结果,或用非流式编排器执行
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
for await (const update of toolUpdates) {
yield update.message // 工具结果
toolResults.push(...) // 收集到列表
if (update.newContext) ... // 工具可能更新上下文
}工具执行完毕后,注入附件消息:
- 队列命令附件 — 消费
messageQueueManager中的待处理通知(Sleep 唤醒、任务完成等) - Memory 附件 — 如果
pendingMemoryPrefetch已完成,注入相关记忆文件 - Skill Discovery 附件 — 如果技能发现预取已完成,注入匹配的技能
最后刷新 MCP 工具列表(refreshTools()),检查 maxTurns 限制。
⑪ 状态更新 → continue
typescript
const next: State = {
messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
toolUseContext: toolUseContextWithQueryTracking,
autoCompactTracking: tracking,
turnCount: nextTurnCount,
maxOutputTokensRecoveryCount: 0, // 重置
hasAttemptedReactiveCompact: false, // 重置
pendingToolUseSummary: nextPendingToolUseSummary,
maxOutputTokensOverride: undefined, // 重置
transition: { reason: 'next_turn' },
}
state = next
// → 回到 while(true) 顶部,开始下一个 API 调用轮次所有 continue / return 路径一览
Continue 路径(循环继续)
| transition.reason | 触发条件 | 状态变更要点 |
|---|---|---|
next_turn | 正常:工具执行完毕 | messages 追加助手+工具结果;计数器重置 |
collapse_drain_retry | 413 → 上下文折叠成功 | messages = 折叠后的消息 |
reactive_compact_retry | 413/media → 反应式压缩成功 | messages = 压缩后消息;hasAttemptedReactiveCompact = true |
max_output_tokens_escalate | 输出超限 → 升级到 64k | maxOutputTokensOverride = ESCALATED_MAX_TOKENS |
max_output_tokens_recovery | 输出超限 → 注入续行消息 | messages 追加恢复提示;计数器 +1 |
stop_hook_blocking | Stop Hook 返回阻塞错误 | messages 追加错误消息;stopHookActive = true |
token_budget_continuation | Token 预算未用完 | messages 追加续行提示 |
Return 路径(循环结束)
| reason | 触发条件 |
|---|---|
completed | 模型返回纯文本,Stop Hooks 通过 |
aborted_streaming | 流式期间用户中断 |
aborted_tools | 工具执行期间用户中断 |
blocking_limit | Token 数到硬阻塞限制(auto-compact 关闭时) |
prompt_too_long | 413 且所有恢复手段都失败 |
image_error | 图片大小/格式错误 |
model_error | 模型调用抛出异常 |
max_turns | 达到 maxTurns 限制 |
hook_stopped | 工具执行 Hook 阻止继续 |
stop_hook_prevented | Stop Hook 主动终止 |
流式工具执行详解
StreamingToolExecutor
核心数据结构是一个 TrackedTool[] 队列,每个元素的状态机:
queued → executing → completed → yielded并发控制规则:
isConcurrencySafe = true的工具(Read、Glob、Grep 等只读工具)可以并行isConcurrencySafe = false的工具(Bash、Edit 等写入工具)必须独占执行- Bash 工具错误会触发
siblingAbortController,取消同批次的兄弟进程(不影响主abortController)
结果顺序保证: 结果按添加顺序(不是完成顺序)yield。getCompletedResults() 从队列头部开始,只有当头部工具完成时才 yield,保证 API 看到的 tool_result 顺序与 tool_use 一致。
流式 vs 非流式对比
| StreamingToolExecutor | runTools() | |
|---|---|---|
| 执行时机 | API 流式期间就开始 | API 流式结束后开始 |
| 并发策略 | 按添加顺序逐个评估 | 预分区:连续只读块并行,非只读块串行 |
| 控制方式 | Statsig gate | 默认 fallback |
| 中断处理 | 内部生成合成 tool_result | 外部 yieldMissingToolResultBlocks |
消息 Withhold 机制
Agent Loop 中一个精巧的设计是 withhold(暂扣):可恢复的错误消息在流式循环中被收集但不 yield,直到恢复决策阶段才决定是重试还是暴露给用户。
流式循环:
收到 413 错误消息 → withheld = true → 不 yield → push 到 assistantMessages
恢复决策:
if (collapse drain 成功) → continue → 错误消息被丢弃(下一轮会得到新响应)
if (reactive compact 成功) → continue → 同上
if (所有恢复都失败) → yield lastMessage → return → 用户看到错误三种可暂扣的错误:
- Prompt Too Long (413) — 被
contextCollapse.isWithheldPromptTooLong()和reactiveCompact.isWithheldPromptTooLong()检测 - Media Size Error — 被
reactiveCompact.isWithheldMediaSizeError()检测(需 gate 开启) - Max Output Tokens — 被
isWithheldMaxOutputTokens()检测
暂扣和恢复必须在同一 gate 下——如果暂扣了但没有对应的恢复路径,消息就丢失了。代码中通过在 turn 开始时预先快照 gate 值(mediaRecoveryEnabled)来保证一致性。
预取与异步优化
Agent Loop 利用模型 streaming 的等待时间(通常 5-30 秒)做了多项预取:
| 预取项 | 启动时机 | 消费时机 | 失败策略 |
|---|---|---|---|
| Memory 相关文件 | 循环入口(startRelevantMemoryPrefetch) | 工具执行后 | 未 settle → 跳过,下一轮重试 |
| Skill Discovery | 每轮迭代开始 | 工具执行后 | 无匹配 → 无附件 |
| Tool Use Summary | 上一轮工具完毕时触发 (Haiku ~1s) | 下一轮 streaming 结束后 | catch → null |
pendingMemoryPrefetch 使用 using 语法(TC39 Explicit Resource Management),在生成器任何退出路径上自动 dispose。
防死循环机制
Agent Loop 是一个 while(true),防止无限循环是关键。以下是各种保护措施:
| 机制 | 保护什么 |
|---|---|
hasAttemptedReactiveCompact | Reactive Compact 只尝试一次 |
state.transition?.reason 检查 | Collapse Drain 如果上一轮已经 drain 过不再尝试 |
maxOutputTokensRecoveryCount ≤ 3 | Max Output Tokens 恢复最多 3 轮 |
maxTurns | 外部配置的硬限制 |
abortController | 用户随时可以中断 |
| Token Budget diminishing returns | 连续产出过低时自动停止 |
attemptWithFallback 一次性标志 | Fallback 最多触发一次 |
Stop Hook hasAttemptedReactiveCompact 保留 | Stop Hook blocking 重试不会重置 compact guard |
端到端示例:一次典型对话轮次
用户输入:"帮我把 utils.js 中的 formatDate 函数改成使用 dayjs"
轮次 1:
② 消息预处理 (无变化,消息量少)
③ 自动压缩 (跳过,token 数未超阈值)
⑥ API 调用 → 模型返回:
- text: "让我先看一下这个文件"
- tool_use: Read("utils.js")
⑩ StreamingToolExecutor 立即执行 Read (只读,并行安全)
⑪ state = { messages: [user, assistant, tool_result], transition: { reason: 'next_turn' } }
→ continue
轮次 2:
⑥ API 调用 → 模型返回:
- text: "找到了,我来修改..."
- tool_use: Edit("utils.js", old, new)
⑩ 执行 Edit → 权限检查 → 用户批准 → 执行
⑪ state 更新 → continue
轮次 3:
⑥ API 调用 → 模型返回:
- text: "已完成修改。formatDate 现在使用 dayjs..."
(纯文本,无 tool_use)
⑧ needsFollowUp = false
⑨ Stop Hooks 通过,Token Budget 无需续行
→ return { reason: 'completed' }与子代理的关系
子代理(通过 AgentTool 启动)运行独立的 query() 调用,拥有独立的 toolUseContext(含独立的 agentId、abortController)。但它们共享进程级资源:
- 消息队列 (
messageQueueManager) — 按agentId隔离 - MCP 连接 — 共享,但工具列表独立刷新
- Token Budget — 子代理的
checkTokenBudget对agentId !== undefined直接返回stop(不支持子代理续行) - Stop Hooks — 子代理运行
SubagentStop,不运行Stop
相关章节
- 04-REPL 与 Query 循环 — 概念层面的对话循环介绍
- 06-Tool 系统 — 工具定义、权限、MCP 代理
- 03-状态管理 — AppState、ToolUseContext 详解
- 07-服务层 — API 调用、Compact 压缩
- 09-Agent/Task — 子代理生命周期