本 guide 介绍如何将 deep agent 从 local prototype 推向 production deployment。它会逐步讲解如何 scope memory、configure execution environments、add guardrails,以及 connect frontend。

Overview

Agents 使用 memory 和 execution environment 中的信息完成 tasks。 在 production 中,有几个 primitives 会决定 information 如何共享和访问:
  • Thread:单次 conversation。默认情况下,message history 和 scratch files 限定在 thread 内,不会 carry over。
  • User:与你的 agent 交互的人。Memory 和 files 可以对单个 user 私有,也可以跨 users 共享。Identity 和 authorization 来自你的 auth layer
  • Assistant:已配置的 agent instance。Memory 和 files 可以绑定到一个 assistant,也可以跨所有 assistants 共享。
本页涵盖:

LangSmith Deployments

Managed Deep Agents packages your agent configuration, tools, and runtime settings for LangSmith 将 Deep Agent 推向 production 的推荐路径是 Managed Deep Agents,这是一个 API-first hosted runtime,用于在 LangSmith 中创建、运行和运营 deep agents。Managed Deep Agents 当前处于 private preview(join the waitlist)。对于需要 custom application code、custom routes、advanced authentication 或完整 Agent Server APIs 的 teams,你可以直接配置 LangSmith Deployment。任一路径都会 provision agent 所需 infrastructure:threadsruns、store 和 checkpointer,因此你不需要自行设置。传统 LangSmith Deployment 还会开箱提供 authenticationwebhookscron jobsobservability,并可通过 MCPA2A 暴露 agent。
LangSmith Cloud deployments 会自动将 traces 发送到以 deployment 命名的 project。打开 LangSmith 可 debug runs 并 monitor usage。Hybrid 或 self-hosted setups 请参阅 LangSmith tracing。建议同时设置 LangSmith Engine,它会监控 traces、检测 issues,并提出 fixes。
除非另有说明,本页所有 code snippets 都使用以下 langgraph.json
langgraph.json
{
  "dependencies": ["."],
  "graphs": {
    "agent": "./agent.py:agent"
  },
  "env": ".env"
}
langgraph.json 是告诉 LangGraph platform 如何 build 和 run 你的 application 的 configuration file。它位于 project root,并且 local development(使用 langgraph dev)和 production deployment 都需要它。关键 fields 如下:
FieldDescription
dependencies要 install 的 packages。["."] 会将当前 directory 作为 package 安装(读取 requirements.txtpyproject.tomlpackage.json)。
graphs将 graph IDs 映射到其 code locations。每个 entry 都是 "<id>": "./<file>:<variable>",其中 <id> 是你通过 API invoke graph 时使用的名称,<variable> 是从 <file> export 的 compiled graph 或 constructor function。
env包含 environment variables(API keys、secrets)的 .env file path。这些 variables 会在 build time 设置,并在 runtime 可用。
完整 configuration options(custom Docker steps、store indexing、auth handlers 等)请参阅 application structure

Production considerations

Invoking the agent

在 production 中,每次 invocation 都应携带两个 run-level parameters:
  • thread_id(通过 config={"configurable": {"thread_id": ...}} 传入):conversation 的 stable identifier。checkpointer 使用它持久化并 resume message history,因此 follow-up turns 会延续同一个 conversation。生成新的 thread_id 可以开始 fresh conversation。
  • context:你的 tools 和 middleware 在 invocation time 读取的 per-run data,例如 user_id、API keys、feature flags 或 session metadata。使用 context_schema 定义 shape,并通过 runtime.context 访问。请参阅 Runtime context
二者相互独立,并且几乎总是一起传入:
from dataclasses import dataclass

from deepagents import create_deep_agent
from langchain_core.utils.uuid import uuid7


@dataclass
class Context:
    user_id: str


agent = create_deep_agent(
    model="google_genai:gemini-3.5-flash",
    context_schema=Context,
)

# Start a conversation
config = {"configurable": {"thread_id": str(uuid7())}}
agent.invoke(
    {"messages": [{"role": "user", "content": "Plan a 3-day trip to Tokyo"}]},
    config=config,
    context=Context(user_id="user-123"),
)

