Skip to content

05 — Agent Loop 实现详解

概述

本章是对 04-REPL 与 Query 循环 的深度补充,聚焦 query.tsqueryLoop() 函数的实现细节。如果说 05 章解释了"Agent Loop 做什么",本章解释"Agent Loop 怎么做"——从状态结构、单次迭代的完整生命周期、到流式工具执行和所有 continue 分支的实现机理。

建议先阅读 04-REPL 与 Query 循环 中的概念介绍,再回到本章看实现。

文件地图

文件职责
query.tsAgent Loop 主体:query() 入口 + queryLoop() 无限循环
query/config.tsQueryConfig — 不可变的运行时配置快照
query/deps.tsQueryDeps — 可注入的 I/O 依赖(模型调用、压缩、UUID)
query/stopHooks.tsStop Hook 执行与结果处理
query/tokenBudget.tsToken 预算追踪与续行决策
services/tools/StreamingToolExecutor.ts流式工具并发调度器
services/tools/toolOrchestration.ts非流式工具编排(分区 + 串并行)

入口:query()queryLoop()

query() 是对外暴露的 AsyncGenerator 函数,但它只是一个薄包装:

query(params) → yield* queryLoop(params) → 通知命令完成 → return Terminal

queryLoop() 才是真正的 agent loop。分离的原因是:yield* 会透传 queryLoop 的所有 yield 和 return 值,而 query() 在正常退出时可以做清理工作(通知消费的命令已完成)。抛出异常或被 .return() 关闭时,清理代码不会执行——这是刻意为之的。

状态模型

QueryParams — 不可变输入

调用方传入的参数,在整个 agent loop 生命周期中不变:

typescript
type QueryParams = {
  messages: Message[]           // 初始消息历史
  systemPrompt: SystemPrompt    // 系统提示词
  userContext / systemContext    // 上下文注入
  canUseTool: CanUseToolFn      // 权限判断回调
  toolUseContext: ToolUseContext // 工具执行上下文
  fallbackModel?: string        // 备用模型
  querySource: QuerySource      // 来源标记 (repl_main_thread / agent:xxx / sdk)
  maxTurns?: number             // 最大轮次限制
  taskBudget?: { total: number }// API 任务预算
}

State — 可变的跨迭代状态

每次 while(true) 迭代开始时从 state 解构读取,continue 时通过 state = { ... } 整体替换(而非逐字段赋值)。这种"全替换"风格让每个 continue 站点的状态变更一目了然:

typescript
type State = {
  messages: Message[]                           // 当前消息列表(会随迭代增长或压缩后重置)
  toolUseContext: ToolUseContext                 // 工具上下文(每迭代可能更新)
  autoCompactTracking: AutoCompactTrackingState  // 自动压缩追踪
  maxOutputTokensRecoveryCount: number           // 输出 token 超限恢复次数
  hasAttemptedReactiveCompact: boolean           // 是否已尝试反应式压缩
  maxOutputTokensOverride: number | undefined    // 输出 token 上限覆盖值
  pendingToolUseSummary: Promise<...> | undefined // 上一轮的工具摘要(异步)
  stopHookActive: boolean | undefined            // Stop Hook 是否激活
  turnCount: number                              // 当前轮次计数
  transition: Continue | undefined               // 上一次 continue 的原因
}

QueryConfig — 不可变的运行时快照

queryLoop 入口处调用 buildQueryConfig() 一次性快照:

typescript
type QueryConfig = {
  sessionId: SessionId
  gates: {
    streamingToolExecution: boolean  // 是否启用流式工具执行
    emitToolUseSummaries: boolean    // 是否生成工具使用摘要
    isAnt: boolean                   // 内部用户标记
    fastModeEnabled: boolean         // 快速模式
  }
}

