概览

聊天界面一直主导着与 AI 的交互方式,但多模态 AI 的最新突破正在开启令人兴奋的新可能。高质量生成模型和富有表现力的 text-to-speech(TTS)系统,现在让构建更像对话伙伴而不是工具的代理成为可能。 语音代理就是一个例子。你不必依赖键盘和鼠标向代理输入内容,而是可以用口语与其交互。这是一种更自然、更有吸引力的 AI 交互方式,在某些场景中尤其有用。

什么是语音代理?

语音代理是能够与用户进行自然语音对话的 agents。这些代理结合语音识别、自然语言处理、生成式 AI 和 text-to-speech 技术,创建顺畅自然的对话。 它们适用于多种用例,包括:
  • 客户支持
  • 个人助手
  • 免手动界面
  • 指导和培训

语音代理如何工作?

从高层来看,每个语音代理都需要处理三项任务:
  1. :捕获音频并转写
  2. 思考:解释意图、推理、规划
  3. :生成音频并将其流式传回用户
差异在于这些步骤如何排序和耦合。实践中,生产代理通常遵循两种主要架构之一:

1. STT > Agent > TTS 架构(“三明治”)

三明治架构由三个不同组件组成:speech-to-text(STT)、基于文本的 LangChain agent,以及 text-to-speech(TTS)。 优点:
  • 完全控制每个组件(可按需替换 STT/TTS providers)
  • 访问现代文本模态模型的最新能力
  • 组件边界清晰,行为透明
缺点:
  • 需要编排多个服务
  • 管理管线时有额外复杂度
  • 从语音转换为文本会丢失信息(例如语调、情绪)

2. Speech-to-Speech 架构(S2S)

Speech-to-speech 使用一个原生处理音频输入并生成音频输出的多模态模型。 优点:
  • 架构更简单,移动部件更少
  • 对简单交互而言通常延迟更低
  • 直接音频处理可以捕获语调和其他语音细微差别
缺点:
  • 模型选择有限,provider lock-in 风险更高
  • 功能可能落后于文本模态模型
  • 音频处理方式透明度较低
  • 可控性和自定义选项减少
本指南演示三明治架构,以平衡性能、可控性和对现代模型能力的访问。使用某些 STT 和 TTS providers 时,三明治架构可以在保持模块化组件控制权的同时实现低于 700ms 的延迟。

演示应用概览

下面将使用三明治架构构建一个基于语音的代理。该代理会管理三明治店订单。应用会演示三明治架构的全部三个组件,使用 AssemblyAI 进行 STT,并使用 Cartesia 进行 TTS(不过也可以为大多数 providers 构建 adapters)。 端到端参考应用可在 voice-sandwich-demo 仓库中找到。这里将走查该应用。 演示使用 WebSockets 在浏览器和服务器之间进行实时双向通信。同一架构也可以适配其他传输方式,例如电话系统(Twilio、Vonage)或 WebRTC 连接。

Architecture

演示实现了一个流式管线,每个阶段都会异步处理数据: 客户端(浏览器)
  • 捕获麦克风音频并编码为 PCM
  • 建立到后端服务器的 WebSocket 连接
  • 将音频 chunks 实时流式传输到服务器
  • 接收并播放合成语音音频
服务器(Python)
  • 接收来自客户端的 WebSocket 连接
  • 编排三步管线:
    • Speech-to-text (STT):将音频转发给 STT provider(例如 AssemblyAI),接收 transcript events
    • Agent:用 LangChain agent 处理 transcripts,并流式传输响应 tokens
    • Text-to-speech (TTS):将代理响应发送给 TTS provider(例如 Cartesia),接收音频 chunks
  • 将合成音频返回给客户端播放
该管线使用 async generators 在每个阶段启用流式传输。这让下游组件可以在上游阶段完成前开始处理,从而最小化端到端延迟。

设置

详细安装和设置说明请参阅 repository README

1. Speech-to-text

STT 阶段会将传入音频流转换为文本 transcripts。该实现使用 producer-consumer pattern 并发处理音频流式传输和 transcript 接收。

核心概念

Producer-Consumer Pattern:音频 chunks 会在接收 transcript events 的同时发送给 STT 服务。这使转写可以在所有音频到达前开始。 事件类型
  • stt_chunk:STT 服务处理音频时提供的部分 transcripts
  • stt_output:触发代理处理的最终格式化 transcripts
WebSocket 连接:维护到 AssemblyAI 实时 STT API 的持久连接,并配置为 16kHz PCM 音频和自动 turn formatting。

实现

from typing import AsyncIterator
import asyncio
from assemblyai_stt import AssemblyAISTT
from events import VoiceAgentEvent

