"""
客服状态机示例
此示例演示状态机模式。
单个 agent 会根据 current_step 状态动态更改其行为,
从而创建一个用于顺序收集信息的状态机。
"""
from langchain_core.utils.uuid import uuid7
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from typing import Callable, Literal
from typing_extensions import NotRequired
from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse, SummarizationMiddleware
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage, ToolMessage
from langchain.tools import tool, ToolRuntime
model = init_chat_model("google_genai:gemini-3.5-flash")
# 定义可能的工作流步骤
SupportStep = Literal["warranty_collector", "issue_classifier", "resolution_specialist"]
class SupportState(AgentState):
"""客服工作流的状态。"""
current_step: NotRequired[SupportStep]
warranty_status: NotRequired[Literal["in_warranty", "out_of_warranty"]]
issue_type: NotRequired[Literal["hardware", "software"]]
@tool
def record_warranty_status(
status: Literal["in_warranty", "out_of_warranty"],
runtime: ToolRuntime[None, SupportState],
) -> Command:
"""记录客户的保修状态,并转换到问题分类步骤。"""
return Command(
update={
"messages": [
ToolMessage(
content=f"保修状态已记录为:{status}",
tool_call_id=runtime.tool_call_id,
)
],
"warranty_status": status,
"current_step": "issue_classifier",
}
)
@tool
def record_issue_type(
issue_type: Literal["hardware", "software"],
runtime: ToolRuntime[None, SupportState],
) -> Command:
"""记录问题类型,并转换到解决方案专家步骤。"""
return Command(
update={
"messages": [
ToolMessage(
content=f"问题类型已记录为:{issue_type}",
tool_call_id=runtime.tool_call_id,
)
],
"issue_type": issue_type,
"current_step": "resolution_specialist",
}
)
@tool
def escalate_to_human(reason: str) -> str:
"""将工单升级给人工客服专家。"""
# 在真实系统中,这会创建工单、通知员工等。
return f"正在升级给人工客服。原因:{reason}"
@tool
def provide_solution(solution: str) -> str:
"""为客户问题提供解决方案。"""
return f"已提供解决方案:{solution}"
# 将 prompt 定义为常量
WARRANTY_COLLECTOR_PROMPT = """你是帮助客户处理设备问题的客服 agent。
当前步骤:保修验证
在此步骤中,你需要:
1. 热情地问候客户
2. 询问客户的设备是否在保修期内
3. 使用 record_warranty_status 记录客户回答并进入下一步
保持自然、友好的对话风格。不要一次询问多个问题。"""
ISSUE_CLASSIFIER_PROMPT = """你是帮助客户处理设备问题的客服 agent。
当前步骤:问题分类
客户信息:保修状态为 {warranty_status}
在此步骤中,你需要:
1. 要求客户描述问题
2. 判断这是硬件问题(物理损坏、部件损坏)还是软件问题(app 崩溃、性能问题)
3. 使用 record_issue_type 记录分类并进入下一步
如果不清楚,请先提澄清问题,再进行分类。"""
RESOLUTION_SPECIALIST_PROMPT = """你是帮助客户处理设备问题的客服 agent。
当前步骤:解决方案
客户信息:保修状态为 {warranty_status},问题类型为 {issue_type}
在此步骤中,你需要:
1. 对于软件问题:使用 provide_solution 提供故障排查步骤
2. 对于硬件问题:
- 如果在保修期内:使用 provide_solution 说明保修维修流程
- 如果超出保修期:使用 escalate_to_human 处理付费维修选项
解决方案要具体且有帮助。"""
# 步骤配置:将步骤名称映射到(prompt、tools、required_state)
STEP_CONFIG = {
"warranty_collector": {
"prompt": WARRANTY_COLLECTOR_PROMPT,
"tools": [record_warranty_status],
"requires": [],
},
"issue_classifier": {
"prompt": ISSUE_CLASSIFIER_PROMPT,
"tools": [record_issue_type],
"requires": ["warranty_status"],
},
"resolution_specialist": {
"prompt": RESOLUTION_SPECIALIST_PROMPT,
"tools": [provide_solution, escalate_to_human],
"requires": ["warranty_status", "issue_type"],
},
}
@wrap_model_call
def apply_step_config(
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
"""根据当前步骤配置 agent 行为。"""
# 获取当前步骤(首次交互默认使用 warranty_collector)
current_step = request.state.get("current_step", "warranty_collector")
# 查找步骤配置
step_config = STEP_CONFIG[current_step]
# 验证必需状态是否存在
for key in step_config["requires"]:
if request.state.get(key) is None:
raise ValueError(f"到达 {current_step} 前必须先设置 {key}")
# 使用状态值格式化 prompt
system_prompt = step_config["prompt"].format(**request.state)
# 注入 system prompt 和步骤专属工具
request = request.override(
system_prompt=system_prompt,
tools=step_config["tools"],
)
return handler(request)
# 收集所有步骤配置中的全部工具
all_tools = [
record_warranty_status,
record_issue_type,
provide_solution,
escalate_to_human,
]
# 使用基于步骤的配置和摘要功能创建 agent
agent = create_agent(
model,
tools=all_tools,
state_schema=SupportState,
middleware=[
apply_step_config,
SummarizationMiddleware(
model="gpt-5.4-mini",
trigger=("tokens", 4000),
keep=("messages", 10)
)
],
checkpointer=InMemorySaver(),
)
# ============================================================================
# 测试工作流
# ============================================================================
if __name__ == "__main__":
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}
result = agent.invoke(
{"messages": [HumanMessage("你好,我的手机屏幕碎了")]},
config
)
result = agent.invoke(
{"messages": [HumanMessage("是的,它还在保修期内")]},
config
)
result = agent.invoke(
{"messages": [HumanMessage("屏幕摔落后出现了物理裂纹")]},
config
)
result = agent.invoke(
{"messages": [HumanMessage("我应该怎么办?")]},
config
)
for msg in result['messages']:
msg.pretty_print()