注意:feature() 编译期标志(bun:bundle不在 QueryConfig 中——它们必须内联在使用处才能被 tree-shaking 消除。

单次迭代的完整生命周期

每次 while(true) 循环体是一个完整的 API 调用轮次。下面是各阶段的详细流程:

┌─────────────────────────────────────────────────────────────┐
│                    while(true) 循环体                        │
│                                                             │
│  ① 解构状态 + 初始化                                        │
│  ② 消息预处理管线 (budget → snip → microcompact → collapse) │
│  ③ 自动压缩 (autocompact)                                   │
│  ④ System Prompt 组装                                       │
│  ⑤ 阻塞限制检查                                             │
│  ⑥ API 流式调用 + 流式工具执行                               │
│  ⑦ 中断检查 (abort)                                         │
│  ⑧ 错误恢复决策 (413 / max_output_tokens)                   │
│  ⑨ Stop Hooks + Token Budget                                │
│  ⑩ 工具结果收集 + 附件注入                                   │
│  ⑪ 状态更新 → continue / return                             │
└─────────────────────────────────────────────────────────────┘

① 解构状态 + 初始化

typescript
let { toolUseContext } = state
const { messages, autoCompactTracking, turnCount, ... } = state

toolUseContextlet 因为迭代内会被更新(queryTracking、messages 绑定)。其余字段用 const——它们只在 continue 站点整体替换。

初始化 queryTracking:首轮生成新的 chainId,后续轮次递增 depth。这让分析系统能追踪一个用户请求引发的完整 agent 链。

② 消息预处理管线

消息在发送给 API 前经过四级预处理,按顺序执行

原始消息

  ├─ applyToolResultBudget()  ← 工具结果大小预算(截断过大的结果)

  ├─ snipCompactIfNeeded()    ← 历史裁剪(feature: HISTORY_SNIP)

  ├─ microcompact()           ← 微压缩(折叠旧的工具结果详情)

  ├─ applyCollapsesIfNeeded() ← 上下文折叠(feature: CONTEXT_COLLAPSE)

  └─ 预处理后的消息

为什么先 snip 再 microcompact 再 collapse 最后 autocompact? 每一步都可能减少 token 数。如果前面的步骤已经把 token 数降到阈值以下,后面更昂贵的步骤(如 autocompact 需要 API 调用)就可以跳过。

③ 自动压缩 (autocompact)

如果消息 token 数超过阈值,调用 autoCompactIfNeeded()。这是一次完整的 API 调用(用 Claude 生成对话摘要)。压缩成功后:

  1. messagesForQuery 替换为压缩后的消息
  2. yield 压缩后的消息给 REPL 渲染
  3. 重置 autoCompactTracking
  4. 如果有 taskBudget,记录压缩前的 token 用量到 taskBudgetRemaining

④ System Prompt 组装

typescript
const fullSystemPrompt = asSystemPrompt(
  appendSystemContext(systemPrompt, systemContext)
)

静态部分(角色、规则)+ 动态部分(环境、记忆)在 systemPrompt 中已经组合好。systemContext 追加系统级动态信息。用户上下文(CLAUDE.md、日期、Git)在后续 prependUserContext() 阶段注入到第一条 User Message 中。

⑤ 阻塞限制检查

在发起 API 调用前,检查 token 数是否已到硬阻塞限制。这只在以下条件都满足时生效:

  • 没有刚完成压缩
  • 不是 compact/session_memory 来源(防止压缩代理死锁)
  • 没有启用 reactive compact + auto compact(否则会抢占真正的 API 413 恢复路径)
  • 没有启用 context collapse

超限则 yield 错误消息并 return { reason: 'blocking_limit' }

⑥ API 流式调用 + 流式工具执行

这是每次迭代的核心。代码结构是一个双层循环

typescript
// 外层:fallback 重试循环(最多 2 次:原始模型 + 备用模型)
while (attemptWithFallback) {
  attemptWithFallback = false
  try {
    // 内层:流式消费循环
    for await (const message of deps.callModel({ ... })) {
      // 处理每个 streaming chunk
    }
  } catch (innerError) {
    if (innerError instanceof FallbackTriggeredError) {
      // 切换到备用模型,清空已收集的消息,重新创建 StreamingToolExecutor
      currentModel = fallbackModel
      attemptWithFallback = true
      continue
    }
    throw innerError
  }
}

流式消费循环内部做的事:

  1. Fallback 清理:如果发生了流式 fallback(streaming 中途切换模型),清空之前的消息并发出 tombstone
  2. Backfill 可观察输入:某些工具的 backfillObservableInput 会补充衍生字段(如文件路径展开),仅添加新字段时克隆消息
  3. Withhold 可恢复错误:413、max_output_tokens、media size error 都被暂扣(不 yield),后续恢复决策再处理
  4. 收集 tool_use 块:每收到一个 tool_use,设 needsFollowUp = true
  5. 流式工具调度:如果启用 StreamingToolExecutor,立即 addTool() 入队。只读工具并行启动,写入工具等待前面的完成

⑦ 中断检查 (abort)

流式调用结束后,第一件事是检查 abortController.signal.aborted

  • 如果被中断,消费 StreamingToolExecutor 剩余结果(它会为排队中的工具生成合成 tool_result)
  • 如果是 submit-interrupt(用户提交了新消息),跳过"用户中断"提示
  • 清理 Computer Use 资源(feature: CHICAGO_MCP)
  • return { reason: 'aborted_streaming' }

⑧ 错误恢复决策

needsFollowUp === false(模型没有返回 tool_use)时,进入退出判断。这里是 05 章描述的 6 种恢复路径的具体实现:

路径 A:413 → Context Collapse Drain

typescript
if (isWithheld413 && state.transition?.reason !== 'collapse_drain_retry') {
  const drained = contextCollapse.recoverFromOverflow(messagesForQuery, querySource)
  if (drained.committed > 0) {
    state = { ...next, transition: { reason: 'collapse_drain_retry' } }
    continue  // ← 回到 while(true) 顶部
  }
}

关键细节:通过检查 state.transition?.reason 防止反复 drain——如果上一轮已经 drain 过还是 413,就不再尝试,让后面的 reactive compact 处理。

路径 B:413 / Media Error → Reactive Compact

typescript
if ((isWithheld413 || isWithheldMedia) && reactiveCompact) {
  const compacted = await reactiveCompact.tryReactiveCompact({
    hasAttempted: hasAttemptedReactiveCompact, // 一次性保险
    ...
  })
  if (compacted) {
    state = { ...next, hasAttemptedReactiveCompact: true, transition: { reason: 'reactive_compact_retry' } }
    continue
  }
  // 无法恢复 → yield 暂扣的错误消息 → return
}

hasAttemptedReactiveCompact 保证只尝试一次。如果压缩后仍然 413(例如大图片在保护尾部),直接返回错误。

路径 C:Max Output Tokens → 升级到 64k

typescript
if (maxOutputTokensOverride === undefined && capEnabled) {
  state = { ...next, maxOutputTokensOverride: ESCALATED_MAX_TOKENS, transition: { reason: 'max_output_tokens_escalate' } }
  continue  // 用更大的 max_tokens 重试同一请求
}

无需注入恢复消息——这是纯参数调整的重试。

路径 D:Max Output Tokens → 多轮恢复

typescript
if (maxOutputTokensRecoveryCount < 3) {
  const recoveryMessage = createUserMessage({
    content: 'Output token limit hit. Resume directly — no apology, no recap...',
    isMeta: true,
  })
  state = { messages: [...messages, ...assistantMessages, recoveryMessage], maxOutputTokensRecoveryCount: count + 1, ... }
  continue
}
// 超过 3 次 → yield 暂扣的错误消息

路径 E:Model Fallback(在流式循环中处理)

在 ⑥ 的 catch 块中完成,不走 continue 路径。切换模型后重新进入 while (attemptWithFallback) 内层循环。

路径 F:Stop Hook Blocking

见 ⑨。

⑨ Stop Hooks + Token Budget

如果模型正常回复(没有 API 错误),执行 Stop Hooks:

typescript
const stopHookResult = yield* handleStopHooks(...)

if (stopHookResult.preventContinuation) return { reason: 'stop_hook_prevented' }
if (stopHookResult.blockingErrors.length > 0) {
  state = { messages: [..., ...blockingErrors], stopHookActive: true, transition: { reason: 'stop_hook_blocking' } }
  continue  // 路径 F:带着 Hook 的错误消息重新调用 API
}

handleStopHooks 内部:

  1. 执行用户配置的 Stop Hooks(并行)
  2. 保存 cacheSafeParams/btw 和 SDK side_question 使用
  3. 触发后台任务:Prompt Suggestion、Extract Memories、Auto Dream
  4. 清理 Computer Use 资源
  5. 如果是 Teammate,还执行 TaskCompleted 和 TeammateIdle Hooks

Token Budget 检查(feature: TOKEN_BUDGET)在 Stop Hooks 之后:

typescript
const decision = checkTokenBudget(budgetTracker, ...)
if (decision.action === 'continue') {
  // 注入续行提示消息,继续循环
  state = { ..., transition: { reason: 'token_budget_continuation' } }
  continue
}

checkTokenBudget 的判断逻辑:

  • 用量 < 90% 预算 → continue,注入进度提示
  • 连续 3+ 轮增量 < 500 tokens → stop(diminishing returns)
  • 用量 >= 90% → stop

⑩ 工具结果收集 + 附件注入

needsFollowUp === true(模型返回了 tool_use),跳过 ⑧⑨,进入工具执行阶段:

typescript
// 收集流式执行器的剩余结果,或用非流式编排器执行
const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)

