单元测试用于在隔离环境中验证 Agent 的小型、确定性逻辑单元。通过将真实 LLM 替换为内存中的伪实现(又称 fixture),你可以预先设定精确响应(文本、工具调用和错误),从而使测试快速、免费、可重复,并且无需 API 密钥。

使用 fakeModel 模拟聊天模型

fakeModel 是一个构建器风格的伪聊天模型,允许你预先设定精确响应(文本、工具调用、错误),并断言模型实际接收到的内容。它继承自 BaseChatModel,因此可以在任何需要真实模型的地方使用。
import { fakeModel } from "langchain";

快速开始

创建模型,使用 .respond() 排队响应,然后调用模型。每次 invoke() 都会按顺序消费下一个排队响应:
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";

const model = fakeModel()
  .respond(new AIMessage("I can help with that."))
  .respond(new AIMessage("Here's what I found."))
  .respond(new AIMessage("You're welcome!"));

const r1 = await model.invoke([new HumanMessage("Can you help?")]);
// r1.content === "I can help with that."

const r2 = await model.invoke([new HumanMessage("What did you find?")]);
// r2.content === "Here's what I found."

const r3 = await model.invoke([new HumanMessage("Thanks!")]);
// r3.content === "You're welcome!"
如果调用次数超过了已排队的响应数量,它会抛出带有明确说明的错误:
const model = fakeModel()
  .respond(new AIMessage("only one"));

await model.invoke([new HumanMessage("first")]);  // 正常
await model.invoke([new HumanMessage("second")]); // 抛出:"no response queued for invocation 1"

工具调用响应

.respond() 支持工具调用,只需传入带有 tool_callsAIMessage
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";

const model = fakeModel()
  .respond(new AIMessage({
    content: "",
    tool_calls: [
      { name: "get_weather", args: { city: "San Francisco" }, id: "call_1", type: "tool_call" },
    ],
  }))
  .respond(new AIMessage("It's 72°F and sunny in San Francisco."));

const r1 = await model.invoke([new HumanMessage("What's the weather in SF?")]);
console.log(r1.tool_calls[0].name); // "get_weather"

const r2 = await model.invoke([new HumanMessage("Thanks")]);
console.log(r2.content); // "It's 72°F and sunny in San Francisco."
.respondWithTools() 是同样功能的简写形式。无需构造完整的 AIMessage,只需提供工具名称和参数:
// 以下两种排队方式会产生完全相同的响应:

model.respond(new AIMessage({
  content: "",
  tool_calls: [
    { name: "get_weather", args: { city: "SF" }, id: "call_1", type: "tool_call" },
  ],
}));

// 等价简写:
model.respondWithTools([  
  { name: "get_weather", args: { city: "SF" }, id: "call_1" },
]);
id 字段是可选的。如果省略,将自动生成唯一 ID。
.respond().respondWithTools() 可以任意混合使用。这对于测试 Agent 循环特别有用,因为模型通常会在工具调用和文本响应之间交替。

模拟错误

在特定轮次抛出错误

.respond() 传入 Error 对象,会使模型在对应调用时抛出异常。错误可以出现在响应序列中的任何位置:
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";

const model = fakeModel()
  .respond(new Error("rate limit exceeded"))  // 第 1 轮:抛出异常
  .respond(new AIMessage("Recovered!"));      // 第 2 轮:成功返回

try {
  await model.invoke([new HumanMessage("first")]);
} catch (e) {
  console.log(e.message); // "rate limit exceeded"
}

const result = await model.invoke([new HumanMessage("retry")]);
console.log(result.content); // "Recovered!"

每次调用都抛出错误

.alwaysThrow() 会让每次调用都抛出异常,而不受响应队列影响。这对于测试错误处理和重试逻辑非常有用:
import { fakeModel } from "langchain";
import { HumanMessage } from "@langchain/core/messages";

const model = fakeModel().alwaysThrow(new Error("service unavailable"));

await model.invoke([new HumanMessage("a")]); // 抛出 "service unavailable"
await model.invoke([new HumanMessage("b")]); // 抛出 "service unavailable"

使用工厂函数生成动态响应

.respond() 还支持传入函数,根据输入消息动态生成响应。该函数接收完整消息数组,并返回 BaseMessageError
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";

const model = fakeModel()
  .respond((messages) => {
    const last = messages[messages.length - 1].text;
    return new AIMessage(`You said: ${last}`);
  });

const result = await model.invoke([new HumanMessage("hello")]);
console.log(result.content); // "You said: hello"
工厂函数也可以返回错误:
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";

const model = fakeModel()
  .respond((messages) => {
    const content = messages[messages.length - 1].text;
    if (content.includes("forbidden")) {
      return new Error("Content policy violation");
    }
    return new AIMessage("OK");
  });

await model.invoke([new HumanMessage("forbidden topic")]); // 抛出 "Content policy violation"
每个函数都作为一个队列项,仅消费一次。如果希望同样的动态逻辑在多个轮次中复用,需要多次调用 respond() 将其加入队列。

