消息队列让用户无需等待代理完成当前消息处理,就能快速连续发送多条消息。每条消息都会被立即接受,加入活动线程的队列,并按顺序处理,让你完全了解并控制待处理工作。
This feature requires the LangGraph Agent Server. Run your agent locally with langgraph dev or deploy it to LangSmith to use this pattern.

为什么使用消息队列?

在典型聊天界面中,用户必须等待代理完成响应后才能发送下一条消息。这会在几种场景中造成阻力:
  • 批量问题:用户想一次提出五个相关问题,而不是等待每个答案
  • 后续追问链:在代理仍在工作时提交澄清或额外上下文
  • 自动化测试序列:以编程方式发送一系列 prompt 来验证代理行为
  • 数据录入工作流:逐个输入结构化数据进行处理
消息队列通过立即接受所有提交并按顺序处理它们来解决这个问题。 这是一种代理 UX 原语,而不是表层聊天功能。SDK 会将队列作为流控制器的一部分进行跟踪,因此你的 UI 可以显示待处理工作、取消过期请求,并在当前运行继续时保持输入框可用。

工作原理

当你希望某次提交排在当前正在运行的请求之后等待时,传入 multitaskStrategy: "enqueue"。代理处理期间,排队提交会加入活动线程的队列。当前运行完成后,下一条排队消息会自动分发。 使用对应框架的队列 helper 读取队列状态:
属性类型描述
queue.entriesSubmissionQueueEntry[]所有待处理队列条目的数组
queue.sizenumber当前队列中的条目数
queue.cancel(id)(id: string) => Promise<void>按 ID 取消指定队列条目
queue.clear()() => Promise<void>取消所有队列条目
每个 SubmissionQueueEntry 对象包含:
字段类型描述
idstring该队列条目的唯一标识符
valuesobject已提交的输入值(包括消息)
optionsobject随提交传入的任何额外选项
createdAtstring创建该条目的 ISO 时间戳

设置 useStream

useStream 连接到你的代理,然后搭配对应框架的提交队列 helper。运行正在进行时调用 stream.submit() 发送消息;对需要排在活动请求之后等待的提交传入 multitaskStrategy: "enqueue"。读取 queue.entriesqueue.size 渲染待处理工作,并使用 queue.cancel()queue.clear() 在条目开始处理前移除它们。
The code examples use useStream<typeof myAgent> for type-safe stream state. See Type inference for Python or JavaScript backends.
import { useStream, useSubmissionQueue } from "@langchain/react";

function Chat() {
  const stream = useStream<typeof myAgent>({
    apiUrl: "http://localhost:2024",
    assistantId: "simple_agent",
  });
  const queue = useSubmissionQueue(stream);

  const handleSubmit = (text: string) => {
    stream.submit({
      messages: [{ type: "human", content: text }],
    });
  };

  const pendingCount = queue.size;
  const entries = queue.entries;

  return (
    <div>
      <MessageList messages={stream.messages} />
      {pendingCount > 0 && <QueueList entries={entries} queue={queue} />}
      <ChatInput onSubmit={handleSubmit} />
    </div>
  );
}

显示队列

构建一个 QueueList 组件,用取消按钮显示每条待处理消息。这让用户看到正在等待的内容,并能移除不再需要的条目。
function QueueList({ entries, queue }) {
  return (
    <div className="queue-panel">
      <div className="queue-header">
        <span>Queued messages ({entries.length})</span>
        <button onClick={() => queue.clear()}>Clear all</button>
      </div>
      <ul className="queue-entries">
        {entries.map((entry) => {
          const text = entry.values?.messages?.at(-1)?.content ?? "Pending...";
          return (
            <li key={entry.id} className="queue-entry">
              <span className="queue-text">{text}</span>
              <span className="queue-time">
                {new Date(entry.createdAt).toLocaleTimeString()}
              </span>
              <button
                className="queue-cancel"
                onClick={() => queue.cancel(entry.id)}
              >
                Cancel
              </button>
            </li>
          );
        })}
      </ul>
    </div>
  );
}
将每条排队消息的前几个字符显示为预览,让用户无需阅读完整消息也能快速识别要取消的条目。

取消排队消息

你有两种级别的取消:

取消单个条目

按 ID 从队列中移除指定消息。代理会跳过它并移动到下一个条目。
await queue.cancel(entryId);

清空整个队列

一次移除所有待处理消息。当用户切换上下文或想重新开始时,这很有用。
await queue.clear();
取消队列条目只影响尚未开始处理的消息。如果代理已经在处理某条消息,从队列中取消它不会产生效果。使用 stream.stop() 中断当前运行。

使用 onCreated 串联后续提交

创建新运行时会触发 onCreated 回调,让你可以通过 hook 以编程方式提交后续消息。这适合构建多步工作流,其中下一个问题依赖前一个提交已被接受。
stream.submit(
  { messages: [{ type: "human", content: "What is quantum computing?" }] },
  {
    onCreated(run) {
      console.log("Run created:", run.runId);
      // Chain a follow-up
      stream.submit({
        messages: [{ type: "human", content: "Give me a simple analogy." }],
      });
    },
  }
);
这个模式会自然填充队列。第一条消息会立即开始处理,后续消息会排在它之后。

启动新线程

当用户想开始新对话时,更新传入流的响应式 threadId。传入 null 会清除当前线程绑定;下一次提交会创建新线程。
function NewThreadButton() {
  const [threadId, setThreadId] = useState<string | null>(null);
  const stream = useStream<typeof myAgent>({ threadId, onThreadId: setThreadId });

  return (
    <button onClick={() => setThreadId(null)}>
      New conversation
    </button>
  );
}

最佳实践

  • 限制队列大小:虽然客户端没有硬性队列大小限制,但很大的队列可能降低用户体验。可以考虑在队列超过合理阈值(例如 10 个条目)时显示警告。
  • 显示队列位置:为每个排队条目编号,让用户知道处理顺序。
  • 保留输入焦点:提交后保持输入框聚焦,让用户可以立即输入下一条消息。
  • 添加过渡动画:条目开始处理时,将它们从队列面板平滑移动到消息列表中。
  • 优雅处理错误:如果排队消息失败,显示错误且不阻塞后续队列条目。
  • 对快速提交做防抖:对于自动化或编程式提交,在消息之间添加小延迟,避免压垮服务器。