概览

在本教程中,你将学习如何使用 LangChain agents 构建一个可以回答 SQL 数据库相关问题的代理。 从高层来看,代理会:
  1. 从数据库获取可用表和 schemas
  2. 判断哪些表与问题相关
  3. 获取相关表的 schemas
  4. 基于问题和 schemas 信息生成查询
  5. 使用 LLM 再次检查查询中的常见错误
  6. 执行查询并返回结果
  7. 修正数据库引擎暴露的错误,直到查询成功
  8. 基于结果组织响应
构建 SQL 数据库问答系统需要执行模型生成的 SQL 查询。这种做法存在固有风险。请确保数据库连接权限始终尽可能缩小到代理需求范围内。这会降低构建模型驱动系统的风险,但无法完全消除风险。

概念

本教程涵盖以下概念:

设置

1

安装依赖

npm i langchain @langchain/core sqlite3 zod
2

设置 LangSmith

设置 LangSmith,以检查链或代理内部发生了什么。然后设置以下环境变量:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."

构建 SQL 代理

1

选择 LLM

选择支持 tool-calling 的模型:
👉 Read the OpenAI chat model integration docs
npm install @langchain/openai
import { initChatModel } from "langchain";

process.env.OPENAI_API_KEY = "your-api-key";

const model = await initChatModel("gpt-5.4");
下面示例中展示的输出使用 OpenAI。
2

配置数据库

本教程将创建一个 SQLite database。SQLite 是一种易于设置和使用的轻量数据库。这里会加载 chinook 数据库,它是一个表示数字媒体商店的示例数据库。为了方便,数据库(Chinook.db)托管在公开 GCS bucket 上。
import fs from "node:fs/promises";
import path from "node:path";

const url =
  "https://storage.googleapis.com/benchmarks-artifacts/chinook/Chinook.db";
const localPath = path.resolve("Chinook.db");

async function resolveDbPath() {
  try {
    await fs.access(localPath);
    return localPath;
  } catch {
    // Chinook.db not present locally; download it.
  }
  const resp = await fetch(url);
  if (!resp.ok)
    throw new Error(`Failed to download DB. Status code: ${resp.status}`);
  const buf = Buffer.from(await resp.arrayBuffer());
  await fs.writeFile(localPath, buf);
  return localPath;
}
3

添加用于数据库交互的工具

以下数据库工具只是用于演示的最小 wrapper。它们并不用于安全场景或生产环境。在执行模型生成的 SQL 之前,请使用范围尽可能小的数据库权限,并添加应用特定验证。
将使用 sqlite3 库查询数据库并获取 schemas:
import sqlite3 from "sqlite3";

// Below are minimal tools for demonstration purposes.
async function runQuery(query: string): Promise<any[]> {
  const dbPath = await resolveDbPath();
  const db = new sqlite3.Database(dbPath);
  return new Promise((resolve, reject) => {
    db.all(query, [], (err, rows) => {
      db.close();
      if (err) reject(err);
      else resolve(rows);
    });
  });
}

async function getSchema() {
  const tables = await runQuery(
    "SELECT sql FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';",
  );
  return tables.map((row) => row.sql).join("\n\n");
}
4

创建代理

运行命令前,在 _safe_sql 中检查 LLM 生成的命令:
const DENY_RE =
  /\b(INSERT|UPDATE|DELETE|ALTER|DROP|CREATE|REPLACE|TRUNCATE)\b/i;
const HAS_LIMIT_TAIL_RE = /\blimit\b\s+\d+(\s*,\s*\d+)?\s*;?\s*$/i;

