概览

聊天界面一直主导着与 AI 的交互方式,但多模态 AI 的最新突破正在开启令人兴奋的新可能。高质量生成模型和富有表现力的 text-to-speech(TTS)系统,现在让构建更像对话伙伴而不是工具的代理成为可能。 语音代理就是一个例子。你不必依赖键盘和鼠标向代理输入内容,而是可以用口语与其交互。这是一种更自然、更有吸引力的 AI 交互方式,在某些场景中尤其有用。

什么是语音代理?

语音代理是能够与用户进行自然语音对话的 agents。这些代理结合语音识别、自然语言处理、生成式 AI 和 text-to-speech 技术,创建顺畅自然的对话。 它们适用于多种用例,包括:
  • 客户支持
  • 个人助手
  • 免手动界面
  • 指导和培训

语音代理如何工作?

从高层来看,每个语音代理都需要处理三项任务:
  1. :捕获音频并转写
  2. 思考:解释意图、推理、规划
  3. :生成音频并将其流式传回用户
差异在于这些步骤如何排序和耦合。实践中,生产代理通常遵循两种主要架构之一:

1. STT > Agent > TTS 架构(“三明治”)

三明治架构由三个不同组件组成:speech-to-text(STT)、基于文本的 LangChain agent,以及 text-to-speech(TTS)。 优点:
  • 完全控制每个组件(可按需替换 STT/TTS providers)
  • 访问现代文本模态模型的最新能力
  • 组件边界清晰,行为透明
缺点:
  • 需要编排多个服务
  • 管理管线时有额外复杂度
  • 从语音转换为文本会丢失信息(例如语调、情绪)

2. Speech-to-Speech 架构(S2S)

Speech-to-speech 使用一个原生处理音频输入并生成音频输出的多模态模型。 优点:
  • 架构更简单,移动部件更少
  • 对简单交互而言通常延迟更低
  • 直接音频处理可以捕获语调和其他语音细微差别
缺点:
  • 模型选择有限,provider lock-in 风险更高
  • 功能可能落后于文本模态模型
  • 音频处理方式透明度较低
  • 可控性和自定义选项减少
本指南演示三明治架构,以平衡性能、可控性和对现代模型能力的访问。使用某些 STT 和 TTS providers 时,三明治架构可以在保持模块化组件控制权的同时实现低于 700ms 的延迟。

演示应用概览

下面将使用三明治架构构建一个基于语音的代理。该代理会管理三明治店订单。应用会演示三明治架构的全部三个组件,使用 AssemblyAI 进行 STT,并使用 Cartesia 进行 TTS(不过也可以为大多数 providers 构建 adapters)。 端到端参考应用可在 voice-sandwich-demo 仓库中找到。这里将走查该应用。 演示使用 WebSockets 在浏览器和服务器之间进行实时双向通信。同一架构也可以适配其他传输方式,例如电话系统(Twilio、Vonage)或 WebRTC 连接。

Architecture

演示实现了一个流式管线,每个阶段都会异步处理数据: 客户端(浏览器)
  • 捕获麦克风音频并编码为 PCM
  • 建立到后端服务器的 WebSocket 连接
  • 将音频 chunks 实时流式传输到服务器
  • 接收并播放合成语音音频
服务器(Node.js)
  • 接收来自客户端的 WebSocket 连接
  • 编排三步管线:
    • Speech-to-text (STT):将音频转发给 STT provider(例如 AssemblyAI),接收 transcript events
    • Agent:用 LangChain agent 处理 transcripts,并流式传输响应 tokens
    • Text-to-speech (TTS):将代理响应发送给 TTS provider(例如 Cartesia),接收音频 chunks
  • 将合成音频返回给客户端播放
该管线使用 async iterators 在每个阶段启用流式传输。这让下游组件可以在上游阶段完成前开始处理,从而最小化端到端延迟。

设置

详细安装和设置说明请参阅 repository README

1. Speech-to-text

STT 阶段会将传入音频流转换为文本 transcripts。该实现使用 producer-consumer pattern 并发处理音频流式传输和 transcript 接收。

核心概念

Producer-Consumer Pattern:音频 chunks 会在接收 transcript events 的同时发送给 STT 服务。这使转写可以在所有音频到达前开始。 事件类型
  • stt_chunk:STT 服务处理音频时提供的部分 transcripts
  • stt_output:触发代理处理的最终格式化 transcripts
WebSocket 连接:维护到 AssemblyAI 实时 STT API 的持久连接,并配置为 16kHz PCM 音频和自动 turn formatting。

实现

import { AssemblyAISTT } from "./assemblyai";
import type { VoiceAgentEvent } from "./types";

