Streaming Output and SSE


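An LLM generates text token by token. Streaming pushes each token to the client as soon as it is produced, instead of returning the whole response once generation finishes. The user experience changes from "wait 10 seconds, then see the result" to "see text start appearing immediately".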


How It Works

Server-Sent Events (SSE)

SSE uses a long-lived HTTP connection over which the server pushes a stream of events, so the client does not need to poll:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"delta": "Hel"}

data: {"delta": "lo"}

data: {"delta": "!"}

data: [DONE]

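Format rules:

  • Each payload line of an event starts with data:
  • Events are separated by a blank line
  • [DONE] marks the end of the stream (an application-level convention, not part of the SSE specification)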
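
The rules above can be sketched as a pair of helpers. This is a minimal illustration rather than a full SSE implementation: `format_sse` and `parse_sse` are hypothetical names, only the `data:` field is handled, and the parser assumes the whole body is already in memory.

```python
import json


def format_sse(payload: str) -> str:
    """Serialize one event: a `data:` line per payload line, then a blank line."""
    lines = payload.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


def parse_sse(stream_text: str) -> list[str]:
    """Split a complete SSE body into event payloads (data fields only)."""
    events = []
    for block in stream_text.split("\n\n"):
        data_lines = [
            line[len("data:"):].removeprefix(" ")  # one optional space after the colon
            for line in block.split("\n")
            if line.startswith("data:")
        ]
        if data_lines:
            events.append("\n".join(data_lines))
    return events


body = "".join(format_sse(json.dumps({"delta": d})) for d in ["Hel", "lo", "!"])
body += format_sse("[DONE]")

deltas = []
for payload in parse_sse(body):
    if payload == "[DONE]":  # application-level end marker, not part of SSE itself
        break
    deltas.append(json.loads(payload)["delta"])

text = "".join(deltas)
print(text)
```

Wrapping each delta in JSON keeps newlines inside the generated text from being mistaken for event boundaries.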

Server-Side Implementation

Python (FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json

app = FastAPI()
# Async client, so streaming does not block the event loop
client = anthropic.AsyncAnthropic()


class ChatRequest(BaseModel):
    prompt: str  # read from the JSON request body


@app.post("/chat")
async def chat(req: ChatRequest):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": req.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # One SSE event per text fragment
                yield f"data: {json.dumps({'delta': text})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
        }
    )

Node.js (Express)

import express from 'express';
import Anthropic from '@anthropic-ai/sdk';

const app = express();
app.use(express.json()); // parse JSON bodies so req.body.prompt is defined
const client = new Anthropic();

app.post('/chat', async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    const stream = await client.messages.stream({
        model: 'claude-sonnet-4-6',
        max_tokens: 1024,
        messages: [{ role: 'user', content: req.body.prompt }],
    });

    for await (const chunk of stream) {
        // Forward text deltas only; other delta types (e.g. input_json_delta) carry no text
        if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
            res.write(`data: ${JSON.stringify({ delta: chunk.delta.text })}\n\n`);
        }
    }
    res.write('data: [DONE]\n\n');
    res.end();
});

Client-Side Consumption

JavaScript (EventSource)

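// EventSource can only issue GET requests, so this assumes a GET /chat route
// that reads the prompt from the query string (the POST handlers above do not).
const prompt = encodeURIComponent('Hello');
const evtSource = new EventSource(`/chat?prompt=${prompt}`);

evtSource.onmessage = (event) => {
    if (event.data === '[DONE]') {
        evtSource.close();
        return;
    }
    const { delta } = JSON.parse(event.data);
    document.getElementById('output').textContent += delta;
};

// Close on error to stop EventSource's automatic reconnection
evtSource.onerror = () => evtSource.close();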

JavaScript (fetch + ReadableStream, supports POST)

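// Reads the SSE stream from POST /chat and appends each delta to outputEl.
// Wrapped in a function so that `return` on [DONE] is valid.
async function streamChat(prompt, outputEl) {
    const response = await fetch('/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop(); // keep the trailing incomplete line

        for (const line of lines) {
            if (!line.startsWith('data: ')) continue;
            const data = line.slice(6);
            if (data === '[DONE]') return;
            const { delta } = JSON.parse(data);
            outputEl.textContent += delta;
        }
    }
}

streamChat('Explain quantum computing', document.getElementById('output'))
    .catch(console.error);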
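
The buffering step is the part that is easiest to get wrong: a network chunk can end in the middle of a line, or even in the middle of a multi-byte UTF-8 character. The following Python sketch mirrors the same logic for a non-browser client. `iter_deltas` is a hypothetical helper, and the byte chunks are hand-made stand-ins for whatever an HTTP library would deliver.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_deltas(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield `delta` strings from raw SSE bytes, tolerating arbitrary chunk splits."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # handles split multi-byte chars
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        *lines, buffer = buffer.split("\n")  # last piece may be an incomplete line
        for line in lines:
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data == "[DONE]":
                return
            yield json.loads(data)["delta"]


raw = (
    'data: {"delta": "量子"}\n\n'
    'data: {"delta": " computing"}\n\n'
    "data: [DONE]\n\n"
).encode("utf-8")

# Split at awkward offsets: inside a multi-byte character and inside a line.
chunks = [raw[:18], raw[18:40], raw[40:]]
result = "".join(iter_deltas(chunks))
print(result)
```

Decoding each chunk independently with `bytes.decode` would raise on the first chunk here, which is why an incremental decoder (the counterpart of `TextDecoder` with `stream: true`) is used.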

Common Issues

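| Problem | Cause | Solution |
|---|---|---|
| Response is buffered and does not arrive in real time | Nginx/proxy buffering | Set X-Accel-Buffering: no |
| Connection drops on its own | Proxy timeout (typically 60s) | Set proxy_read_timeout 300s |
| Frontend receives no events | CORS not configured | Send an Access-Control-Allow-Origin header |
| How to resume after the stream is interrupted | Progress is lost when the connection drops | Use Last-Event-ID to mark the resume point |
| One-shot result vs. streaming | Depends on business requirements | Use non-streaming calls for batch processing to avoid unnecessary complexity |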
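
Resuming with Last-Event-ID requires the server to tag each event with an `id:` field and to be able to replay what came after a given id. A minimal sketch follows, assuming the server keeps the already generated fragments of a response in memory; `sse_events` and the `generated` list are hypothetical names for illustration.

```python
import json
from typing import Iterator, Optional


def sse_events(deltas: list[str], last_event_id: Optional[str] = None) -> Iterator[str]:
    """Yield SSE frames with an `id:` field, skipping those the client already has."""
    start = int(last_event_id) + 1 if last_event_id is not None else 0
    for event_id in range(start, len(deltas)):
        payload = json.dumps({"delta": deltas[event_id]})
        yield f"id: {event_id}\ndata: {payload}\n\n"
    yield "data: [DONE]\n\n"


generated = ["Stream", "ing ", "resumed"]  # hypothetical cached output of one response

first_attempt = list(sse_events(generated))
# Suppose the connection dropped after the client processed event id 0.
resumed = list(sse_events(generated, last_event_id="0"))
print(resumed[0])
```

A browser `EventSource` sends the `Last-Event-ID` request header automatically when it reconnects; a fetch-based client has to track the last id and send the header itself.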

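Streaming Tool Calls

When an LLM streams its response, the JSON arguments of a tool call arrive in fragments. Accumulate them and parse only once the block is complete: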

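import json

json_buffer = ""  # accumulates partial JSON for the current tool_use block

with client.messages.stream(...) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                # Accumulate JSON fragments; do not parse each fragment on its own
                json_buffer += event.delta.partial_json
        elif event.type == "content_block_stop":
            # Parse only when the block ends; text blocks leave the buffer empty
            if json_buffer:
                tool_input = json.loads(json_buffer)
                json_buffer = ""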
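
A response can contain several content blocks (for example, text followed by one or more tool calls), so it can help to key the buffers by block index. The sketch below runs without network access: the events are hand-built stand-ins that imitate the shape of the streaming events, and `collect_tool_inputs` and the `get_weather` tool name are hypothetical.

```python
import json
from types import SimpleNamespace as NS


def collect_tool_inputs(events) -> dict[int, dict]:
    """Accumulate partial_json per content block index; parse when the block stops."""
    buffers: dict[int, str] = {}
    parsed: dict[int, dict] = {}
    for event in events:
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            buffers[event.index] = ""
        elif event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            buffers[event.index] += event.delta.partial_json
        elif event.type == "content_block_stop" and event.index in buffers:
            raw = buffers.pop(event.index)
            parsed[event.index] = json.loads(raw) if raw else {}  # empty input -> {}
    return parsed


# Hand-built stand-ins for stream events; a text block (index 0) precedes the tool call.
fake_events = [
    NS(type="content_block_start", index=0, content_block=NS(type="text")),
    NS(type="content_block_delta", index=0, delta=NS(type="text_delta", text="Checking.")),
    NS(type="content_block_stop", index=0),
    NS(type="content_block_start", index=1, content_block=NS(type="tool_use", name="get_weather")),
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json='{"city": "Be')),
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json='ijing"}')),
    NS(type="content_block_stop", index=1),
]

tool_inputs = collect_tool_inputs(fake_events)
print(tool_inputs)
```

Neither fragment of the tool input is valid JSON by itself, which is why parsing is deferred until `content_block_stop`.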
