LangChain 代理构建在 LangGraph 之上,因此支持同一套流式传输栈,并为代理场景提供面向消息、工具调用、状态和自定义更新的 projections。 对于大多数应用和前端用例,请通过 stream_events(..., version="v3") 使用 Event Streaming。Event Streaming 会返回带类型化 projections 的运行对象,因此可以独立消费每个 projection,而不需要解析 stream-mode 元组。
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"It's always sunny in {city}!"


agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

stream = agent.stream_events({
    "messages": [{"role": "user", "content": "What is the weather in SF?"}],
}, version="v3")

for message in stream.messages:
    for delta in message.text:
        print(delta, end="", flush=True)

final_state = stream.output

可以流式传输什么

Projection用途
for event in stream带完整 envelope 的原始协议事件,并可访问每个 channel。
stream.messages模型消息流,每次 LLM 调用一个。
message.text消息的文本 deltas 和最终文本。
message.reasoning暴露 reasoning content 的模型的 reasoning deltas。
message.tool_calls工具调用参数 chunks 和最终工具调用。
message.output模型调用完成后的最终消息对象。
stream.values代理状态快照。
stream.output最终代理状态。
stream.subgraphs嵌套图运行(sub-agents 和普通 subgraphs)。
stream.extensions自定义 transformer projections。
stream.tool_calls工具执行生命周期、输入、输出 deltas、最终输出和错误。
stream.messages 会产生 ChatModelStream 对象。每个消息流暴露 .text.reasoning.tool_calls.output。同步 projections 可迭代以获取实时 deltas,也可耗尽以获取最终值:使用 str(message.text) 获取最终文本,使用 message.tool_calls.get() 获取最终工具调用。

代理消息

当你想获取每次 LLM 调用的模型输出时,使用 stream.messages
stream = agent.stream_events(input, version="v3")

for message in stream.messages:
    print(f"[{message.node}] ", end="")
    for delta in message.text:
        print(delta, end="", flush=True)

    full_message = message.output
    usage = full_message.usage_metadata
    if usage:
        print(usage)
message.output 会给出最终 AI 消息,包括 provider 特定的 content blocks。在 TypeScript 中,当你只需要 token 计数或其他 usage metadata 时使用 message.usage;在 Python 中,从 message.output.usage_metadata 读取用量。

Reasoning content

Reasoning content 使用与文本内容相同的形状,但只有所选模型发出 reasoning blocks 时才可用。
stream = agent.stream_events(input, version="v3")

for message in stream.messages:
    for delta in message.reasoning:
        print(f"[thinking] {delta}", end="", flush=True)

    for delta in message.text:
        print(delta, end="", flush=True)
模型配置详情请参阅 reasoning guide 和你的 provider 集成页。

工具调用

有两个有用的工具调用 projections:
  • message.tool_calls 会在模型生成工具调用时流式传输工具调用参数 chunks。
  • stream.tool_calls 会在工具调用开始后流式传输工具执行生命周期。
stream = agent.stream_events(input, version="v3")

for message in stream.messages:
    for chunk in message.tool_calls:
        print(f"tool call chunk: {chunk}")

    finalized = message.tool_calls.get()
    if finalized:
        print(f"finalized tool calls: {finalized}")

for call in stream.tool_calls:
    print(f"{call.tool_name}({call.input})")
    for delta in call.output_deltas:
        print(delta, end="", flush=True)
    print(call.output, call.error)

流式传输 sub-agents

当一个 create_agent 调用另一个具名 create_agent(通常通过包装工具)时,内部代理的事件会流入嵌套 namespace。传给 create_agentname= 会在流中标识该内部代理,因此你可以按代理筛选和标记。 具名 sub-agents 会出现在专用的 stream.subagents projection 上。每个 handle 会暴露内部代理自己的 .messages.values.tool_calls.output,以及 .name(你传入的 name=)和 .cause(分发该 sub-agent 的工具调用)。因为这里只有具名 create_agent 运行,所以你不需要过滤普通 subgraphs。
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model


def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"


weather_agent = create_agent(
    model=init_chat_model("openai:gpt-5.4"),
    tools=[get_weather],
    name="weather_agent",
)


def call_weather(query: str) -> str:
    """Query the weather agent."""
    result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
    return result["messages"][-1].text