# Follow-up on the same conversation: reuse the same thread_id
agent.invoke(
    {"messages": [{"role": "user", "content": "Make it 5 days instead"}]},
    config=config,
    context=Context(user_id="user-123"),
)
使用 LangGraph SDK 部署时,SDK 会为你管理 threads,你需要将返回的 thread_id 传给每次 run:
from langgraph_sdk import get_client

client = get_client(url="<DEPLOYMENT_URL>", api_key="<LANGSMITH_API_KEY>")

thread = await client.threads.create()
async for chunk in client.runs.stream(
    thread["thread_id"],
    "agent",
    input={"messages": [{"role": "user", "content": "Plan a 3-day trip to Tokyo"}]},
    context={"user_id": "user-123"},
    stream_mode="updates",
):
    print(chunk.data)
thread_id scoped conversation(message history、checkpoints)。context 携带你的 tools 和 middleware 读取的 per-run data。二者相互独立:更改其中一个不会影响另一个,你可以传入任意一个或同时传入二者。

Multi-tenancy

当你的 agent 服务多个 users 时,你需要处理三件事:验证每个 user 的身份、控制他们可访问的内容,以及管理 agent 代表他们执行操作时使用的 credentials。 Three authentication layers compose: end-user auth, agent-acting-as-user auth, and team RBAC

User identity and access control