结构化输出

对于使用 .withStructuredOutput() 的代码,可以通过 .structuredResponse() 配置伪返回值:
import { fakeModel } from "langchain";
import { HumanMessage } from "@langchain/core/messages";
import { z } from "zod";

const model = fakeModel()
  .structuredResponse({ temperature: 72, unit: "fahrenheit" });

const structured = model.withStructuredOutput(
  z.object({
    temperature: z.number(),
    unit: z.string(),
  })
);

const result = await structured.invoke([new HumanMessage("Weather?")]);
console.log(result);
// { temperature: 72, unit: "fahrenheit" }
传递给 .withStructuredOutput() 的 schema 会被忽略。模型始终返回通过 .structuredResponse() 配置的值。这样可以让测试专注于应用逻辑,而不是解析逻辑。

断言模型接收到的内容

fakeModel 会记录每次调用,包括传入的消息和选项。这类似于传统测试框架中的 spy 或 mock:
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";

const model = fakeModel()
  .respond(new AIMessage("first"))
  .respond(new AIMessage("second"));

await model.invoke([new HumanMessage("question 1")]);
await model.invoke([new HumanMessage("question 2")]);

console.log(model.callCount); // 2

console.log(model.calls[0].messages[0].content); // "question 1"
console.log(model.calls[1].messages[0].content); // "question 2"
即使模型抛出异常,调用记录也会保留:
import { fakeModel } from "langchain";
import { HumanMessage } from "@langchain/core/messages";

const model = fakeModel().respond(new Error("boom"));

try {
  await model.invoke([new HumanMessage("will fail")]);
} catch {
  // 已处理异常
}

console.log(model.callCount); // 1
console.log(model.calls[0].messages[0].content); // "will fail"

bindTools 一起使用

LangChain Agent 和 LangGraph 等框架会在内部调用 model.bindTools(tools)fakeModel 会自动处理这一点。绑定后的模型与原始模型共享同一个响应队列和调用记录,因此无需额外配置:
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage } from "@langchain/core/messages";
import { tool } from "@langchain/core/tools";
import { z } from "zod";

const searchTool = tool(async ({ query }) => `Results for: ${query}`, {
  name: "search",
  description: "Search the web",
  schema: z.object({ query: z.string() }),
});

const model = fakeModel()
  .respondWithTools([{ name: "search", args: { query: "weather" }, id: "1" }])
  .respond(new AIMessage("The weather is sunny."));

const bound = model.bindTools([searchTool]);

const r1 = await bound.invoke([new HumanMessage("weather?")]);
console.log(r1.tool_calls[0].name); // "search"

const r2 = await bound.invoke([new HumanMessage("thanks")]);
console.log(r2.content); // "The weather is sunny."

// 调用记录是共享的,通过原始模型查看即可。
console.log(model.callCount); // 2
import { describe, test, expect } from "vitest";
import { fakeModel } from "langchain";
import { AIMessage, HumanMessage, ToolMessage } from "@langchain/core/messages";
import { tool } from "@langchain/core/tools";
import { z } from "zod";

const getWeather = tool(
  async ({ city }) => `72°F and sunny in ${city}`,
  {
    name: "get_weather",
    description: "获取城市天气",
    schema: z.object({ city: z.string() }),
  }
);

async function runAgent(
  model: ReturnType<typeof fakeModel>,
  input: string
) {
  const messages: any[] = [new HumanMessage(input)];
  const bound = model.bindTools([getWeather]);

  while (true) {
    const response = await bound.invoke(messages);
    messages.push(response);

    if (!response.tool_calls?.length) {
      return { messages, finalResponse: response };
    }

    for (const tc of response.tool_calls) {
      const result = await getWeather.invoke(tc.args);
      messages.push(new ToolMessage({
        content: result as string,
        tool_call_id: tc.id!,
      }));
    }
  }
}

describe("weather agent", () => {
  test("调用 get_weather 并返回最终答案", async () => {
    const model = fakeModel()
      .respondWithTools([
        { name: "get_weather", args: { city: "SF" }, id: "call_1" },
      ])
      .respond(new AIMessage("It's 72°F and sunny in SF!"));

    const { finalResponse } = await runAgent(model, "Weather in SF?");

    expect(finalResponse.content).toBe("It's 72°F and sunny in SF!");
    expect(model.callCount).toBe(2);

    const secondCall = model.calls[1].messages;
    const toolMsg = secondCall.find((m: any) => m._getType() === "tool");
    expect(toolMsg?.content).toContain("72°F and sunny in SF");
  });

  test("能够正确处理模型错误", async () => {
    const model = fakeModel()
      .respond(new Error("rate limit"));

    await expect(
      runAgent(model, "Weather?")
    ).rejects.toThrow("rate limit");

    expect(model.callCount).toBe(1);
  });
});

后续步骤

了解如何使用真实模型提供商 API 测试 Agent,请参阅 集成测试