Streaming Output and SSE
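An LLM generates text one token at a time. Streaming pushes each token to the client as soon as it is produced, instead of waiting for generation to finish and returning everything at once. For the user, the experience changes from "wait 10 seconds, then see the result" to "text starts appearing immediately and fills in as it is generated".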
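How It Works
Server-Sent Events (SSE)
SSE uses a long-lived HTTP connection over which the server pushes a stream of events, so the client does not need to poll: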
```text
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"delta": "Hel"}

data: {"delta": "lo"}

data: {"delta": "!"}

data: [DONE]
```
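Format rules:
- Each line of an event is a `field: value` pair. The payload goes in `data:` lines, and the spec also defines `event:`, `id:`, and `retry:` fields.
- Events are separated by a blank line, so every event ends with `\n\n`.
- `[DONE]` marks the end of the stream. This is an application-level convention, not part of the SSE specification, so the client must check for it explicitly.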
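The framing rules above fit in a small helper. This is a sketch, and `format_sse_event` is a hypothetical name rather than a library API. It also covers two details the example does not show. Optional `id:` and `event:` fields come before the data. A payload containing newlines is split across several `data:` lines, and the client rejoins them with `\n`.

```python
import json
from typing import Optional


def format_sse_event(data: str, event_id: Optional[str] = None,
                     event: Optional[str] = None) -> str:
    """Serialize one SSE event (hypothetical helper, not a library API)."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # A payload with embedded newlines becomes several data: lines;
    # the receiving EventSource joins them back together with "\n".
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line ("\n\n") terminates the event.
    return "\n".join(lines) + "\n\n"


frame = format_sse_event(json.dumps({"delta": "Hel"}), event_id="1")
print(repr(frame))  # 'id: 1\ndata: {"delta": "Hel"}\n\n'
```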
Server Implementation
Python (FastAPI)
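The handler below does two jobs: it talks to the SDK, and it wraps each text fragment in an SSE frame. You can test the second job without an API key by moving it into a plain async generator and driving it with a fake token source. `to_sse` and `fake_tokens` are hypothetical names for this sketch. In the real handler, the token source is `stream.text_stream`. Passing `ensure_ascii=False` sends non-ASCII text as-is instead of as `\uXXXX` escapes, and `JSON.parse` decodes both forms to the same string.

```python
import asyncio
import json
from typing import AsyncIterator


async def to_sse(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Wrap each text fragment in an SSE data frame, then send the [DONE] sentinel."""
    async for text in tokens:
        payload = json.dumps({"delta": text}, ensure_ascii=False)
        yield f"data: {payload}\n\n"
    yield "data: [DONE]\n\n"


async def fake_tokens() -> AsyncIterator[str]:
    """Hypothetical stand-in for stream.text_stream, for local testing only."""
    for piece in ["Hel", "lo", "!"]:
        await asyncio.sleep(0)  # Hand control back to the event loop, as real I/O would
        yield piece


async def collect() -> list[str]:
    return [frame async for frame in to_sse(fake_tokens())]


frames = asyncio.run(collect())
for frame in frames:
    print(repr(frame))
```

Inside the handler's `generate()`, the inner loop could then become `async for frame in to_sse(stream.text_stream): yield frame`. That form keeps the `async with` block open for the whole stream.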
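```python
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
# Async client: a sync client inside an async handler would block the event loop
client = anthropic.AsyncAnthropic()


class ChatRequest(BaseModel):
    prompt: str  # Read from the JSON body, e.g. {"prompt": "..."}


@app.post("/chat")
async def chat(req: ChatRequest):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": req.prompt}],
        ) as stream:
            # text_stream yields only the text fragments of the response
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'delta': text})}\n\n"
        yield "data: [DONE]\n\n"  # End-of-stream sentinel

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering for this response
        },
    )
```
Node.js (Express)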
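```javascript
import express from 'express';
import Anthropic from '@anthropic-ai/sdk';

const app = express();
app.use(express.json()); // Parse JSON bodies so req.body.prompt is populated
const client = new Anthropic();

app.post('/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders(); // Send headers right away so the client sees the stream open

  // messages.stream() returns a MessageStream synchronously; no await needed
  const stream = client.messages.stream({
    model: 'claude-sonnet-4-6',
    max_tokens: 1024,
    messages: [{ role: 'user', content: req.body.prompt }],
  });

  for await (const event of stream) {
    // Forward only text deltas; tool-call deltas (input_json_delta) have no .text
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      res.write(`data: ${JSON.stringify({ delta: event.delta.text })}\n\n`);
    }
  }
  res.write('data: [DONE]\n\n');
  res.end();
});

app.listen(3000);
```
Client Side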
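JavaScript (EventSource)
`EventSource` can only send GET requests without a body, so this variant assumes a GET route on `/chat` that reads `prompt` from the query string. The POST handlers above would need a GET counterpart.
```javascript
// The prompt travels in the query string, URL-encoded
const evtSource = new EventSource(`/chat?prompt=${encodeURIComponent('Hello')}`);

evtSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    // Close explicitly: otherwise EventSource reconnects when the server ends the stream
    evtSource.close();
    return;
  }
  const { delta } = JSON.parse(event.data);
  document.getElementById('output').textContent += delta;
};

// Closing on error also disables EventSource's automatic reconnection
evtSource.onerror = () => evtSource.close();
```
JavaScript (fetch + ReadableStream, supports POST)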
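```javascript
async function streamChat(prompt, outputEl) {
  const response = await fetch('/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt }),
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps a multi-byte character split across chunks intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Keep the trailing incomplete line for the next chunk
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue; // Skip blank separators and other fields
      const data = line.slice(6);
      if (data === '[DONE]') return;
      const { delta } = JSON.parse(data);
      outputEl.textContent += delta;
    }
  }
}

streamChat('Explain quantum computing', document.getElementById('output'));
```
Common Issues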
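A common client-side pitfall is assuming that each network chunk holds whole lines. A chunk can end in the middle of a line, or even in the middle of a multi-byte UTF-8 character. This is why the fetch client keeps `buffer` between reads and decodes with `{ stream: true }`. Below is the same bookkeeping in Python, as a sketch. `SSEDeltaParser` is a hypothetical name, and it assumes the `{"delta": ...}` payloads and `[DONE]` sentinel used on this page.

```python
import codecs
import json


class SSEDeltaParser:
    """Incrementally extract "delta" strings from raw SSE bytes (hypothetical helper)."""

    def __init__(self) -> None:
        # Holds back an incomplete multi-byte sequence until its remaining
        # bytes arrive, like TextDecoder.decode(..., { stream: true }).
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""  # Trailing partial line carried over between chunks
        self.done = False

    def feed(self, chunk: bytes) -> list[str]:
        """Consume one network chunk and return the deltas it completes."""
        if self.done:
            return []
        self._buffer += self._decoder.decode(chunk)
        *lines, self._buffer = self._buffer.split("\n")
        deltas = []
        for line in lines:
            if not line.startswith("data: "):
                continue  # Blank separator lines and other fields
            data = line[len("data: "):]
            if data == "[DONE]":
                self.done = True
                break
            deltas.append(json.loads(data)["delta"])
        return deltas


raw = 'data: {"delta": "héllo"}\n\ndata: [DONE]\n\n'.encode("utf-8")
parser = SSEDeltaParser()
out = []
# Worst case: one byte per chunk, which also splits the two-byte "é"
for i in range(len(raw)):
    out.extend(parser.feed(raw[i:i + 1]))
print(out, parser.done)  # ['héllo'] True
```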
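| Problem | Cause | Solution |
|---|---|---|
| Response is buffered, not delivered in real time | Nginx or another proxy buffers the response | Send `X-Accel-Buffering: no`, or set `proxy_buffering off;` in Nginx |
| Connection drops on its own | Proxy read timeout (Nginx's `proxy_read_timeout` defaults to 60s) | Raise it, e.g. `proxy_read_timeout 300s;`, or send periodic SSE comment lines (`: ping`) as heartbeats |
| Frontend receives no events | CORS not configured for a cross-origin request | Return an `Access-Control-Allow-Origin` header that allows the frontend's origin |
| Resuming after the stream is interrupted | Progress is lost when the connection drops | Send an `id:` with each event; on reconnect `EventSource` sends a `Last-Event-ID` header, and the server (which must have kept the events) resumes after that ID |
| One-shot result vs. streaming | Depends on the use case | Use non-streaming for batch processing to avoid unnecessary complexity |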
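Resuming with `Last-Event-ID` only works if the server remembers what it has already sent. Below is a minimal in-memory sketch. It assumes one log per generation and consecutive integer IDs. `EventLog` is a hypothetical name, and a deployment with several server processes would need shared storage instead of a Python list.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class EventLog:
    """In-memory record of the SSE events sent for one generation (hypothetical helper)."""
    events: list[str] = field(default_factory=list)

    @staticmethod
    def _frame(event_id: int, data: str) -> str:
        # The id: line is what the browser echoes back as Last-Event-ID
        return f"id: {event_id}\ndata: {data}\n\n"

    def append(self, data: str) -> str:
        """Record a payload and return its frame; ids are 1-based positions."""
        self.events.append(data)
        return self._frame(len(self.events), data)

    def replay_after(self, last_event_id: Optional[str]) -> list[str]:
        """Frames the client has not seen yet, given its Last-Event-ID header (or None)."""
        start = int(last_event_id) if last_event_id else 0
        return [
            self._frame(i, data)
            for i, data in enumerate(self.events[start:], start=start + 1)
        ]


log = EventLog()
for payload in ['{"delta": "Hel"}', '{"delta": "lo"}', '{"delta": "!"}']:
    log.append(payload)

# The client saw id 1, lost the connection, and reconnects with Last-Event-ID: 1
for frame in log.replay_after("1"):
    print(frame, end="")
```

On reconnect, a handler would read the `Last-Event-ID` request header, write the frames from `replay_after(...)`, and then continue appending new events.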
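Streaming Tool Calls
When a response is streamed, a tool call's JSON arguments arrive in fragments (`input_json_delta` events). A single fragment is usually not valid JSON on its own, so accumulate the fragments and parse them once the block ends: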
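```python
import json

json_buffer = ""       # Accumulates the current tool_use block's argument fragments
in_tool_block = False  # True while inside a tool_use content block

# Assumes client = anthropic.Anthropic() (sync); the arguments must include tools=[...]
with client.messages.stream(...) as stream:
    for event in stream:
        if event.type == "content_block_start":
            in_tool_block = event.content_block.type == "tool_use"
            json_buffer = ""
        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                # Accumulate JSON fragments; do not parse them one by one
                json_buffer += event.delta.partial_json
        elif event.type == "content_block_stop" and in_tool_block:
            # Parse only when the block is complete (empty buffer means no arguments)
            tool_input = json.loads(json_buffer) if json_buffer else {}
            in_tool_block = False
```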
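To see the accumulation logic without an API call, you can wrap it in a function and feed it a hand-built event sequence. The fake events below use `types.SimpleNamespace` and only mimic the attribute names used above (`type`, `content_block.type`, `delta.type`, `delta.partial_json`). They are not real SDK objects, and `collect_tool_inputs` is a hypothetical name. Note that the two JSON fragments are invalid on their own and parse only once joined, and that the text block's `content_block_stop` is ignored.

```python
import json
from types import SimpleNamespace as NS


def collect_tool_inputs(events) -> list[dict]:
    """Accumulate input_json_delta fragments per tool_use block; parse on block stop."""
    inputs, buffer, in_tool = [], "", False
    for ev in events:
        if ev.type == "content_block_start":
            in_tool = ev.content_block.type == "tool_use"
            buffer = ""
        elif ev.type == "content_block_delta" and ev.delta.type == "input_json_delta":
            buffer += ev.delta.partial_json
        elif ev.type == "content_block_stop" and in_tool:
            inputs.append(json.loads(buffer) if buffer else {})
            in_tool = False
    return inputs


# Hand-built fake events, shaped only like the fields used above
events = [
    NS(type="content_block_start", content_block=NS(type="text")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Let me check.")),
    NS(type="content_block_stop"),
    NS(type="content_block_start", content_block=NS(type="tool_use")),
    NS(type="content_block_delta", delta=NS(type="input_json_delta", partial_json='{"city": "Par')),
    NS(type="content_block_delta", delta=NS(type="input_json_delta", partial_json='is", "days": 3}')),
    NS(type="content_block_stop"),
]
print(collect_tool_inputs(events))  # [{'city': 'Paris', 'days': 3}]
```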