async function* sttStream(
  audioStream: AsyncIterable<Uint8Array>
): AsyncGenerator<VoiceAgentEvent> {
  const stt = new AssemblyAISTT({ sampleRate: 16000 });
  const passthrough = writableIterator<VoiceAgentEvent>();

  // Producer: pump audio chunks to AssemblyAI
  const producer = (async () => {
    try {
      for await (const audioChunk of audioStream) {
        await stt.sendAudio(audioChunk);
      }
    } finally {
      await stt.close();
    }
  })();

  // Consumer: receive transcription events
  const consumer = (async () => {
    for await (const event of stt.receiveEvents()) {
      passthrough.push(event);
    }
  })();

  try {
    // Yield events as they arrive
    yield* passthrough;
  } finally {
    // Wait for producer and consumer to complete
    await Promise.all([producer, consumer]);
  }
}
应用实现了一个 AssemblyAI client,用于管理 WebSocket 连接和消息解析。实现见下文;也可以为其他 STT providers 构建类似 adapters。
export class AssemblyAISTT {
  protected _bufferIterator = writableIterator<VoiceAgentEvent.STTEvent>();
  protected _connectionPromise: Promise<WebSocket> | null = null;

  async sendAudio(buffer: Uint8Array): Promise<void> {
    const conn = await this._connection;
    conn.send(buffer);
  }

  async *receiveEvents(): AsyncGenerator<VoiceAgentEvent.STTEvent> {
    yield* this._bufferIterator;
  }

  protected get _connection(): Promise<WebSocket> {
    if (this._connectionPromise) return this._connectionPromise;

    this._connectionPromise = new Promise((resolve, reject) => {
      const params = new URLSearchParams({
        sample_rate: this.sampleRate.toString(),
        format_turns: "true",
      });
      const url = `wss://streaming.assemblyai.com/v3/ws?${params}`;
      const ws = new WebSocket(url, {
        headers: { Authorization: this.apiKey },
      });

      ws.on("open", () => resolve(ws));

      ws.on("message", (data) => {
        const message = JSON.parse(data.toString());
        if (message.type === "Turn") {
          if (message.turn_is_formatted) {
            this._bufferIterator.push({
              type: "stt_output",
              transcript: message.transcript,
              ts: Date.now()
            });
          } else {
            this._bufferIterator.push({
              type: "stt_chunk",
              transcript: message.transcript,
              ts: Date.now()
            });
          }
        }
      });
    });

    return this._connectionPromise;
  }
}

2. LangChain agent

代理阶段会通过 LangChain agent 处理文本 transcripts,并流式传输响应 tokens。在本例中,会流式传输代理生成的所有 text content blocks

核心概念

流式响应:代理使用 stream_mode="messages" 在响应 tokens 生成时发出它们,而不是等待完整响应。这让 TTS 阶段可以立即开始合成。 对话记忆:使用唯一 thread ID,checkpointer 会跨轮次维护对话状态。这让代理可以引用对话中的先前交流。

实现

import { createAgent } from "langchain";
import { HumanMessage } from "@langchain/core/messages";
import { MemorySaver } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import { z } from "zod";

// Define agent tools
const addToOrder = tool(
  async ({ item, quantity }) => {
    return `Added ${quantity} x ${item} to the order.`;
  },
  {
    name: "add_to_order",
    description: "Add an item to the customer's sandwich order.",
    schema: z.object({
      item: z.string(),
      quantity: z.number(),
    }),
  }
);

const confirmOrder = tool(
  async ({ orderSummary }) => {
    return `Order confirmed: ${orderSummary}. Sending to kitchen.`;
  },
  {
    name: "confirm_order",
    description: "Confirm the final order with the customer.",
    schema: z.object({
      orderSummary: z.string().describe("Summary of the order"),
    }),
  }
);

// Create agent with tools and memory
const agent = createAgent({
  model: "claude-haiku-4-5",
  tools: [addToOrder, confirmOrder],
  checkpointer: new MemorySaver(),
  systemPrompt: `You are a helpful sandwich shop assistant.
Your goal is to take the user's order. Be concise and friendly.
Do NOT use emojis, special characters, or markdown.
Your responses will be read by a text-to-speech engine.`,
});

async function* agentStream(
  eventStream: AsyncIterable<VoiceAgentEvent>
): AsyncGenerator<VoiceAgentEvent> {
  // Generate unique thread ID for conversation memory
  const threadId = crypto.randomUUID();

  for await (const event of eventStream) {
    // Pass through all upstream events
    yield event;

    // Process final transcripts through the agent
    if (event.type === "stt_output") {
      const stream = await agent.stream(
        { messages: [new HumanMessage(event.transcript)] },
        {
          configurable: { thread_id: threadId },
          streamMode: "messages",
        }
      );

      // Yield agent response chunks as they arrive
      for await (const [message] of stream) {
        yield { type: "agent_chunk", text: message.text, ts: Date.now() };
      }
    }
  }
}

3. Text-to-speech