function sanitizeSqlQuery(q) {
  let query = String(q ?? "").trim();

  // block multiple statements (allow one optional trailing ;)
  const semis = [...query].filter((c) => c === ";").length;
  if (semis > 1 || (query.endsWith(";") && query.slice(0, -1).includes(";"))) {
    throw new Error("multiple statements are not allowed.");
  }
  query = query.replace(/;+\s*$/g, "").trim();

  // read-only gate
  if (!query.toLowerCase().startsWith("select")) {
    throw new Error("Only SELECT statements are allowed");
  }
  if (DENY_RE.test(query)) {
    throw new Error("DML/DDL detected. Only read-only queries are permitted.");
  }

  // append LIMIT only if not already present
  if (!HAS_LIMIT_TAIL_RE.test(query)) {
    query += " LIMIT 5";
  }
  return query;
}
然后,使用 execute_sql 工具执行命令:
import { tool } from "langchain";
import * as z from "zod";

const executeSql = tool(
  async ({ query }) => {
    const q = sanitizeSqlQuery(query);
    try {
      const result = await runQuery(q);
      return JSON.stringify(result, null, 2);
    } catch (e) {
      const message = e instanceof Error ? e.message : String(e);
      throw new Error(message);
    }
  },
  {
    name: "execute_sql",
    description: "Execute a READ-ONLY SQLite SELECT query and return results.",
    schema: z.object({
      query: z.string().describe("SQLite SELECT query to execute (read-only)."),
    }),
  },
);
使用 createAgent 以最少代码构建 ReAct agent。代理会解释请求并生成 SQL 命令。工具会检查命令安全性,然后尝试执行命令。如果命令有错误,错误消息会返回给模型。模型随后可以检查原始请求和新错误消息,并生成新命令。这个过程可以持续到 LLM 成功生成命令或达到终止次数。向模型提供反馈(这里是错误消息)的模式非常强大。使用描述性 system prompt 初始化代理,以自定义其行为:
import { SystemMessage } from "langchain";

const getSystemPrompt = async () =>
  new SystemMessage(`You are a careful SQLite analyst.

Authoritative schema (do not invent columns/tables):
${await getSchema()}

Rules:
- Think step-by-step.
- When you need data, call the tool \`execute_sql\` with ONE SELECT query.
- Read-only only; no INSERT/UPDATE/DELETE/ALTER/DROP/CREATE/REPLACE/TRUNCATE.
- Limit to 5 rows unless user explicitly asks otherwise.
- If the tool returns 'Error:', revise the SQL and try again.
- Limit the number of attempts to 5.
- If you are not successful after 5 attempts, return a note to the user.
- Prefer explicit column lists; avoid SELECT *.
`);
现在,用模型、工具和 prompt 创建代理:
import { createAgent } from "langchain";

let agent = createAgent({
  model: "google-genai:gemini-3.5-flash",
  tools: [executeSql],
  systemPrompt: await getSystemPrompt(),
});
5

运行代理

用示例查询运行代理并观察其行为:
let question = "Which genre, on average, has the longest tracks?";

for await (const step of await agent.stream(
  { messages: [{ role: "user", content: question }] },
  { streamMode: "values" },
)) {
  const message = step.messages.at(-1);
  console.log(`${message.role}: ${JSON.stringify(message.content, null, 2)}`);
}
human: Which genre, on average, has the longest tracks?
ai:
tool: [{"Genre":"Sci Fi & Fantasy","AvgMilliseconds":2911783.0384615385}]
ai: Sci Fi & Fantasy — average track length ≈ 48.5 minutes (about 2,911,783 ms).
代理正确编写了查询、检查了查询,并运行查询以支持最终响应。
你可以在 LangSmith trace 中检查上述运行的各个方面,包括执行的步骤、调用的工具、LLM 看到的 prompts 等。
6

(可选)使用 Studio

Studio 提供“client side”循环和 memory,因此你可以将其作为聊天界面运行并查询数据库。你可以提出诸如 “Tell me the scheme of the database” 或 “Show me the invoices for the 5 top customers” 的问题。你会看到生成的 SQL 命令以及结果输出。下面是启动方式详情。
除了前面提到的包之外,还需要:
npm i -g @langchain/langgraph-cli@latest
在运行目录中,需要一个包含以下内容的 langgraph.json 文件:
{
  "dependencies": ["."],
  "graphs": {
      "agent": "./sqlAgent.ts:agent",
      "graph": "./sqlAgentLanggraph.ts:graph"
  },
  "env": ".env"
}
创建文件 sqlAgent.ts 并插入以下内容:
import fs from "node:fs/promises";
import path from "node:path";
import sqlite3 from "sqlite3";
import { SystemMessage, createAgent, tool } from "langchain";
import * as z from "zod";