LangSmith Deployments 支持 custom authentication 来建立 user identity,并支持 authorization handlers 来控制对 threads、assistants 和 store namespaces 等 resources 的 access。Authorization handlers 会在 authentication 成功后运行,并可以:
  • 使用 ownership metadata 标记 resources(例如 owner: user_id
  • 返回 filters,让 users 只能看到自己的 resources
  • 对 unauthorized operations 返回 HTTP 403 拒绝 access
Step-by-step tutorial 请参阅 Make conversations private。Walkthrough 请观看 custom auth video 你如何 scope memoryexecution environments 会决定 users 之间共享哪些 data。详情请参阅下方 sections。

Team access control (RBAC)

LangSmith 的 role-based access control 管理你 team 中谁可以 deploy、configure 和 monitor agents。这与上面的 end-user authorization 是分开的。
RoleAccess
Workspace Admin完整 permissions,包括 settings 和 member management
Workspace Editor创建和修改 resources,但不能 delete runs 或 manage members
Workspace ViewerRead-only access
Enterprise plans 提供带 granular permissions 的 custom roles。完整 permission model 请参阅 RBAC reference

End-user credentials

当你的 agent 需要代表 user 调用 external APIs(例如读取他们的 GitHub repos、发送 Slack messages、查询他们的 data warehouse)时,你需要一种方式将 user credentials 传给 agent,而不是硬编码它们。 OAuth via Agent Auth. Agent Auth 提供 managed OAuth 2.0 flow。配置 OAuth provider 后,agent 可以请求 scoped to each user 的 tokens。首次使用时,agent 会 interrupts execution 并展示 OAuth consent URL。User 完成 authentication 后,agent 会带着 valid token resume。Tokens 会自动 stored 和 refreshed。
from langchain_auth import Client
from langchain.tools import tool, ToolRuntime

auth_client = Client()

# Inside your agent's tool:
@tool
async def github_action(runtime: ToolRuntime):
    """Perform an action on behalf of the user via GitHub."""
    auth_result = await auth_client.authenticate(
        provider="github",
        scopes=["repo", "read:org"],
        user_id=runtime.server_info.user.identity,
    )
    # Use auth_result.token for GitHub API calls on the user's behalf
Credential injection for sandboxes. 如果你的 agent 在 sandbox 中运行调用 external APIs 的 code,sandbox auth proxy 可以自动将 credentials 注入 outbound requests,因此 sandbox code 永远不会接收 raw API keys。Setup details 请参阅 Managing secrets Workspace secrets. 对于所有 users 共享的 API keys(例如 organization 的 LLM provider keys、search API keys),请将其存储为 LangSmith 中的 workspace secrets。详情请参阅 Managing secrets

Async

LLM-based applications 高度 I/O-bound:调用 language models、databases 和 external services。Async programming 让这些 operations 并发运行而不是阻塞,从而提升 throughput 和 responsiveness。
LangChain 遵循在 async method names 前加 a 的约定(例如 ainvokeabefore_agentastream)。Sync 和 async variants 位于同一个 class 或 namespace 中。
面向 production 构建时:
  • 创建 async tools。 LangChain 会在单独 thread 中运行 sync tools 以避免 blocking,但 native async 可以完全避免 threading overhead。
  • 使用 async middleware methods。 Custom middleware 应实现 async hooks(例如 abefore_agent 而不是 before_agent)。
  • 对 external resource lifecycle 使用 async。 创建 sandboxes 或连接 MCP servers 涉及 network calls,应该被 awaited。这就是 provision 这些 resources 的 graph factories 为 async 的原因。

Durability

Deep Agents 运行在 LangGraph 上,LangGraph 开箱提供 durable execution。persistence layer 会在每一步 checkpoint state,因此因 failure、timeout 或 human-in-the-loop pause 中断的 run,可以从其最后记录的 state resume,而不需要重新处理 previous steps。对于会 spawn 许多 subagents 的 long-running deep agents,这意味着 mid-run failure 不会丢失 completed work。 Durable execution: when a worker crashes mid-run, another worker picks the run up from the latest checkpoint Checkpointing 还支持:
  • Indefinite interrupts Human-in-the-loop workflows 可以暂停数分钟或数天,并从离开的位置精确 resume。
  • Time travel 每个 checkpointed step 都是一个可以 rewind 的 snapshot,让你在出错时从 earlier state replay。
  • 安全处理 sensitive operations。 对于涉及 payments 或其他 irreversible actions 的 workflows,checkpoints 提供 audit trail 和 recovery point,可检查导致 action 的 exact state。
LangSmith Deployments 会自动配置 persistent checkpointer。如果你 self-hosting,请参阅 persistence 获取 setup instructions。

Memory

没有 memory 时,每次 conversation 都从零开始。Memory 让你的 agent 在 conversations 之间保留 information(user preferences、learned instructions、past experiences),从而随着时间个性化其 behavior。Memory types overview 请参阅 memory concepts guide Short-term memory is scoped to a single thread via checkpoints; long-term memory persists across threads via the store

Scoping

Memory 始终跨 conversations persistent。主要问题是它如何跨 user 和 assistant boundaries scoped。正确 scope 取决于谁应该查看和修改 data:
ScopeNamespaceUse caseExample
User(推荐默认值)(user_id)Per-user preferences 和 context”I prefer concise responses”
Assistant(assistant_id)一个 assistant 的 shared instructions”Cap posts at 280 characters”
Global(org_id)面向所有 users 和 assistants 的 read-only policies”Never disclose internal pricing”
Shared memory(assistant、user 或 organization scope)是 prompt injection 的 vector。如果一个 user 可以写入另一个 user conversation 会读取的 memory,malicious user 可能会将 instructions 注入 shared state。请在合适位置强制 read-only access。例如,让 organization-wide policies 只能通过 application code 写入,而不是由 agent 自己写入。使用 permissions declaratively deny 对 shared paths 的 writes,或使用 backend policy hooks 实现 custom validation logic。

Configuration

在 Deep Agents 中,memory 以 files 形式存储在 virtual filesystem 中。默认情况下,files scoped 到单个 thread(conversation),不会跨 threads 共享。 若要跨 threads 共享 memory,请将 /memories/ 之类的 path route 到写入 LangGraph StoreStoreBackend。使用 CompositeBackend 可以同时为 agent 提供 thread-scoped scratch space 和 cross-thread long-term memory
下方展示的 rt.server_infort.execution_info namespace patterns 需要 deepagents>=0.5.0
你也可以使用 Store API 从 application code read 和 write store。Examples 请参阅 Advanced usage 完整 namespace factory API 请参阅 namespace factories。Self-improving instructions 和 knowledge bases 等 memory patterns 请参阅 long-term memory

Execution environment

在本地,agents 可以直接 read 和 write disk 上的 files,并运行 shell commands。在 production 中,你需要考虑 isolation 和 persistence。正确设置取决于你的 agent 是否需要 execute code:
  • Filesystem backends 适用于 agent 只需要 read 和 write files 的情况。请选择匹配 persistence needs 的 backend:thread-scoped scratch space、cross-thread storage,或二者混合。
  • Sandboxes 会添加带 execute tool 的 isolated container,用于运行 shell commands。如果 agent 需要 run code、install packages,或执行 file I/O 之外的操作,请使用 sandbox。

Filesystem

根据需要持久化的内容选择 backend:
  • StateBackend(默认):thread-scoped scratch space。Files 通过你的 checkpointer 在 thread 的 turns 之间 persist,但不跨 threads 共享。每一步都会 checkpoint,因此避免写入 large files。
  • StoreBackend:可跨 conversations 存活的 cross-thread storage。使用 namespace factory 设置 scope。
  • CompositeBackend:混合二者。默认使用 thread-scoped scratch space,并为 /memories/ 等 specific paths 设置 cross-thread routes。
  • ContextHubBackend:LangSmith Hub repo(owner/namename)中的 durable files。当你希望获得 LangSmith-native persistence,而不想 provision 单独 LangGraph store 时,请使用它。
完整 backends list 和 custom backend 构建方法请参阅 backends
FilesystemBackendLocalShellBackend 会直接访问 host。不要在 deployed agents 中使用它们。

Sandboxes

如果你的 agent 需要 run code(不仅是 read 和 write files),请使用 sandbox。Sandboxes 同时提供 filesystem 和用于运行 shell commands 的 execute tool,所有操作都在 isolated container 内完成。这种 isolation 也会保护你的 host:如果 agent 的 code 耗尽 memory 或 crash,只有 sandbox 受影响。你的 server 会继续运行。

Lifecycle

关键决策是 sandbox 存活多久。每个 conversation 是否获得 fresh sandbox,还是多个 conversations 共享 persistent environment?
ScopeSandbox ID stored onLifecycleExample use case
Thread-scopedThread metadata每个 conversation fresh,并在 TTL 后 cleanup每次 conversation 都从 clean state 开始的 data analysis bot
Assistant-scopedAssistant config所有 conversations 共享在 conversations 之间维护 cloned repo 的 coding assistant
下方 examples 使用 async graph factory,而不是 static graph,因为 sandbox 需要 thread_idassistant_id 来查找或创建正确 sandbox。Graph factories 不接收完整 Runtime(没有 server_infoexecution_info);相反,它接受 RunnableConfig,并从 config["configurable"] 读取 thread_idassistant_id。Factory 是 async 的,因为 sandbox creation 是 I/O-bound operation,需要 invocation time 才可用的 per-run information。
每个 conversation 都获得自己的 sandbox。graph factory 从 run config 读取 thread_id,因此每个 thread 都自动获得自己的 isolated environment。Provider 的 label-based lookup 会处理跨 runs 的 deduplication。当 sandbox TTL 过期时会 cleanup。
agent.py
from daytona import CreateSandboxFromSnapshotParams, Daytona
from deepagents import create_deep_agent
from langchain_core.runnables import RunnableConfig
from langchain_daytona import DaytonaSandbox

client = Daytona()


async def agent(config: RunnableConfig):
    thread_id = config["configurable"]["thread_id"]
    try:
        sandbox = await client.find_one(labels={"thread_id": thread_id})
    except Exception:
        sandbox = await client.create(
            CreateSandboxFromSnapshotParams(
                labels={"thread_id": thread_id},
                auto_delete_interval=3600,  # TTL: clean up when idle
            )
        )
    return create_deep_agent(
        model="google_genai:gemini-3.5-flash",
        backend=DaytonaSandbox(sandbox=sandbox)
    )
因为 agent variable 是 async function(不是 compiled graph),server 会将其视为 graph factory,并在每次 run 时调用它、注入 config。Factory 会通过 provider 的 label-based search 查找或创建 sandbox,并返回连接到该 sandbox 的 fresh agent graph。 使用 langgraph deploy 部署后,从 application code 使用 SDK invoke agent。无论 scope 如何,client-side code 都相同。Scoping 完全由上方 agent factory 处理,但 behavior 不同:
每个 thread 都有自己的 sandbox。同一个 thread 内的 follow-up messages 会复用同一个 sandbox,但 new thread 总是从 fresh sandbox 开始,没有 previous conversations 的 leftover files 或 installed packages。
client.py
from langgraph_sdk import get_client

client = get_client(url="<DEPLOYMENT_URL>", api_key="<LANGSMITH_API_KEY>")

# Conversation 1: install pandas and analyze data
thread_1 = await client.threads.create()
async for chunk in client.runs.stream(
    thread_1["thread_id"],
    "agent",
    input={"messages": [{"role": "human", "content": "Install pandas and analyze sales_data.csv"}]},
    stream_mode="updates",
):
    print(chunk.data)

# Follow-up in the same conversation — pandas is still installed
async for chunk in client.runs.stream(
    thread_1["thread_id"],
    "agent",
    input={"messages": [{"role": "human", "content": "Now plot the results"}]},
    stream_mode="updates",
):
    print(chunk.data)

# Conversation 2: fresh sandbox — pandas is NOT installed, no files from conversation 1
thread_2 = await client.threads.create()
async for chunk in client.runs.stream(
    thread_2["thread_id"],
    "agent",
    input={"messages": [{"role": "human", "content": "What packages are installed?"}]},
    stream_mode="updates",
):
    print(chunk.data)

File transfers

Sandboxes 是 isolated containers,因此你的 application code 不能直接访问其中的 files。使用 upload_files()download_files() 在 sandbox boundary 两侧移动 data:
  • 在 agent 运行前 seed sandbox:上传 user files、skill scripts、configuration 或 persistent memories,让 agent 从一开始就拥有所需内容
  • 在 agent 完成后 retrieve results:下载 generated artifacts(reports、plots、exports),并将 updated memories 同步回来供 future conversations 使用
Provider-specific file transfer examples 请参阅 working with files。Provider setup、security 和 lifecycle patterns 请参阅完整 sandboxes guide
Agent 需要 execute 的 Skill scripts 必须在 agent 运行前上传到 sandbox。你也可能希望同步 memories,让 agent 可以在 container 内 read 和 update 它们。使用带 before_agentafter_agent hooks 的 custom middleware 跨 sandbox boundary 移动 files:
agent.py
from deepagents import create_deep_agent
from langchain.agents.middleware import AgentMiddleware, AgentState
from langgraph.runtime import Runtime


def _safe_filename(key: str) -> str:
    """Reject keys that contain path traversal or glob characters."""
    name = key.split("/")[-1]
    if ".." in name or any(c in name for c in ("*", "?")):
        raise ValueError(f"Invalid key: {key}")
    return name


class SandboxSyncMiddleware(AgentMiddleware):
    """Sync skills and memories between the store and the sandbox."""

    def __init__(self, backend: CompositeBackend):
        super().__init__()
        self.backend = backend

    async def abefore_agent(self, state: AgentState, runtime: Runtime) -> None:
        """Upload skill scripts and memories into the sandbox."""
        user_id = runtime.server_info.user.identity  
        store = runtime.store
        files = []
        for item in await store.asearch(("skills", user_id)):
            name = _safe_filename(item.key)
            files.append((f"/skills/{name}", item.value["content"].encode()))
        for item in await store.asearch(("memories", user_id)):
            name = _safe_filename(item.key)
            files.append((f"/memories/{name}", item.value["content"].encode()))
        if files:
            await self.backend.upload_files(files)

    async def aafter_agent(self, state: AgentState, runtime: Runtime) -> None:
        """Sync updated memories back to the store."""
        user_id = runtime.server_info.user.identity  
        store = runtime.store
        items = await store.asearch(("memories", user_id))
        results = await self.backend.download_files(
            [f"/memories/{item.key}" for item in items]
        )
        for result in results:
            if result.content is not None:
                await store.aput(
                    ("memories", user_id),
                    result.path.split("/")[-1],
                    {"content": result.content.decode()},
                )


backend = CompositeBackend(
    default=DaytonaSandbox(sandbox=sandbox),
    routes={
        "/skills/": StoreBackend(
            rt,
            namespace=lambda rt: ("skills", rt.server_info.user.identity),
        ),
        "/memories/": StoreBackend(
            rt,
            namespace=lambda rt: ("memories", rt.server_info.user.identity),
        ),
    },
)

agent = create_deep_agent(
    model="google_genai:gemini-3.5-flash",
    backend=backend,
    middleware=[SandboxSyncMiddleware(backend)],
)

Managing secrets

Sandboxes 是 isolated containers,因此来自 host 的 environment variables 在其中不可用。有两种方式可以向 sandbox code 提供 API keys 和其他 secrets: Auth proxy(推荐)。 sandbox auth proxy 会拦截来自 sandbox 的 outbound requests,并自动注入 authentication headers。Sandbox code 正常调用 external APIs,proxy 会根据 destination host 添加正确 credentials。这意味着 API keys 永远不会出现在 sandbox code、environment variables 或 logs 中。 The sandbox auth proxy injects credentials into outbound requests so secrets never enter the sandbox
{
  "proxy_config": {
    "rules": [
      {
        "name": "openai-api",
        "match_hosts": ["api.openai.com"],
        "inject_headers": {
          "Authorization": "Bearer ${OPENAI_API_KEY}"
        }
      },
      {
        "name": "anthropic-api",
        "match_hosts": ["api.anthropic.com"],
        "inject_headers": {
          "x-api-key": "${ANTHROPIC_API_KEY}"
        }
      }
    ]
  }
}
${SECRET_KEY} references 会解析到 LangSmith workspace settings 中存储的 secrets。创建引用这些 secrets 的 template 前,请先在那里配置 secrets。 Workspace secrets. 对于不需要 proxy-based injection 的 API keys(例如 agent server 本身使用而非 sandbox code 使用的 keys),请将其作为 workspace secrets 存储在 LangSmith 中。这些会在 workspace 中所有 agents 的 runtime 作为 environment variables 可用。
避免通过 environment variables 或 file uploads 将 secrets 传入 sandboxes。Agents 可以读取 sandbox 内任何 accessible file 或 environment variable,包括 credentials。Auth proxy 会让 secrets 完全不进入 sandbox。

Guardrails

Production 中的 agents 会 autonomously 运行,这意味着它们可能 indefinite loop、hit rate limits,或处理包含 sensitive information 的 user data。Deep Agents 提供两层 protection:
  • Permissions:Declarative allow/deny rules,控制 agent 可以 read 或 write 哪些 files 和 directories。使用 permissions 将 agent isolate 到 working directory、保护 sensitive files,或 enforce read-only memory。
  • Middleware:围绕 model 和 tool calls 的 hooks,用于 rate limiting、error handling 和 data privacy。
Middleware hooks—before_model, wrap_model_call, wrap_tool_call, after_model—wrap the agent loop so policies run deterministically around every relevant step

Rate limiting

这里的 rate limiting 指限制 agent 在一次 run 内自己的 LLM 和 tool usage,而不是 incoming requests 的 API gateway rate limiting。 如果没有 limits,一个 confused agent 可能会在几分钟内通过反复调用同一个 tool 或发起数百次 model calls 耗尽你的 LLM API budget。请对每次 run 内的 model calls 和 tool executions 都设置 caps:
from deepagents import create_deep_agent
from langchain.agents.middleware import ModelCallLimitMiddleware, ToolCallLimitMiddleware

agent = create_deep_agent(
    model="google_genai:gemini-3.5-flash",
    middleware=[
        ModelCallLimitMiddleware(run_limit=50),
        ToolCallLimitMiddleware(run_limit=200),
    ],
)
使用 run_limit 限制单次 invocation 内的 calls(每 turn reset)。使用 thread_limit 限制整个 conversation 内的 calls(需要 checkpointer)。完整 configuration 请参阅 ModelCallLimitMiddlewareToolCallLimitMiddleware

Handling errors

并非所有 errors 都应以同一种方式处理。Transient failures(network timeouts、rate limits)应自动 retry。LLM 可恢复的 errors(bad tool output、parsing failures)应反馈给 model。需要 human input 的 errors 应 pause agent。带 code examples 的完整拆解请参阅 Handle errors appropriately Middleware 处理 transient case。Model calls 和 tool calls 各自拥有带 exponential backoff 的 retry middleware。如果 primary model provider 完全 down,fallback middleware 会切换到 alternative:
from deepagents import create_deep_agent
from langchain.agents.middleware import (
    ModelFallbackMiddleware,
    ModelRetryMiddleware,
    ToolRetryMiddleware,
)

agent = create_deep_agent(
    model="google_genai:gemini-3.5-flash",
    middleware=[
        # Retry model calls on rate limits, timeouts, and 5xx errors
        ModelRetryMiddleware(max_retries=3, backoff_factor=2.0, initial_delay=1.0),
        # If the primary model is fully down, fall back to an alternative
        ModelFallbackMiddleware("gpt-5.4"),
        # Retry specific tools that hit external APIs (not all tools)
        ToolRetryMiddleware(
            max_retries=2,
            tools=["search", "fetch_url"],
            retry_on=(TimeoutError, ConnectionError),
        ),
    ],
)
ToolRetryMiddleware scoped 到 specific tools,而不是 retry everything。失败的 filesystem read_file 不会因 retry 获益,但 timed out 的 web search 很可能会。完整 configuration 请参阅 ModelRetryMiddlewareModelFallbackMiddleware

Data privacy

如果你的 agent 处理可能包含 emails、credit card numbers 或其他 PII 的 user input,你可以在其到达 model 或存储到 logs 前检测并处理它:
from deepagents import create_deep_agent
from langchain.agents.middleware import PIIMiddleware

agent = create_deep_agent(
    model="google_genai:gemini-3.5-flash",
    middleware=[
        PIIMiddleware("email", strategy="redact", apply_to_input=True),
        PIIMiddleware("credit_card", strategy="mask", apply_to_input=True),
    ],
)
Strategies 包括 redact(替换为 [REDACTED_EMAIL])、mask(像 ****-****-****-1234 这样的 partial masking)、hash(deterministic hash)和 block(raise an error)。你也可以为 domain-specific patterns 编写 custom detectors。 完整 configuration 请参阅 PIIMiddleware 可用 middleware 的完整列表请参阅 prebuilt middleware

Frontend

Deep Agents 使用 useStream 将你的 UI 连接到 agent backend。useStream 是 frontend hook(适用于 React、Vue、Svelte 和 Angular),可以从你的 agent 实时 stream messages、subagent progress 和 custom state。 本地开发时,useStream 指向 http://localhost:2024。在 production 中,将它指向你的 LangSmith Deployment,并配置 reconnection,避免 users 在 connection drops 时丢失进度。
import { useStream } from "@langchain/react";

function App() {
  const stream = useStream<typeof agent>({
    apiUrl: "https://your-deployment.langsmith.dev",
    assistantId: "agent",
    reconnectOnMount: true,    // Resume stream after page refresh or navigation
    fetchStateHistory: true,   // Load full thread history on mount
  });
}
reconnectOnMount 会自动接上 in-progress run。如果 user 在 agent 工作时 refresh,他们会看到它继续,而不是 blank screen。fetchStateHistory 会加载 thread 的 full conversation history,因此 returning users 可以看到 previous messages。 对于会 spawn 许多 subagents 的 deep agent workflows,submit 时请设置较高的 recursionLimit,避免 long-running executions 被截断:
stream.submit(
  { messages: [{ type: "human", content: text }] },
  {
    streamSubgraphs: true,
    config: { recursionLimit: 10000 },
  },
);
Deep agents 专属 UI patterns,例如 subagent cards、todo lists 和 custom state rendering,请参阅 frontend guide