for await (const update of toolUpdates) {
  yield update.message           // 工具结果
  toolResults.push(...)          // 收集到列表
  if (update.newContext) ...     // 工具可能更新上下文
}

工具执行完毕后,注入附件消息:

  1. 队列命令附件 — 消费 messageQueueManager 中的待处理通知(Sleep 唤醒、任务完成等)
  2. Memory 附件 — 如果 pendingMemoryPrefetch 已完成,注入相关记忆文件
  3. Skill Discovery 附件 — 如果技能发现预取已完成,注入匹配的技能

最后刷新 MCP 工具列表(refreshTools()),检查 maxTurns 限制。

⑪ 状态更新 → continue

typescript
const next: State = {
  messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
  toolUseContext: toolUseContextWithQueryTracking,
  autoCompactTracking: tracking,
  turnCount: nextTurnCount,
  maxOutputTokensRecoveryCount: 0,        // 重置
  hasAttemptedReactiveCompact: false,      // 重置
  pendingToolUseSummary: nextPendingToolUseSummary,
  maxOutputTokensOverride: undefined,      // 重置
  transition: { reason: 'next_turn' },
}
state = next
// → 回到 while(true) 顶部,开始下一个 API 调用轮次

所有 continue / return 路径一览

Continue 路径(循环继续)