const url =
  "https://storage.googleapis.com/benchmarks-artifacts/chinook/Chinook.db";
const localPath = path.resolve("Chinook.db");

async function resolveDbPath() {
  try {
    await fs.access(localPath);
    return localPath;
  } catch {
    // Chinook.db not present locally; download it.
  }
  const resp = await fetch(url);
  if (!resp.ok)
    throw new Error(`Failed to download DB. Status code: ${resp.status}`);
  const buf = Buffer.from(await resp.arrayBuffer());
  await fs.writeFile(localPath, buf);
  return localPath;
}

// Below are minimal tools for demonstration purposes.
async function runQuery(query: string): Promise<Record<string, unknown>[]> {
  const dbPath = await resolveDbPath();
  const db = new sqlite3.Database(dbPath);
  return new Promise((resolve, reject) => {
    db.all(query, [], (err, rows) => {
      db.close();
      if (err) reject(err);
      else resolve(rows as Record<string, unknown>[]);
    });
  });
}

async function getSchema() {
  const tables = await runQuery(
    "SELECT sql FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';",
  );
  return tables.map((row) => String(row.sql)).join("\n\n");
}

const DENY_RE =
  /\b(INSERT|UPDATE|DELETE|ALTER|DROP|CREATE|REPLACE|TRUNCATE)\b/i;
const HAS_LIMIT_TAIL_RE = /\blimit\b\s+\d+(\s*,\s*\d+)?\s*;?\s*$/i;

function sanitizeSqlQuery(q: string) {
  let query = String(q ?? "").trim();

  const semis = [...query].filter((c) => c === ";").length;
  if (semis > 1 || (query.endsWith(";") && query.slice(0, -1).includes(";"))) {
    throw new Error("multiple statements are not allowed.");
  }
  query = query.replace(/;+\s*$/g, "").trim();

  if (!query.toLowerCase().startsWith("select")) {
    throw new Error("Only SELECT statements are allowed");
  }
  if (DENY_RE.test(query)) {
    throw new Error("DML/DDL detected. Only read-only queries are permitted.");
  }

  if (!HAS_LIMIT_TAIL_RE.test(query)) {
    query += " LIMIT 5";
  }
  return query;
}

const executeSql = tool(
  async ({ query }) => {
    const q = sanitizeSqlQuery(query);
    try {
      const result = await runQuery(q);
      return JSON.stringify(result, null, 2);
    } catch (e) {
      const message = e instanceof Error ? e.message : String(e);
      throw new Error(message);
    }
  },
  {
    name: "execute_sql",
    description: "Execute a READ-ONLY SQLite SELECT query and return results.",
    schema: z.object({
      query: z.string().describe("SQLite SELECT query to execute (read-only)."),
    }),
  },
);

const getSystemPrompt = async () =>
  new SystemMessage(`You are a careful SQLite analyst.

Authoritative schema (do not invent columns/tables):
${await getSchema()}

Rules:
- Think step-by-step.
- When you need data, call the tool \`execute_sql\` with ONE SELECT query.
- Read-only only; no INSERT/UPDATE/DELETE/ALTER/DROP/CREATE/REPLACE/TRUNCATE.
- Limit to 5 rows unless user explicitly asks otherwise.
- If the tool returns 'Error:', revise the SQL and try again.
- Limit the number of attempts to 5.
- If you are not successful after 5 attempts, return a note to the user.
- Prefer explicit column lists; avoid SELECT *.
`);