async def stt_stream(
    audio_stream: AsyncIterator[bytes],
) -> AsyncIterator[VoiceAgentEvent]:
    """
    Transform stream: Audio (Bytes) → Voice Events (VoiceAgentEvent)

    Uses a producer-consumer pattern where:
    - Producer: Reads audio chunks and sends them to AssemblyAI
    - Consumer: Receives transcription events from AssemblyAI
    """
    stt = AssemblyAISTT(sample_rate=16000)

    async def send_audio():
        """Background task that pumps audio chunks to AssemblyAI."""
        try:
            async for audio_chunk in audio_stream:
                await stt.send_audio(audio_chunk)
        finally:
            # Signal completion when audio stream ends
            await stt.close()

    # Launch audio sending in background
    send_task = asyncio.create_task(send_audio())

    try:
        # Receive and yield transcription events as they arrive
        async for event in stt.receive_events():
            yield event
    finally:
        # Cleanup
        with contextlib.suppress(asyncio.CancelledError):
            send_task.cancel()
            await send_task
        await stt.close()
应用实现了一个 AssemblyAI client,用于管理 WebSocket 连接和消息解析。实现见下文;也可以为其他 STT providers 构建类似 adapters。
class AssemblyAISTT:
    def __init__(self, api_key: str | None = None, sample_rate: int = 16000):
        self.api_key = api_key or os.getenv("ASSEMBLYAI_API_KEY")
        self.sample_rate = sample_rate
        self._ws: WebSocketClientProtocol | None = None

    async def send_audio(self, audio_chunk: bytes) -> None:
        """Send PCM audio bytes to AssemblyAI."""
        ws = await self._ensure_connection()
        await ws.send(audio_chunk)

    async def receive_events(self) -> AsyncIterator[STTEvent]:
        """Yield STT events as they arrive from AssemblyAI."""
        async for raw_message in self._ws:
            message = json.loads(raw_message)

            if message["type"] == "Turn":
                # Final formatted transcript
                if message.get("turn_is_formatted"):
                    yield STTOutputEvent.create(message["transcript"])
                # Partial transcript
                else:
                    yield STTChunkEvent.create(message["transcript"])

    async def _ensure_connection(self) -> WebSocketClientProtocol:
        """Establish WebSocket connection if not already connected."""
        if self._ws is None:
            url = f"wss://streaming.assemblyai.com/v3/ws?sample_rate={self.sample_rate}&format_turns=true"
            self._ws = await websockets.connect(
                url,
                additional_headers={"Authorization": self.api_key}
            )
        return self._ws

2. LangChain agent

代理阶段会通过 LangChain agent 处理文本 transcripts,并流式传输响应 tokens。在本例中,会流式传输代理生成的所有 text content blocks

核心概念

流式响应:代理使用 stream_mode="messages" 在响应 tokens 生成时发出它们,而不是等待完整响应。这让 TTS 阶段可以立即开始合成。 对话记忆:使用唯一 thread ID,checkpointer 会跨轮次维护对话状态。这让代理可以引用对话中的先前交流。

实现

from langchain_core.utils.uuid import uuid7
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langgraph.checkpoint.memory import InMemorySaver

# Define agent tools
def add_to_order(item: str, quantity: int) -> str:
    """Add an item to the customer's sandwich order."""
    return f"Added {quantity} x {item} to the order."

def confirm_order(order_summary: str) -> str:
    """Confirm the final order with the customer."""
    return f"Order confirmed: {order_summary}. Sending to kitchen."

# Create agent with tools and memory
agent = create_agent(
    model="google_genai:gemini-3.5-flash",  # Select your model
    tools=[add_to_order, confirm_order],
    system_prompt="""You are a helpful sandwich shop assistant.
    Your goal is to take the user's order. Be concise and friendly.
    Do NOT use emojis, special characters, or markdown.
    Your responses will be read by a text-to-speech engine.""",
    checkpointer=InMemorySaver(),
)

async def agent_stream(
    event_stream: AsyncIterator[VoiceAgentEvent],
) -> AsyncIterator[VoiceAgentEvent]:
    """
    Transform stream: Voice Events → Voice Events (with Agent Responses)

    Passes through all upstream events and adds agent_chunk events
    when processing STT transcripts.
    """
    # Generate unique thread ID for conversation memory
    thread_id = str(uuid7())

    async for event in event_stream:
        # Pass through all upstream events
        yield event

        # Process final transcripts through the agent
        if event.type == "stt_output":
            # Stream agent response with conversation context
            stream = agent.astream(
                {"messages": [HumanMessage(content=event.transcript)]},
                {"configurable": {"thread_id": thread_id}},
                stream_mode="messages",
            )

            # Yield agent response chunks as they arrive
            async for message, _ in stream:
                if message.text:
                    yield AgentChunkEvent.create(message.text)

3. Text-to-speech

TTS 阶段会将代理响应文本合成为音频,并将其流式传回客户端。与 STT 阶段一样,它使用 producer-consumer pattern 并发处理文本发送和音频接收。

核心概念

并发处理:该实现会合并两个异步流:
  • 上游处理:透传所有事件,并将代理文本 chunks 发送给 TTS provider
  • 音频接收:从 TTS provider 接收合成音频 chunks
流式 TTS:有些 providers(例如 Cartesia)在收到文本后就会开始合成音频,从而让音频播放可以在代理生成完整响应前开始。 事件透传:所有上游事件都会原样流过,让客户端或其他观察者可以跟踪完整管线状态。