transition.reason触发条件状态变更要点
next_turn正常:工具执行完毕messages 追加助手+工具结果;计数器重置
collapse_drain_retry413 → 上下文折叠成功messages = 折叠后的消息
reactive_compact_retry413/media → 反应式压缩成功messages = 压缩后消息;hasAttemptedReactiveCompact = true
max_output_tokens_escalate输出超限 → 升级到 64kmaxOutputTokensOverride = ESCALATED_MAX_TOKENS
max_output_tokens_recovery输出超限 → 注入续行消息messages 追加恢复提示;计数器 +1
stop_hook_blockingStop Hook 返回阻塞错误messages 追加错误消息;stopHookActive = true
token_budget_continuationToken 预算未用完messages 追加续行提示

Return 路径(循环结束)

reason触发条件
completed模型返回纯文本,Stop Hooks 通过
aborted_streaming流式期间用户中断
aborted_tools工具执行期间用户中断
blocking_limitToken 数到硬阻塞限制(auto-compact 关闭时)
prompt_too_long413 且所有恢复手段都失败
image_error图片大小/格式错误
model_error模型调用抛出异常
max_turns达到 maxTurns 限制
hook_stopped工具执行 Hook 阻止继续
stop_hook_preventedStop Hook 主动终止

流式工具执行详解

StreamingToolExecutor

核心数据结构是一个 TrackedTool[] 队列,每个元素的状态机:

queued → executing → completed → yielded