export const agent = createAgent({
  model: "google-genai:gemini-3.5-flash",
  tools: [executeSql],
  systemPrompt: await getSystemPrompt(),
});
7

实现 human-in-the-loop 审核

在执行代理的 SQL 查询之前,检查是否存在任何非预期动作或低效之处是一种谨慎做法。LangChain 代理支持内置 human-in-the-loop middleware,用于为代理工具调用添加监督。配置代理在调用 execute_sql 工具时暂停并等待人工审核:
import { createAgent, humanInTheLoopMiddleware } from "langchain";
import { MemorySaver } from "@langchain/langgraph";

agent = createAgent({
  model: "google-genai:gemini-3.5-flash",
  tools: [executeSql],
  systemPrompt: await getSystemPrompt(),
  middleware: [
    humanInTheLoopMiddleware({
      interruptOn: {
        execute_sql: true,
      },
      descriptionPrefix: "Tool execution pending approval",
    }),
  ],
  checkpointer: new MemorySaver(),
});
已向代理添加 checkpointer,以允许暂停和恢复执行。更多详情以及可用 middleware 配置,请参阅 human-in-the-loop guide
运行代理时,它现在会在执行 execute_sql 工具前暂停等待审核:
question = "Which genre, on average, has the longest tracks?";
const config = { configurable: { thread_id: "1" } };

for await (const step of await agent.stream(
  { messages: [{ role: "user", content: question }] },
  { ...config, streamMode: "values" },
)) {
  if ("__interrupt__" in step) {
    console.log("INTERRUPTED:");
    for (const interrupt of step.__interrupt__) {
      for (const request of interrupt.value.actionRequests) {
        console.log(request.description);
      }
    }
  } else if (step.messages) {
    const message = step.messages.at(-1);
    console.log(`${message.role}: ${JSON.stringify(message.content, null, 2)}`);
  }
}
...

INTERRUPTED:
Tool execution pending approval

Tool: execute_sql
Args: {'query': 'SELECT g.Name AS Genre, AVG(t.Milliseconds) AS AvgTrackLength FROM Track t JOIN Genre g ON t.GenreId = g.GenreId GROUP BY g.Name ORDER BY AvgTrackLength DESC LIMIT 1;'}
可以使用 Command 恢复执行,本例中接受该查询:
import { Command } from "@langchain/langgraph";

for await (const step of await agent.stream(
  new Command({ resume: { decisions: [{ type: "approve" }] } }),
  { ...config, streamMode: "values" },
)) {
  if (step.messages) {
    const message = step.messages.at(-1);
    console.log(`${message.role}: ${JSON.stringify(message.content, null, 2)}`);
  }
  if ("__interrupt__" in step) {
    console.log("INTERRUPTED:");
    for (const interrupt of step.__interrupt__) {
      for (const request of interrupt.value.actionRequests) {
        console.log(request.description);
      }
    }
  }
}
================================== Ai Message ==================================
Tool Calls:
  execute_sql (call_7oz86Epg7lYRqi9rQHbZPS1U)
 Call ID: call_7oz86Epg7lYRqi9rQHbZPS1U
  Args:
    query: SELECT Genre.Name, AVG(Track.Milliseconds) AS AvgDuration FROM Track JOIN Genre ON Track.GenreId = Genre.GenreId GROUP BY Genre.Name ORDER BY AvgDuration DESC LIMIT 5;
================================= Tool Message =================================
Name: execute_sql

[('Sci Fi & Fantasy', 2911783.0384615385), ('Science Fiction', 2625549.076923077), ('Drama', 2575283.78125), ('TV Shows', 2145041.0215053763), ('Comedy', 1585263.705882353)]
================================== Ai Message ==================================

The genre with the longest average track length is "Sci Fi & Fantasy" with an average duration of about 2,911,783 milliseconds, followed by "Science Fiction" and "Drama."
详情请参阅 human-in-the-loop guide

下一步

如需更深入的自定义,请查看本教程,了解如何直接使用 LangGraph primitives 实现 SQL 代理。