LangChain 代理构建在 LangGraph 之上,因此支持同一套流式传输栈,并为代理场景提供面向消息、工具调用、状态和自定义更新的 projections。
对于大多数应用和前端用例,请通过 stream_events(..., version="v3") 使用 Event Streaming。Event Streaming 会返回带类型化 projections 的运行对象,因此可以独立消费每个 projection,而不需要解析 stream-mode 元组。
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""Get weather for a city."""
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
stream = agent.stream_events({
"messages": [{"role": "user", "content": "What is the weather in SF?"}],
}, version="v3")
for message in stream.messages:
for delta in message.text:
print(delta, end="", flush=True)
final_state = stream.output
可以流式传输什么
| Projection | 用途 |
|---|
for event in stream | 带完整 envelope 的原始协议事件,并可访问每个 channel。 |
stream.messages | 模型消息流,每次 LLM 调用一个。 |
message.text | 消息的文本 deltas 和最终文本。 |
message.reasoning | 暴露 reasoning content 的模型的 reasoning deltas。 |
message.tool_calls | 工具调用参数 chunks 和最终工具调用。 |
message.output | 模型调用完成后的最终消息对象。 |
stream.values | 代理状态快照。 |
stream.output | 最终代理状态。 |
stream.subgraphs | 嵌套图运行(sub-agents 和普通 subgraphs)。 |
stream.extensions | 自定义 transformer projections。 |
stream.tool_calls | 工具执行生命周期、输入、输出 deltas、最终输出和错误。 |
stream.messages 会产生 ChatModelStream 对象。每个消息流暴露 .text、.reasoning、.tool_calls 和 .output。同步 projections 可迭代以获取实时 deltas,也可耗尽以获取最终值:使用 str(message.text) 获取最终文本,使用 message.tool_calls.get() 获取最终工具调用。
代理消息
当你想获取每次 LLM 调用的模型输出时,使用 stream.messages。
stream = agent.stream_events(input, version="v3")
for message in stream.messages:
print(f"[{message.node}] ", end="")
for delta in message.text:
print(delta, end="", flush=True)
full_message = message.output
usage = full_message.usage_metadata
if usage:
print(usage)
message.output 会给出最终 AI 消息,包括 provider 特定的 content blocks。在 TypeScript 中,当你只需要 token 计数或其他 usage metadata 时使用 message.usage;在 Python 中,从 message.output.usage_metadata 读取用量。
Reasoning content
Reasoning content 使用与文本内容相同的形状,但只有所选模型发出 reasoning blocks 时才可用。
stream = agent.stream_events(input, version="v3")
for message in stream.messages:
for delta in message.reasoning:
print(f"[thinking] {delta}", end="", flush=True)
for delta in message.text:
print(delta, end="", flush=True)
模型配置详情请参阅 reasoning guide 和你的 provider 集成页。
工具调用
有两个有用的工具调用 projections:
message.tool_calls 会在模型生成工具调用时流式传输工具调用参数 chunks。
stream.tool_calls 会在工具调用开始后流式传输工具执行生命周期。
stream = agent.stream_events(input, version="v3")
for message in stream.messages:
for chunk in message.tool_calls:
print(f"tool call chunk: {chunk}")
finalized = message.tool_calls.get()
if finalized:
print(f"finalized tool calls: {finalized}")
for call in stream.tool_calls:
print(f"{call.tool_name}({call.input})")
for delta in call.output_deltas:
print(delta, end="", flush=True)
print(call.output, call.error)
流式传输 sub-agents
当一个 create_agent 调用另一个具名 create_agent(通常通过包装工具)时,内部代理的事件会流入嵌套 namespace。传给 create_agent 的 name= 会在流中标识该内部代理,因此你可以按代理筛选和标记。
具名 sub-agents 会出现在专用的 stream.subagents projection 上。每个 handle 会暴露内部代理自己的 .messages、.values、.tool_calls 和 .output,以及 .name(你传入的 name=)和 .cause(分发该 sub-agent 的工具调用)。因为这里只有具名 create_agent 运行,所以你不需要过滤普通 subgraphs。
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
weather_agent = create_agent(
model=init_chat_model("openai:gpt-5.4"),
tools=[get_weather],
name="weather_agent",
)
def call_weather(query: str) -> str:
"""Query the weather agent."""
result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
return result["messages"][-1].text
supervisor = create_agent(
model=init_chat_model("openai:gpt-5.4"),
tools=[call_weather],
name="supervisor",
)
stream = supervisor.stream_events(
{"messages": [{"role": "user", "content": "What's the weather in Boston?"}]},
version="v3",
)
for subagent in stream.subagents:
print(f"{subagent.name}: ", end="")
for message in subagent.messages:
for token in message.text:
print(token, end="", flush=True)
print()
从工具调用的普通 StateGraph subgraphs 也会出现在 stream.subgraphs 上,请在 .compile(name=...) 上设置 name=,以便在 subagent.graph_name 中获得标签。
stream.subagents 是具名 create_agent sub-agents 的聚焦视图,而 stream.subgraphs 覆盖每个嵌套图。使用与你的 UI 匹配的那个。
状态和最终输出
使用 stream.values 获取状态快照,使用 stream.output 获取最终代理状态。
stream = agent.stream_events(input, version="v3")
for snapshot in stream.values:
print(snapshot)
final_state = stream.output
多个 projections
要在异步代码中并发消费,请将 astream_events 与 asyncio.gather 搭配使用:
import asyncio
stream = await agent.astream_events(input, version="v3")
async def consume_messages():
async for message in stream.messages:
print(await message.text)
async def consume_tool_calls():
async for call in stream.tool_calls:
print(call.tool_name, call.input)
await asyncio.gather(consume_messages(), consume_tool_calls())
对于同步代码,请改用 stream.interleave(...):
stream = agent.stream_events(input, version="v3")
for name, item in stream.interleave("messages", "tool_calls", "values"):
if name == "messages":
print(item.text)
elif name == "tool_calls":
print(item.tool_name, item.input)
elif name == "values":
print(item)
要访问未作为类型化 projections 暴露的 channels,或检查完整 event envelope,请迭代原始协议事件:
for event in stream:
print(event["method"], event["params"]["namespace"], event["params"]["data"])
自定义更新
当应用需要内置之外的 projection 时,使用自定义 stream transformers,例如检索进度、artifacts 或领域特定事件。
stream = agent.stream_events(
input,
version="v3",
transformers=[ToolActivityTransformer],
)
for activity in stream.extensions["tool_activity"]:
print(activity)
Middleware 注册的 transformers 需要 langchain>=1.3.2。
Middleware 可以在 hooks 和 tools 旁声明 stream transformer factories。不同语言中的 factory 形状不同:
将 AgentMiddleware 子类上的 transformers 属性设置为一组 factories。每个 factory 的形状为 Callable[[tuple[str, ...]], StreamTransformer],并以 factory(scope) 的方式调用,其中 scope 是 mini-mux scope tuple(根 mux 为 (),subgraphs 为非空)。每次调用返回一个新的 transformer 可以让每个 subgraph 保持隔离。
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
class ToolActivityMiddleware(AgentMiddleware):
transformers = (ToolActivityTransformer,)
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
middleware=[ToolActivityMiddleware()],
)
编译时,create_agent 会将 middleware 注册的 factories 与传给自身 transformers= 参数的任何内容合并。编译后图上的最终顺序为:
- 内置
ToolCallTransformer。
- Middleware 注册的 factories,按 middleware 顺序排列。
- 调用方从
create_agent 提供的 transformers=。
这样会让内置工具调用 projection 排在消费者 transformers 之前,并让调用方提供的条目拥有最终优先权。
内置 PIIMiddleware 使用此 hook 从流式 wire 输出中遮盖 PII。设置 apply_to_output=True 后,它注册的 transformer 会在内容离开运行之前,从文本 deltas、工具调用 args、工具输出和状态快照中清除检测到的 PII,关闭 after_model 状态级遮盖原本可能让原始 PII 泄露给 stream_events(version="v3") 实时读取者的窗口。
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware
agent = create_agent(
model="gpt-5-nano",
tools=[],
middleware=[
PIIMiddleware("email", strategy="redact", apply_to_output=True),
],
)
完整配置面请参阅 PII detection。
Transformer contract 请参阅 Build your own projection。