并发控制规则:

  • isConcurrencySafe = true 的工具(Read、Glob、Grep 等只读工具)可以并行
  • isConcurrencySafe = false 的工具(Bash、Edit 等写入工具)必须独占执行
  • Bash 工具错误会触发 siblingAbortController,取消同批次的兄弟进程(不影响主 abortController

结果顺序保证: 结果按添加顺序(不是完成顺序)yield。getCompletedResults() 从队列头部开始,只有当头部工具完成时才 yield,保证 API 看到的 tool_result 顺序与 tool_use 一致。

流式 vs 非流式对比

StreamingToolExecutorrunTools()
执行时机API 流式期间就开始API 流式结束后开始
并发策略按添加顺序逐个评估预分区:连续只读块并行,非只读块串行
控制方式Statsig gate默认 fallback
中断处理内部生成合成 tool_result外部 yieldMissingToolResultBlocks

消息 Withhold 机制

Agent Loop 中一个精巧的设计是 withhold(暂扣):可恢复的错误消息在流式循环中被收集但不 yield,直到恢复决策阶段才决定是重试还是暴露给用户。

流式循环:
  收到 413 错误消息 → withheld = true → 不 yield → push 到 assistantMessages

恢复决策:
  if (collapse drain 成功) → continue → 错误消息被丢弃(下一轮会得到新响应)
  if (reactive compact 成功) → continue → 同上
  if (所有恢复都失败) → yield lastMessage → return → 用户看到错误

三种可暂扣的错误:

  1. Prompt Too Long (413) — 被 contextCollapse.isWithheldPromptTooLong()reactiveCompact.isWithheldPromptTooLong() 检测
  2. Media Size Error — 被 reactiveCompact.isWithheldMediaSizeError() 检测(需 gate 开启)
  3. Max Output Tokens — 被 isWithheldMaxOutputTokens() 检测

暂扣和恢复必须在同一 gate 下——如果暂扣了但没有对应的恢复路径,消息就丢失了。代码中通过在 turn 开始时预先快照 gate 值(mediaRecoveryEnabled)来保证一致性。

预取与异步优化

Agent Loop 利用模型 streaming 的等待时间(通常 5-30 秒)做了多项预取:

预取项启动时机消费时机失败策略
Memory 相关文件循环入口(startRelevantMemoryPrefetch工具执行后未 settle → 跳过,下一轮重试
Skill Discovery每轮迭代开始工具执行后无匹配 → 无附件
Tool Use Summary上一轮工具完毕时触发 (Haiku ~1s)下一轮 streaming 结束后catch → null

pendingMemoryPrefetch 使用 using 语法(TC39 Explicit Resource Management),在生成器任何退出路径上自动 dispose。

防死循环机制

Agent Loop 是一个 while(true),防止无限循环是关键。以下是各种保护措施:

机制保护什么
hasAttemptedReactiveCompactReactive Compact 只尝试一次
state.transition?.reason 检查Collapse Drain 如果上一轮已经 drain 过不再尝试
maxOutputTokensRecoveryCount ≤ 3Max Output Tokens 恢复最多 3 轮
maxTurns外部配置的硬限制
abortController用户随时可以中断
Token Budget diminishing returns连续产出过低时自动停止
attemptWithFallback 一次性标志Fallback 最多触发一次
Stop Hook hasAttemptedReactiveCompact 保留Stop Hook blocking 重试不会重置 compact guard

端到端示例:一次典型对话轮次

用户输入:"帮我把 utils.js 中的 formatDate 函数改成使用 dayjs"

轮次 1:
  ② 消息预处理 (无变化,消息量少)
  ③ 自动压缩 (跳过,token 数未超阈值)
  ⑥ API 调用 → 模型返回:
     - text: "让我先看一下这个文件"
     - tool_use: Read("utils.js")
  ⑩ StreamingToolExecutor 立即执行 Read (只读,并行安全)
  ⑪ state = { messages: [user, assistant, tool_result], transition: { reason: 'next_turn' } }
  → continue

轮次 2:
  ⑥ API 调用 → 模型返回:
     - text: "找到了,我来修改..."
     - tool_use: Edit("utils.js", old, new)
  ⑩ 执行 Edit → 权限检查 → 用户批准 → 执行
  ⑪ state 更新 → continue

轮次 3:
  ⑥ API 调用 → 模型返回:
     - text: "已完成修改。formatDate 现在使用 dayjs..."
     (纯文本,无 tool_use)
  ⑧ needsFollowUp = false
  ⑨ Stop Hooks 通过,Token Budget 无需续行
  → return { reason: 'completed' }

与子代理的关系

子代理(通过 AgentTool 启动)运行独立的 query() 调用,拥有独立的 toolUseContext(含独立的 agentIdabortController)。但它们共享进程级资源:

  • 消息队列 (messageQueueManager) — 按 agentId 隔离
  • MCP 连接 — 共享,但工具列表独立刷新
  • Token Budget — 子代理的 checkTokenBudgetagentId !== undefined 直接返回 stop(不支持子代理续行)
  • Stop Hooks — 子代理运行 SubagentStop,不运行 Stop

相关章节