实现

from cartesia_tts import CartesiaTTS
from utils import merge_async_iters

async def tts_stream(
    event_stream: AsyncIterator[VoiceAgentEvent],
) -> AsyncIterator[VoiceAgentEvent]:
    """
    Transform stream: Voice Events → Voice Events (with Audio)

    Merges two concurrent streams:
    1. process_upstream(): passes through events and sends text to Cartesia
    2. tts.receive_events(): yields audio chunks from Cartesia
    """
    tts = CartesiaTTS()

    async def process_upstream() -> AsyncIterator[VoiceAgentEvent]:
        """Process upstream events and send agent text to Cartesia."""
        async for event in event_stream:
            # Pass through all events
            yield event
            # Send agent text to Cartesia for synthesis
            if event.type == "agent_chunk":
                await tts.send_text(event.text)

    try:
        # Merge upstream events with TTS audio events
        # Both streams run concurrently
        async for event in merge_async_iters(
            process_upstream(),
            tts.receive_events()
        ):
            yield event
    finally:
        await tts.close()
应用实现了一个 Cartesia client,用于管理 WebSocket 连接和音频流式传输。实现见下文;也可以为其他 TTS providers 构建类似 adapters。
import base64
import json
import websockets

class CartesiaTTS:
    def __init__(
        self,
        api_key: Optional[str] = None,
        voice_id: str = "f6ff7c0c-e396-40a9-a70b-f7607edb6937",
        model_id: str = "sonic-3",
        sample_rate: int = 24000,
        encoding: str = "pcm_s16le",
    ):
        self.api_key = api_key or os.getenv("CARTESIA_API_KEY")
        self.voice_id = voice_id
        self.model_id = model_id
        self.sample_rate = sample_rate
        self.encoding = encoding
        self._ws: WebSocketClientProtocol | None = None

    def _generate_context_id(self) -> str:
        """Generate a valid context_id for Cartesia."""
        timestamp = int(time.time() * 1000)
        counter = self._context_counter
        self._context_counter += 1
        return f"ctx_{timestamp}_{counter}"

    async def send_text(self, text: str | None) -> None:
        """Send text to Cartesia for synthesis."""
        if not text or not text.strip():
            return

        ws = await self._ensure_connection()
        payload = {
            "model_id": self.model_id,
            "transcript": text,
            "voice": {
                "mode": "id",
                "id": self.voice_id,
            },
            "output_format": {
                "container": "raw",
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            },
            "language": self.language,
            "context_id": self._generate_context_id(),
        }
        await ws.send(json.dumps(payload))

    async def receive_events(self) -> AsyncIterator[TTSChunkEvent]:
        """Yield audio chunks as they arrive from Cartesia."""
        async for raw_message in self._ws:
            message = json.loads(raw_message)

            # Decode and yield audio chunks
            if "data" in message and message["data"]:
                audio_chunk = base64.b64decode(message["data"])
                if audio_chunk:
                    yield TTSChunkEvent.create(audio_chunk)

    async def _ensure_connection(self) -> WebSocketClientProtocol:
        """Establish WebSocket connection if not already connected."""
        if self._ws is None:
            url = (
                f"wss://api.cartesia.ai/tts/websocket"
                f"?api_key={self.api_key}&cartesia_version={self.cartesia_version}"
            )
            self._ws = await websockets.connect(url)

        return self._ws

LangSmith

你使用 LangChain 构建的许多应用会包含多个步骤,并多次调用 LLM。随着这些应用越来越复杂,能够检查链或代理内部究竟发生了什么会变得至关重要。最好的方式是使用 LangSmith 在上方链接注册后,请设置环境变量以开始记录 traces:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."
或者,在 Python 中设置:
import getpass
import os

os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = getpass.getpass()

组合所有部分

完整管线会将三个阶段串联起来:
from langchain_core.runnables import RunnableGenerator

pipeline = (
    RunnableGenerator(stt_stream)      # Audio → STT events
    | RunnableGenerator(agent_stream)  # STT events → Agent events
    | RunnableGenerator(tts_stream)    # Agent events → TTS audio
)

# Use in WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    async def websocket_audio_stream():
        """Yield audio bytes from WebSocket."""
        while True:
            data = await websocket.receive_bytes()
            yield data

    # Transform audio through pipeline
    output_stream = pipeline.atransform(websocket_audio_stream())

    # Send TTS audio back to client
    async for event in output_stream:
        if event.type == "tts_chunk":
            await websocket.send_bytes(event.audio)
这里使用 RunnableGenerators 组合管线的每个步骤。这是 LangChain 内部用于管理跨组件 streaming 的抽象。 每个阶段都会独立且并发处理事件:音频一到达就开始转写,transcript 可用后代理立即开始推理,代理文本生成后语音合成立即开始。该架构可以实现低于 700ms 的延迟,以支持自然对话。 更多使用 LangChain 构建代理的内容,请参阅 Agents guide