TTS 阶段会将代理响应文本合成为音频,并将其流式传回客户端。与 STT 阶段一样,它使用 producer-consumer pattern 并发处理文本发送和音频接收。

核心概念

并发处理:该实现会合并两个异步流:
  • 上游处理:透传所有事件,并将代理文本 chunks 发送给 TTS provider
  • 音频接收:从 TTS provider 接收合成音频 chunks
流式 TTS:有些 providers(例如 Cartesia)在收到文本后就会开始合成音频,从而让音频播放可以在代理生成完整响应前开始。 事件透传:所有上游事件都会原样流过,让客户端或其他观察者可以跟踪完整管线状态。

实现

import { CartesiaTTS } from "./cartesia";

async function* ttsStream(
  eventStream: AsyncIterable<VoiceAgentEvent>
): AsyncGenerator<VoiceAgentEvent> {
  const tts = new CartesiaTTS();
  const passthrough = writableIterator<VoiceAgentEvent>();

  // Producer: read upstream events and send text to Cartesia
  const producer = (async () => {
    try {
      for await (const event of eventStream) {
        passthrough.push(event);
        if (event.type === "agent_chunk") {
          await tts.sendText(event.text);
        }
      }
    } finally {
      await tts.close();
    }
  })();

  // Consumer: receive audio from Cartesia
  const consumer = (async () => {
    for await (const event of tts.receiveEvents()) {
      passthrough.push(event);
    }
  })();

  try {
    // Yield events from both producer and consumer
    yield* passthrough;
  } finally {
    await Promise.all([producer, consumer]);
  }
}
应用实现了一个 Cartesia client,用于管理 WebSocket 连接和音频流式传输。实现见下文;也可以为其他 TTS providers 构建类似 adapters。
export class CartesiaTTS {
  protected _bufferIterator = writableIterator<VoiceAgentEvent.TTSEvent>();
  protected _connectionPromise: Promise<WebSocket> | null = null;

  async sendText(text: string | null): Promise<void> {
    if (!text || !text.trim()) return;

    const conn = await this._connection;
    const payload = { text, try_trigger_generation: false };
    conn.send(JSON.stringify(payload));
  }

  async *receiveEvents(): AsyncGenerator<VoiceAgentEvent.TTSEvent> {
    yield* this._bufferIterator;
  }

  protected _generateContextId(): string {
    const timestamp = Date.now();
    const counter = this._contextCounter++;
    return `ctx_${timestamp}_${counter}`;
  }

  protected get _connection(): Promise<WebSocket> {
    if (this._connectionPromise) return this._connectionPromise;

    this._connectionPromise = new Promise((resolve, reject) => {
      const params = new URLSearchParams({
        api_key: this.apiKey,
        cartesia_version: this.cartesiaVersion,
      });
      const url = `wss://api.cartesia.ai/tts/websocket?${params.toString()}`;
      const ws = new WebSocket(url);

      ws.on("open", () => {
        resolve(ws);
      });

      ws.on("message", (data: WebSocket.RawData) => {
        const message: CartesiaTTSResponse = JSON.parse(data.toString());
        if (message.data) {
          this._bufferIterator.push({
            type: "tts_chunk",
            audio: message.data,
            ts: Date.now(),
          });
        } else if (message.error) {
          throw new Error(`Cartesia error: ${message.error}`);
        }
      });
    });

    return this._connectionPromise;
  }
}

LangSmith

你使用 LangChain 构建的许多应用会包含多个步骤,并多次调用 LLM。随着这些应用越来越复杂,能够检查链或代理内部究竟发生了什么会变得至关重要。最好的方式是使用 LangSmith 在上方链接注册后,请设置环境变量以开始记录 traces:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."

组合所有部分

完整管线会将三个阶段串联起来:
// using https://hono.dev/
app.get("/ws", upgradeWebSocket(async () => {
  const inputStream = writableIterator<Uint8Array>();

  // Chain the three stages
  const transcriptEventStream = sttStream(inputStream);
  const agentEventStream = agentStream(transcriptEventStream);
  const outputEventStream = ttsStream(agentEventStream);

  // Process pipeline and send TTS audio to client
  const flushPromise = (async () => {
    for await (const event of outputEventStream) {
      if (event.type === "tts_chunk") {
        currentSocket?.send(event.audio);
      }
    }
  })();

  return {
    onMessage(event) {
      // Push incoming audio into pipeline
      const data = event.data;
      if (Buffer.isBuffer(data)) {
        inputStream.push(new Uint8Array(data));
      }
    },
    async onClose() {
      inputStream.cancel();
      await flushPromise;
    },
  };
}));
每个阶段都会独立且并发处理事件:音频一到达就开始转写,transcript 可用后代理立即开始推理,代理文本生成后语音合成立即开始。该架构可以实现低于 700ms 的延迟,以支持自然对话。 更多使用 LangChain 构建代理的内容,请参阅 Agents guide