This page covers the streaming concerns specific to Deep Agents, most importantly streaming from delegated subagents through stream.subagents. For general agent streaming (stream.messages, stream.values, tool calls, custom updates), see LangChain Event Streaming.

Stream subagents

Deep Agents adds a subagent projection on top of LangGraph streaming. Use stream.subagents when you want one stream handle per delegated task call. The projection is lightweight: it discovers subagent tasks first and opens the message, tool-call, and value streams only when you access them on a subagent handle. Each handle's name is the name configured for the sub-agent, that is, the subagent_type the coordinator passes when it calls the task tool. Deep Agents binds that name to the delegated run, so the label you define in your subagent specs is the same label you filter and route on in the stream.
stream = agent.stream_events({
    "messages": [{"role": "user", "content": "Write me a haiku about the sea"}],
}, version="v3")

for subagent in stream.subagents:
    print(subagent.name, subagent.path, subagent.status)

    for message in subagent.messages:
        print(message.text)
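
Because a handle's name matches the subagent_type from the spec, routing can be a plain dictionary lookup. The following is a minimal sketch of that idea and does not call Deep Agents: `FakeMessage`, `FakeSubagent`, `route_subagents`, and the subagent names are hypothetical stand-ins that only mimic the `.name`, `.messages`, and `.text` attributes used above.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    # Hypothetical stand-in for a streamed message; only `.text` is modeled.
    text: str


@dataclass
class FakeSubagent:
    # Hypothetical stand-in for a subagent handle; only `.name` and `.messages` are modeled.
    name: str
    messages: list = field(default_factory=list)


def route_subagents(subagents, handlers):
    """Pass each message text to the handler registered under the subagent's name."""
    for subagent in subagents:
        handler = handlers.get(subagent.name)
        if handler is None:
            # No handler for this name: skip it without touching its messages.
            continue
        for message in subagent.messages:
            handler(message.text)


research_log = []
writer_log = []

route_subagents(
    [
        FakeSubagent("researcher", [FakeMessage("found 3 sources")]),
        FakeSubagent("writer", [FakeMessage("draft ready")]),
        FakeSubagent("critic", [FakeMessage("not routed")]),
    ],
    handlers={"researcher": research_log.append, "writer": writer_log.append},
)
```

With a real run, stream.subagents would presumably take the place of the hand-built list. Skipping unregistered names before reading `.messages` fits the lazy behavior described above, where streams open only on access.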

Subagent stream fields

Each subagent stream exposes the same kinds of projections as the parent run, such as messages, tool calls, nested subagents, and final output. For the general parent-run streaming model, see LangChain Event Streaming. Python uses snake_case projection names such as tool_calls. Each subagent stream can expose .messages, .tool_calls, .values, .subagents, and .output.
| Field | Description |
| --- | --- |
| name | Sub-agent name, taken from the subagent_type the coordinator selects in its task call. |
| messages | Messages emitted by the subagent. |
| subagents | Nested subagent invocations. |
| output | Final subagent state, or the completion signal of the delegated task. |
| path | Namespace path of the subagent stream. |
| status | Lifecycle status, for example started, completed, failed, or interrupted. |
| tool_calls | Tool calls scoped to the subagent. |

Track subagent lifecycle

Use stream.subagents when you only need to show which subagents have started and finished. You do not need to subscribe to message or value streams unless you access those projections on an individual subagent.
stream = agent.stream_events(input, version="v3")

running = 0
completed = 0
failed = 0

for subagent in stream.subagents:
    running += 1
    print(f"{subagent.name}: started")

    try:
        _ = subagent.output
        running -= 1
        completed += 1
        print(f"{subagent.name}: completed")
    except Exception:
        running -= 1
        failed += 1
        print(f"{subagent.name}: failed")

Stream messages

Deep Agents can emit messages from both the coordinator agent and delegated subagents. Use stream.messages for top-level messages and subagent.messages for each delegated subagent.
stream = agent.stream_events(input, version="v3")

for message in stream.messages:
    print("[coordinator]", message.text)

for subagent in stream.subagents:
    for message in subagent.messages:
        print(f"[{subagent.name}]", message.text)

Stream tool calls

Deep Agents exposes tool calls at every level of the agent tree. Use the top-level stream.tool_calls for coordinator tools and each subagent.tool_calls for delegated work.
stream = agent.stream_events(input, version="v3")

for call in stream.tool_calls:
    print("[coordinator tool]", call.tool_name, call.input)
    print(call.completed, call.error)

for subagent in stream.subagents:
    for call in subagent.tool_calls:
        print(f"[{subagent.name} tool]", call.tool_name, call.input)
        for delta in call.output_deltas:
            print(delta, end="", flush=True)

        if call.completed and call.error is None:
            print(call.output)
        elif call.error is not None:
            print(call.error)

Stream nested work

You can recurse into a subagent stream to observe nested subagents, messages, and tool calls.
stream = agent.stream_events(input, version="v3")

for subagent in stream.subagents:
    print(f"subagent {subagent.name}: {subagent.status}")

    for tool_call in subagent.tool_calls:
        print(f"{tool_call.tool_name}({tool_call.input})")
        for delta in tool_call.output_deltas:
            print(delta, end="", flush=True)

    for nested in subagent.subagents:
        print(f"nested subagent {nested.name}: {nested.status}")
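
The loop above descends one level. For deeper trees, the same traversal can be written as a recursive generator. This is a sketch under stated assumptions: `walk` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that model only the `.name`, `.status`, and `.subagents` attributes, with made-up subagent names.

```python
from types import SimpleNamespace


def walk(subagent, depth=0):
    """Yield (depth, name, status) for a handle and every nested handle, depth-first."""
    yield depth, subagent.name, subagent.status
    for nested in subagent.subagents:
        yield from walk(nested, depth + 1)


# Hypothetical stand-in tree; real handles would come from stream.subagents.
leaf = SimpleNamespace(name="fact-checker", status="completed", subagents=[])
root = SimpleNamespace(name="researcher", status="started", subagents=[leaf])

rows = list(walk(root))
for depth, name, status in rows:
    # Indent by depth so the delegation tree is visible in plain text.
    print("  " * depth + f"{name}: {status}")
```

The status recorded for each row is whatever the handle reports at the moment it is visited, so a live run may show a different value later.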

Consume concurrently

Coordinator and subagent output often interleave. Consume projections concurrently when you need live UI updates. For concurrent consumption in async code, use astream_events together with asyncio.gather:
import asyncio

stream = await agent.astream_events(input, version="v3")

async def consume_coordinator():
    async for message in stream.messages:
        print("[coordinator]", await message.text)

async def consume_subagents():
    async for subagent in stream.subagents:
        async for message in subagent.messages:
            print(f"[{subagent.name}]", await message.text)

await asyncio.gather(consume_coordinator(), consume_subagents())
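
To try the gather pattern without a model or an agent, the sketch below swaps the projections for plain async generators. `fake_stream`, `consume`, and the labels are hypothetical stand-ins, not part of Deep Agents; only `asyncio.gather` and `asyncio.run` are real library calls.

```python
import asyncio


async def fake_stream(label, chunks, delay):
    # Hypothetical stand-in for an async projection such as stream.messages.
    for chunk in chunks:
        await asyncio.sleep(delay)
        yield f"[{label}] {chunk}"


async def consume(source, sink):
    # Drain one source; each await gives the other consumer a chance to run.
    async for line in source:
        sink.append(line)


async def main():
    sink = []
    # Both consumers run on the same event loop and append to one shared list.
    await asyncio.gather(
        consume(fake_stream("coordinator", ["plan", "done"], 0.01), sink),
        consume(fake_stream("researcher", ["search", "summarize"], 0.015), sink),
    )
    return sink


lines = asyncio.run(main())
print("\n".join(lines))
```

Items from one source always keep their relative order. How the two sources interleave depends on timing, which is why UI code should not assume a fixed global order.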
For synchronous code, use stream.interleave(...) instead:
stream = agent.stream_events(input, version="v3")

for name, item in stream.interleave("messages", "subagents"):
    if name == "messages":
        print("[coordinator]", item.text)
    else:
        for message in item.messages:
            print(f"[{item.name}]", message.text)
When you need the exact arrival order across the coordinator and all subagents, iterate over the raw protocol events and use the namespace to identify the source:
stream = agent.stream_events(input, version="v3")

for event in stream:
    if event.get("method") != "messages":
        continue

    payload = event["params"]["data"][0]
    if not isinstance(payload, dict):
        continue
    if payload.get("event") != "content-block-delta":
        continue

    block = payload.get("delta") or {}
    if block.get("type") == "text-delta":
        source = "subagent" if event["params"]["namespace"] else "coordinator"
        print(f"[{source}] {block['text']}")
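
The filtering in the loop above can be pulled into a small helper and tested against hand-written events. This is a sketch, not captured output: `extract_text_delta`, `group_text_by_source`, and `_delta` are hypothetical helpers, the sample events only mirror the keys that loop reads, and the `task:1` namespace segment is invented. It also assumes the namespace is a sequence of path segments that is empty for the coordinator.

```python
from collections import defaultdict


def extract_text_delta(event):
    """Return (namespace, text) for a text delta event, or None for anything else."""
    if event.get("method") != "messages":
        return None
    params = event.get("params") or {}
    data = params.get("data")
    payload = data[0] if isinstance(data, (list, tuple)) and data else None
    if not isinstance(payload, dict) or payload.get("event") != "content-block-delta":
        return None
    block = payload.get("delta") or {}
    if block.get("type") != "text-delta":
        return None
    # Assumption: an empty namespace means the event came from the coordinator.
    return tuple(params.get("namespace") or ()), block.get("text", "")


def group_text_by_source(events):
    """Concatenate text deltas per namespace, keeping arrival order within each."""
    buffers = defaultdict(str)
    for event in events:
        parsed = extract_text_delta(event)
        if parsed is not None:
            namespace, text = parsed
            buffers[namespace] += text
    return dict(buffers)


def _delta(namespace, text):
    # Builds a hand-written sample event with the same keys the loop above reads.
    payload = {"event": "content-block-delta", "delta": {"type": "text-delta", "text": text}}
    return {"method": "messages", "params": {"namespace": namespace, "data": [payload]}}


sample_events = [
    _delta([], "Delegating. "),
    _delta(["task:1"], "Waves "),
    {"method": "values", "params": {"namespace": [], "data": {}}},  # ignored: not a message
    _delta(["task:1"], "fold."),
]
grouped = group_text_by_source(sample_events)
```

Keying the buffers by the full namespace, rather than a coordinator/subagent flag, keeps output from different subagents apart when several run at once.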

Subagents vs. subgraphs

stream.subgraphs shows the graph execution structure. stream.subagents shows product-level Deep Agents task delegations. For user-facing UI, use stream.subagents, because it hides internal graph nodes and exposes the subagent concept directly.

Related