Build custom middleware by implementing hooks that run at specific points in the agent execution flow.

Hooks

Middleware provides two styles of hooks for intercepting agent execution:

Node-style hooks

Run sequentially at specific execution points.

Wrap-style hooks

Run around each model or tool call.

Node-style hooks

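Run sequentially at specific execution points. Use them for logging, validation, and state updates. Pick the hooks your middleware needs; you can choose between node-style and wrap-style hooks. Node-style hooks run at these execution points:
  • before_agent: before the agent starts (once per invocation)
  • before_model: before each model call
  • after_model: after each model response
  • after_agent: after the agent completes (once per invocation)
Wrap-style hooks run around each call and let you control execution:
  • wrap_model_call: around each model call
  • wrap_tool_call: around each tool call
Example: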
from langchain.agents.middleware import before_model, after_model, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@before_model(can_jump_to=["end"])
def check_message_limit(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    if len(state["messages"]) >= 50:
        return {
            "messages": [AIMessage("Conversation limit reached.")],
            "jump_to": "end"
        }
    return None

@after_model
def log_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"Model returned: {state['messages'][-1].content}")
    return None

Wrap-style hooks

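Intercept execution and control when the handler is called. Use them for retries, caching, and transformation. You decide whether the handler is called zero times (short-circuit), once (normal flow), or multiple times (retry logic). Available hooks:
  • wrap_model_call: around each model call
  • wrap_tool_call: around each tool call
Example: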
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt + 1}/3 after error: {e}")
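The zero/once/many choice is easiest to see with a fake handler. The plain-Python sketch below does not use LangChain at all: requests and responses are plain strings, and make_flaky_handler, retry_wrapper, and cache_wrapper are hypothetical names. retry_wrapper mirrors the body of retry_model above (many calls), and cache_wrapper shows the short-circuit case (zero calls on a cache hit).

```python
from typing import Callable

# Hypothetical stand-ins: plain strings instead of ModelRequest/ModelResponse.
Handler = Callable[[str], str]


def make_flaky_handler(failures: int) -> tuple[Handler, list[int]]:
    """Fake model handler that raises `failures` times, then succeeds."""
    calls = [0]  # mutable counter shared with the closure

    def handler(request: str) -> str:
        calls[0] += 1
        if calls[0] <= failures:
            raise RuntimeError(f"transient error #{calls[0]}")
        return f"response to {request}"

    return handler, calls


def retry_wrapper(request: str, handler: Handler, max_attempts: int = 3) -> str:
    """Many calls: retry the handler up to max_attempts times, like retry_model."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return handler(request)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
    raise AssertionError("unreachable")


_cache: dict[str, str] = {}


def cache_wrapper(request: str, handler: Handler) -> str:
    """Zero calls on a cache hit (short-circuit), one call on a miss."""
    if request in _cache:
        return _cache[request]
    response = handler(request)
    _cache[request] = response
    return response


handler, calls = make_flaky_handler(failures=2)
print(retry_wrapper("hi", handler), calls[0])  # succeeds on the third call
```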

State updates

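Both node-style and wrap-style hooks can update agent state. The mechanisms differ:
  • Node-style hooks (before_agent, before_model, after_model, after_agent): return a dict directly. The dict is applied to agent state using the graph's reducers.
  • Wrap-style hooks (wrap_model_call, wrap_tool_call): for model calls, return an ExtendedModelResponse with a Command to inject state updates alongside the model response. For tool calls, return a Command directly. Use them when you need to track or update state based on logic that runs during the model or tool call, such as summarization trigger points, usage metadata, or custom fields computed from the request or response.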
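As a rough mental model only (this is not LangGraph's implementation), applying a returned dict through reducers works like the sketch below. apply_update and append_messages are hypothetical helpers; the real messages reducer (add_messages) also replaces messages that share an ID, which this sketch ignores.

```python
from typing import Any, Callable

Reducer = Callable[[Any, Any], Any]


def append_messages(current: list[Any], new: list[Any]) -> list[Any]:
    """Additive reducer: new messages are appended, never replacing old ones."""
    return list(current) + list(new)


def apply_update(
    state: dict[str, Any],
    update: dict[str, Any],
    reducers: dict[str, Reducer],
) -> dict[str, Any]:
    """Merge a hook's returned dict into state, key by key."""
    new_state = dict(state)
    for key, value in update.items():
        if key in reducers and key in new_state:
            # Field with a reducer: combine the old and new values
            new_state[key] = reducers[key](new_state[key], value)
        else:
            # Field without a reducer (or not set yet): the new value overwrites
            new_state[key] = value
    return new_state


state = {"messages": ["Hello"], "model_call_count": 1}
update = {"messages": ["Hi! How can I help?"], "model_call_count": 2}
state = apply_update(state, update, reducers={"messages": append_messages})
print(state)
```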

Node-style hooks

Return a dict from a node-style hook to merge updates into agent state. Dict keys map to state fields.
from langchain.agents.middleware import after_model, AgentState
from langgraph.runtime import Runtime
from typing import Any
from typing_extensions import NotRequired


class TrackingState(AgentState):
    model_call_count: NotRequired[int]


@after_model(state_schema=TrackingState)
def increment_after_model(state: TrackingState, runtime: Runtime) -> dict[str, Any] | None:
    return {"model_call_count": state.get("model_call_count", 0) + 1}

Wrap-style hooks

wrap_model_call returns an ExtendedModelResponse with a Command to inject state updates from the model call layer:
from typing import Callable
from langchain.agents.middleware import (
    wrap_model_call,
    ModelRequest,
    ModelResponse,
    AgentState,
    ExtendedModelResponse
)
from langgraph.types import Command
from typing_extensions import NotRequired

class UsageTrackingState(AgentState):
    """Agent state with token usage tracking."""

    last_model_call_tokens: NotRequired[int]


@wrap_model_call(state_schema=UsageTrackingState)
def track_usage(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ExtendedModelResponse:
    response = handler(request)
    return ExtendedModelResponse(
        model_response=response,
        # 150 is a placeholder; compute the real value from the response in practice
        command=Command(update={"last_model_call_tokens": 150}),
    )
The Command flows through the graph's reducers, so updates are applied correctly: messages are appended rather than replacing existing state.

Composition with multiple middleware

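When multiple middleware layers return ExtendedModelResponse, their commands compose:
  • Commands are applied through reducers: each Command becomes a separate state update. For messages, this means they are appended.
  • Outer wins on conflicts: for non-reducer state fields, commands are applied inner first, then outer. On conflicting keys, the outermost middleware's value wins.
  • Retry-safe: if an outer middleware implements logic that may call handler() multiple times (for example, retry logic), commands produced by earlier calls are discarded.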
from typing import Annotated, Callable

from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ExtendedModelResponse,
    ModelRequest,
    ModelResponse,
)
from langchain.messages import SystemMessage
from langgraph.types import Command
from typing_extensions import NotRequired


def _last_wins(_a: str, b: str) -> str:
    """Reducer: last writer wins (outer overwrites inner)."""
    return b


class CustomMiddlewareState(AgentState):
    """Agent state: trace_layer uses last-wins (outer wins), messages use additive reducer."""

    # Field with a last-wins reducer: both middleware write it, and because the
    # outer command is applied last, the outermost value wins
    trace_layer: NotRequired[Annotated[str, _last_wins]]


class OuterMiddleware(AgentMiddleware):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ExtendedModelResponse:
        response = handler(request)
        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "outer",
                "messages": [SystemMessage(content="[Outer ran]")],
            }),
        )


class InnerMiddleware(AgentMiddleware):
    """Adds trace_layer and message. Outer adds to same keys; trace_layer: outer wins, messages: additive."""

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ExtendedModelResponse:
        response = handler(request)
        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "inner",
                "messages": [SystemMessage(content="[Inner ran]")],
            }),
        )
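To check expectations about the composition rules without running an agent, here is a plain-Python simulation (hypothetical helpers, not LangChain code). It assumes the order described above: the inner layer's update is applied first, then the outer one; messages are additive, while trace_layer ends up with the last value written, as the _last_wins reducer makes it. run_with_retry models the retry rule by keeping only the update from the final handler() call.

```python
from typing import Any, Callable

Update = dict[str, Any]


def compose_updates(state: dict[str, Any], updates_inner_to_outer: list[Update]) -> dict[str, Any]:
    """Apply each layer's update in order (inner first, outer last)."""
    new_state = dict(state)
    for update in updates_inner_to_outer:
        for key, value in update.items():
            if key == "messages":
                # Additive reducer: append
                new_state[key] = new_state.get(key, []) + list(value)
            else:
                # Last writer wins, so the outer layer's value survives
                new_state[key] = value
    return new_state


def run_with_retry(call_inner: Callable[[int], Update], attempts: int) -> list[Update]:
    """An outer retry layer: only the final attempt's update is kept."""
    kept: list[Update] = []
    for attempt in range(attempts):
        kept = [call_inner(attempt)]  # earlier attempts' updates are discarded
    return kept


inner = {"trace_layer": "inner", "messages": ["[Inner ran]"]}
outer = {"trace_layer": "outer", "messages": ["[Outer ran]"]}
final = compose_updates({"messages": ["Hello"]}, [inner, outer])
print(final["trace_layer"], final["messages"])
```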

Create middleware

You can create middleware in two ways:

Decorator-based middleware

Quick and simple for single-hook middleware. Use decorators to wrap individual functions.

Class-based middleware

More powerful for complex middleware with multiple hooks or configuration.

Decorator-based middleware

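Quick and simple for single-hook middleware. Use decorators to wrap individual functions. Available decorators:
  • Node-style: @before_agent, @before_model, @after_model, @after_agent
  • Wrap-style: @wrap_model_call, @wrap_tool_call
  • Convenience: @dynamic_prompt
Example: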
from langchain.agents.middleware import (
    before_model,
    wrap_model_call,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langchain.agents import create_agent
from langgraph.runtime import Runtime
from typing import Any, Callable


@before_model
def log_before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"About to call model with {len(state['messages'])} messages")
    return None

@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt + 1}/3 after error: {e}")

agent = create_agent(
    model="gpt-5.4",
    middleware=[log_before_model, retry_model],
    tools=[...],
)
When to use decorators:
  • You need only a single hook
  • No complex configuration is required
  • Quick prototyping

Class-based middleware

More powerful for complex middleware with multiple hooks or configuration. Use classes when you need both sync and async implementations of the same hook, or when you want to combine multiple hooks in a single middleware. AgentMiddleware subclasses can declare three class attributes that the agent factory reads at compile time:
  • state_schema: extends agent state with custom fields. See Custom state schema.
  • tools: registers extra tools shipped with the middleware, such as write_todos on the to-do list middleware.
  • transformers: registers scope-aware stream transformer factories. See Custom stream transformers.
Example:
from langchain.agents import create_agent
from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langgraph.runtime import Runtime
from typing import Any, Callable

class LoggingMiddleware(AgentMiddleware):
    def before_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"About to call model with {len(state['messages'])} messages")
        return None

    def after_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"Model returned: {state['messages'][-1].content}")
        return None

    async def abefore_model(
        self, state: AgentState, runtime: Runtime
    ) -> dict[str, Any] | None:
        # Async version of before_model
        print(f"About to call model with {len(state['messages'])} messages")
        return None

    async def aafter_model(
        self, state: AgentState, runtime: Runtime
    ) -> dict[str, Any] | None:
        # Async version of after_model
        print(f"Model returned: {state['messages'][-1].content}")
        return None


agent = create_agent(
    model="gpt-5.4",
    middleware=[LoggingMiddleware()],
    tools=[...],
)
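When to use classes:
  • Defining both sync and async implementations of the same hook
  • A single middleware needs multiple hooks
  • Complex configuration is required, such as configurable thresholds or custom models
  • Reusing the middleware across projects, with configuration supplied at initialization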

Custom state schema

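If your middleware needs to track state across hooks, extend the agent state with custom properties. This lets middleware:
  • Track state across execution: maintain counters, flags, or other values that persist throughout the agent's execution lifecycle
  • Share data between hooks: pass information from before_model to after_model, or between different middleware instances
  • Implement cross-cutting concerns: add features such as rate limiting, usage tracking, user context, or audit logging without modifying core agent logic
  • Make conditional decisions: use accumulated state to decide whether to continue execution, jump to a different node, or modify behavior dynamically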
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, before_model, after_model
from typing_extensions import NotRequired
from typing import Any
from langgraph.runtime import Runtime


class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]


@before_model(state_schema=CustomState, can_jump_to=["end"])
def check_call_limit(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    count = state.get("model_call_count", 0)
    if count > 10:
        return {"jump_to": "end"}
    return None


@after_model(state_schema=CustomState)
def increment_counter(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    return {"model_call_count": state.get("model_call_count", 0) + 1}


agent = create_agent(
    model="gpt-5.4",
    middleware=[check_call_limit, increment_counter],
    tools=[],
)

# Invoke with custom state
result = agent.invoke({
    "messages": [HumanMessage("Hello")],
    "model_call_count": 0,
    "user_id": "user-123",
})
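Because check_call_limit runs before each model call and increment_counter runs after it, the comparison operator decides how many calls actually happen. The plain-Python sketch below replays that loop, assuming the model keeps requesting tools so the agent never stops on its own; count_model_calls is a hypothetical helper, not part of LangChain. With count > 10 the model is called 11 times; a check of count >= 10 would cap it at exactly 10.

```python
def count_model_calls(limit: int, inclusive: bool = False, max_loops: int = 1000) -> int:
    """Replay before_model (limit check) and after_model (increment) hooks.

    Assumes the agent would otherwise keep looping (the model always calls a tool).
    """
    model_call_count = 0  # initial value passed to agent.invoke
    calls = 0
    for _ in range(max_loops):
        # check_call_limit (before_model): jump to "end" once the limit is hit
        reached = model_call_count >= limit if inclusive else model_call_count > limit
        if reached:
            break
        calls += 1             # the model call itself
        model_call_count += 1  # increment_counter (after_model)
    return calls


print(count_model_calls(10))                  # `> 10` as in check_call_limit: 11 calls
print(count_model_calls(10, inclusive=True))  # `>= 10`: 10 calls
```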

Custom stream transformers

Middleware-registered transformers require langchain>=1.3.2.
Middleware can register stream transformer factories that project events from the live agent stream into typed extension channels. This is useful for exposing counters, side-channel artifacts, partial outputs, or wire-level redaction without coupling to the framework's built-in projections.
At compile time, middleware-registered factories are merged with the ones the caller passes directly to the agent factory. The final ordering rules keep the built-in ToolCallTransformer first and put caller-supplied entries last.
Set the transformers class attribute to a tuple of factory callables. Each factory has the shape Callable[[tuple[str, ...]], StreamTransformer] and is called as factory(scope), where scope is the mini-mux scope tuple (() for the root, non-empty for subgraphs). Returning a new transformer on each call keeps each subgraph isolated.
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware


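class ToolActivityMiddleware(AgentMiddleware):
    # ToolActivityTransformer is a transformer factory defined elsewhere (not shown here);
    # it must be callable as factory(scope) and return a StreamTransformer
    transformers = (ToolActivityTransformer,)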


agent = create_agent(
    model="gpt-5-nano",
    tools=[...],
    middleware=[ToolActivityMiddleware()],
)
For the full ordering rules and a PII redaction example, see Register transformers on middleware.
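The reason to return a new transformer on every factory(scope) call is isolation: any per-stream state lives on the instance. The sketch below illustrates only that idea; CountingTransformer, its observe method, and the "research_agent" scope are hypothetical and do not follow the real StreamTransformer interface, which is not shown in this section.

```python
from typing import Any


class CountingTransformer:
    """Hypothetical stand-in that counts tool events for one scope."""

    def __init__(self, scope: tuple[str, ...]) -> None:
        self.scope = scope
        self.tool_events = 0

    def observe(self, event: dict[str, Any]) -> None:
        if event.get("type") == "tool":
            self.tool_events += 1


def counting_factory(scope: tuple[str, ...]) -> CountingTransformer:
    # A fresh instance per call, so the root and each subgraph keep separate counts
    return CountingTransformer(scope)


root = counting_factory(())                   # root scope
sub = counting_factory(("research_agent",))   # hypothetical subgraph scope
root.observe({"type": "tool"})
sub.observe({"type": "tool"})
sub.observe({"type": "tool"})
print(root.tool_events, sub.tool_events)
```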

Execution order

When you use multiple middleware, it helps to understand how they execute:
agent = create_agent(
    model="gpt-5.4",
    middleware=[middleware1, middleware2, middleware3],
    tools=[...],
)
Before hooks run in order:
  1. middleware1.before_agent()
  2. middleware2.before_agent()
  3. middleware3.before_agent()
Agent loop starts:
  1. middleware1.before_model()
  2. middleware2.before_model()
  3. middleware3.before_model()
Wrap hooks nest like function calls:
  1. middleware1.wrap_model_call() → middleware2.wrap_model_call() → middleware3.wrap_model_call() → model
After hooks run in reverse order:
  1. middleware3.after_model()
  2. middleware2.after_model()
  3. middleware1.after_model()
Agent loop ends:
  1. middleware3.after_agent()
  2. middleware2.after_agent()
  3. middleware1.after_agent()
Key rules:
  • before_* hooks: first to last
  • after_* hooks: last to first (reverse)
  • wrap_* hooks: nested (the first middleware wraps all the others)
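The ordering rules can be checked with a small plain-Python simulation of one agent run with a single model call. It only records hook names in a list; run_once and make_wrap are hypothetical helpers, not LangChain APIs, and the sketch ignores tools and jumps.

```python
from typing import Callable

Handler = Callable[[str], str]


def make_wrap(name: str, inner: Handler, log: list[str]) -> Handler:
    """Build a wrap-style layer around `inner` that logs entry and exit."""
    def wrapped(request: str) -> str:
        log.append(f"{name}.wrap_model_call:enter")
        response = inner(request)
        log.append(f"{name}.wrap_model_call:exit")
        return response
    return wrapped


def run_once(names: list[str]) -> list[str]:
    log: list[str] = []
    log += [f"{n}.before_agent" for n in names]   # first to last
    log += [f"{n}.before_model" for n in names]   # first to last

    def model(request: str) -> str:
        log.append("model")
        return "response"

    handler: Handler = model
    for n in reversed(names):  # wrap innermost first so names[0] ends up outermost
        handler = make_wrap(n, handler, log)
    handler("request")

    log += [f"{n}.after_model" for n in reversed(names)]  # last to first
    log += [f"{n}.after_agent" for n in reversed(names)]  # last to first
    return log


for line in run_once(["middleware1", "middleware2"]):
    print(line)
```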

Agent jumps

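To exit early from middleware, return a dictionary that includes jump_to. Declare the targets a hook may jump to with can_jump_to, either on the hook decorator or with @hook_config. Available jump targets:
  • 'end': jump to the end of agent execution (or to the first after_agent hook)
  • 'tools': jump to the tools node
  • 'model': jump to the model node (or to the first before_model hook)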
from langchain.agents.middleware import after_model, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    last_message = state["messages"][-1]
    if "BLOCKED" in last_message.content:
        return {
            "messages": [AIMessage("I cannot respond to that request.")],
            "jump_to": "end"
        }
    return None
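How a jump_to value maps to the next step can be summarized in a few lines. The sketch below encodes the three targets listed above; resolve_jump and its flags are hypothetical, and rejecting a target that the hook did not declare in can_jump_to is this sketch's own choice, not documented LangChain behavior.

```python
def resolve_jump(
    target: str,
    can_jump_to: list[str],
    has_after_agent_hooks: bool = False,
    has_before_model_hooks: bool = False,
) -> str:
    """Return a label for where execution continues after a jump."""
    if target not in can_jump_to:
        raise ValueError(f"jump target {target!r} was not declared in can_jump_to")
    if target == "end":
        return "first after_agent hook" if has_after_agent_hooks else "end of agent execution"
    if target == "model":
        return "first before_model hook" if has_before_model_hooks else "model node"
    if target == "tools":
        return "tools node"
    raise ValueError(f"unknown jump target {target!r}")


# check_for_blocked declares can_jump_to=["end"] and returns {"jump_to": "end"}
print(resolve_jump("end", can_jump_to=["end"]))
```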

Best practices

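  1. Keep middleware focused: each middleware should do one thing well
  2. Handle errors gracefully: don't let middleware errors crash the agent
  3. Use appropriate hook types:
    • Node-style for sequential logic (logging, validation)
    • Wrap-style for control flow (retry, fallback, caching)
  4. Clearly document any custom state properties
  5. Unit test middleware independently before integrating it
  6. Consider execution order: place critical middleware first in the list
  7. Use built-in middleware when possible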

Examples

Dynamic prompt

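Modify the system prompt dynamically at runtime to inject context, user-specific instructions, or other information before each model call. This is one of the most common middleware use cases. Use the system_message field on ModelRequest to read and modify the system prompt. It holds a SystemMessage object, even if the agent was created with a string system_prompt.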
from collections.abc import Callable

from langchain.agents.middleware import ModelRequest, ModelResponse, wrap_model_call
from langchain.messages import SystemMessage


@wrap_model_call
def add_context(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    new_content = list(request.system_message.content_blocks) + [
        {"type": "text", "text": "Additional context."}
    ]
    new_system_message = SystemMessage(content=new_content)
    return handler(request.override(system_message=new_system_message))
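  • ModelRequest.system_message is always a SystemMessage object, even if the agent was created with system_prompt="string"
  • Use SystemMessage.content_blocks to access the content as a list of blocks, whether the original content is a string or a list
  • When modifying system messages, use content_blocks and append new blocks to preserve the existing structure
  • You can pass a SystemMessage object directly to the system_prompt parameter of create_agent for advanced use cases such as cache control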
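To see why appending to content_blocks preserves structure whether the original prompt was a string or a list, here is a plain-Python approximation for text-only content. to_text_blocks and append_text_block are hypothetical helpers; the real SystemMessage.content_blocks property also handles non-text and provider-specific blocks, which this sketch does not.

```python
from typing import Any, Union

Content = Union[str, list[Union[str, dict[str, Any]]]]


def to_text_blocks(content: Content) -> list[dict[str, Any]]:
    """Normalize string or list content into a list of block dicts."""
    if isinstance(content, str):
        return [{"type": "text", "text": content}]
    blocks: list[dict[str, Any]] = []
    for item in content:
        if isinstance(item, str):
            blocks.append({"type": "text", "text": item})
        else:
            blocks.append(dict(item))  # copy so the original is not mutated
    return blocks


def append_text_block(content: Content, text: str) -> list[dict[str, Any]]:
    """Keep the existing blocks and add one text block at the end."""
    return to_text_blocks(content) + [{"type": "text", "text": text}]


from_string = append_text_block("You are a helpful assistant.", "Additional context.")
from_list = append_text_block(
    [{"type": "text", "text": "Large document", "cache_control": {"type": "ephemeral"}}],
    "Additional context.",
)
print(from_string)
print(from_list)
```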

Dynamic model selection

from collections.abc import Callable

from langchain.agents.middleware import ModelRequest, ModelResponse, wrap_model_call
from langchain.chat_models import init_chat_model

complex_model = init_chat_model("claude-sonnet-4-6")
simple_model = init_chat_model("claude-haiku-4-5-20251001")


@wrap_model_call
def dynamic_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    if len(request.messages) > 10:
        model = complex_model
    else:
        model = simple_model
    return handler(request.override(model=model))

Dynamically selecting tools

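Select relevant tools at runtime to improve performance and accuracy. This section covers filtering pre-registered tools. For registering tools discovered at runtime (for example, from MCP servers), see Runtime tool registration. Benefits:
  • Shorter prompts: reduce complexity by exposing only relevant tools
  • Better accuracy: the model chooses correctly from fewer options
  • Permission control: dynamically filter tools based on user access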
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def select_tools(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    """Middleware to select relevant tools based on state/context."""
    # select_relevant_tools is your own helper (not provided by LangChain): it should
    # return a small, relevant subset of the registered tools based on state/context
    relevant_tools = select_relevant_tools(request.state, request.runtime)
    return handler(request.override(tools=relevant_tools))

agent = create_agent(
    model="gpt-5.4",
    tools=all_tools,  # All available tools need to be registered upfront
    middleware=[select_tools],
)
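The select_relevant_tools helper is left to you in the example above. One possible shape is sketched below in plain Python: it first filters by the user's role (permission control) and then keeps tools whose descriptions share words with the user's query. ToolSpec, ROLE_RANK, and the keyword scoring are all hypothetical; a real helper would read request.state and request.runtime and return the agent's registered tool objects.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToolSpec:
    """Hypothetical stand-in for a registered tool."""
    name: str
    description: str
    required_role: str = "user"


ROLE_RANK = {"user": 0, "admin": 1}


def select_relevant_tools(
    tools: list[ToolSpec], user_role: str, query: str, max_tools: int = 3
) -> list[ToolSpec]:
    # 1. Permission control: drop tools the user may not call
    allowed = [t for t in tools if ROLE_RANK[user_role] >= ROLE_RANK[t.required_role]]
    # 2. Relevance: count words shared between the query and each description
    words = set(query.lower().split())

    def overlap(tool: ToolSpec) -> int:
        return len(words & set(tool.description.lower().split()))

    # Keep tools with some overlap, best matches first (sorted is stable for ties)
    relevant = sorted((t for t in allowed if overlap(t) > 0), key=overlap, reverse=True)
    return relevant[:max_tools]


catalog = [
    ToolSpec("search", "search the web"),
    ToolSpec("weather", "get the weather forecast"),
    ToolSpec("delete_user", "delete a user account", required_role="admin"),
]
print([t.name for t in select_relevant_tools(catalog, "user", "what is the weather forecast")])
```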

Tool call monitoring

from collections.abc import Callable

from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
from langchain.tools.tool_node import ToolCallRequest
from langgraph.types import Command


@wrap_tool_call
def monitor_tool(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
    print(f"Executing tool: {request.tool_call['name']}")
    print(f"Arguments: {request.tool_call['args']}")
    try:
        result = handler(request)
        print("Tool completed successfully")
        return result
    except Exception as e:
        print(f"Tool failed: {e}")
        raise
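monitor_tool logs and then re-raises, so monitoring never changes the outcome of the call. The plain-Python sketch below extends the same pattern with timing, using a fake tool handler; monitored, CallRecord, fake_add, and the dict-shaped tool call are hypothetical stand-ins rather than ToolCallRequest or ToolMessage.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class CallRecord:
    tool: str
    status: str
    seconds: float


def monitored(
    tool_call: dict[str, Any],
    handler: Callable[[dict[str, Any]], Any],
    records: list[CallRecord],
) -> Any:
    """Time the handler, record the outcome, and re-raise any error unchanged."""
    start = time.perf_counter()
    try:
        result = handler(tool_call)
    except Exception:
        records.append(CallRecord(tool_call["name"], "error", time.perf_counter() - start))
        raise  # do not swallow the error; the caller still sees the failure
    records.append(CallRecord(tool_call["name"], "ok", time.perf_counter() - start))
    return result


def fake_add(call: dict[str, Any]) -> int:
    return call["args"]["a"] + call["args"]["b"]


records: list[CallRecord] = []
print(monitored({"name": "add", "args": {"a": 2, "b": 3}}, fake_add, records))
print(records)
```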

Prompt caching (Anthropic)

When using Anthropic models, cache large system prompts by using structured content blocks with cache control directives:
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import SystemMessage
from typing import Callable


@wrap_model_call
def add_cached_context(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Always work with content blocks
    new_content = list(request.system_message.content_blocks) + [
        {
            "type": "text",
            "text": "Here is a large document to analyze:\n\n<document>...</document>",
            # content up until this point is cached
            "cache_control": {"type": "ephemeral"}
        }
    ]

    new_system_message = SystemMessage(content=new_content)
    return handler(request.override(system_message=new_system_message))

Additional resources