supervisor = create_agent(
    model=init_chat_model("openai:gpt-5.4"),
    tools=[call_weather],
    name="supervisor",
)

stream = supervisor.stream_events(
    {"messages": [{"role": "user", "content": "What's the weather in Boston?"}]},
    version="v3",
)

for subagent in stream.subagents:
    print(f"{subagent.name}: ", end="")
    for message in subagent.messages:
        for token in message.text:
            print(token, end="", flush=True)
    print()
从工具调用的普通 StateGraph subgraphs 也会出现在 stream.subgraphs 上,请在 .compile(name=...) 上设置 name=,以便在 subagent.graph_name 中获得标签。 stream.subagents 是具名 create_agent sub-agents 的聚焦视图,而 stream.subgraphs 覆盖每个嵌套图。使用与你的 UI 匹配的那个。

状态和最终输出

使用 stream.values 获取状态快照,使用 stream.output 获取最终代理状态。
stream = agent.stream_events(input, version="v3")

for snapshot in stream.values:
    print(snapshot)

final_state = stream.output

多个 projections

要在异步代码中并发消费,请将 astream_eventsasyncio.gather 搭配使用:
import asyncio

stream = await agent.astream_events(input, version="v3")

async def consume_messages():
    async for message in stream.messages:
        print(await message.text)

async def consume_tool_calls():
    async for call in stream.tool_calls:
        print(call.tool_name, call.input)

await asyncio.gather(consume_messages(), consume_tool_calls())
对于同步代码,请改用 stream.interleave(...)
stream = agent.stream_events(input, version="v3")

for name, item in stream.interleave("messages", "tool_calls", "values"):
    if name == "messages":
        print(item.text)
    elif name == "tool_calls":
        print(item.tool_name, item.input)
    elif name == "values":
        print(item)
要访问未作为类型化 projections 暴露的 channels,或检查完整 event envelope,请迭代原始协议事件:
for event in stream:
    print(event["method"], event["params"]["namespace"], event["params"]["data"])

自定义更新

当应用需要内置之外的 projection 时,使用自定义 stream transformers,例如检索进度、artifacts 或领域特定事件。
stream = agent.stream_events(
    input,
    version="v3",
    transformers=[ToolActivityTransformer],
)

for activity in stream.extensions["tool_activity"]:
    print(activity)

在 middleware 上注册 transformers

Middleware 注册的 transformers 需要 langchain>=1.3.2
Middleware 可以在 hooks 和 tools 旁声明 stream transformer factories。不同语言中的 factory 形状不同: AgentMiddleware 子类上的 transformers 属性设置为一组 factories。每个 factory 的形状为 Callable[[tuple[str, ...]], StreamTransformer],并以 factory(scope) 的方式调用,其中 scope 是 mini-mux scope tuple(根 mux 为 (),subgraphs 为非空)。每次调用返回一个新的 transformer 可以让每个 subgraph 保持隔离。
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware


class ToolActivityMiddleware(AgentMiddleware):
    transformers = (ToolActivityTransformer,)


agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
    middleware=[ToolActivityMiddleware()],
)
编译时,create_agent 会将 middleware 注册的 factories 与传给自身 transformers= 参数的任何内容合并。编译后图上的最终顺序为:
  1. 内置 ToolCallTransformer
  2. Middleware 注册的 factories,按 middleware 顺序排列。
  3. 调用方从 create_agent 提供的 transformers=
这样会让内置工具调用 projection 排在消费者 transformers 之前,并让调用方提供的条目拥有最终优先权。 内置 PIIMiddleware 使用此 hook 从流式 wire 输出中遮盖 PII。设置 apply_to_output=True 后,它注册的 transformer 会在内容离开运行之前,从文本 deltas、工具调用 args、工具输出和状态快照中清除检测到的 PII,关闭 after_model 状态级遮盖原本可能让原始 PII 泄露给 stream_events(version="v3") 实时读取者的窗口。
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware

agent = create_agent(
    model="gpt-5-nano",
    tools=[],
    middleware=[
        PIIMiddleware("email", strategy="redact", apply_to_output=True),
    ],
)
完整配置面请参阅 PII detection Transformer contract 请参阅 Build your own projection

相关