SSE 服务端推送
SSE(Server-Sent Events)是基于 HTTP 的服务端到客户端单向实时推送协议。相比 WebSocket,SSE 更轻量、复用 HTTP/2 多路复用、断线自动重连,适合通知、进度、实时数据流等单向推送场景。
SSE vs WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务端 → 客户端) | 双向 |
| 协议 | HTTP/HTTPS | ws/wss |
| 断线重连 | 浏览器自动 | 需手动实现 |
| 消息格式 | 纯文本(UTF-8) | 文本 / 二进制 |
| 代理穿透 | 更好(标准 HTTP) | 需代理支持 |
| 适用场景 | 通知、进度、日志流 | 聊天、游戏、协同编辑 |
STOMP协议 构建在 WebSocket 之上,提供更丰富的消息路由能力。
Spring MVC 实现
SseEmitter
@RestController
@RequestMapping("/sse")
public class SseController {
// 保存所有活跃连接
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@GetMapping(value = "/subscribe/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(@PathVariable String clientId) {
// 超时时间设为 0 表示永不超时;也可设置具体毫秒数
SseEmitter emitter = new SseEmitter(0L);
emitters.put(clientId, emitter);
// 连接关闭 / 超时时清理
emitter.onCompletion(() -> emitters.remove(clientId));
emitter.onTimeout(() -> {
emitter.complete();
emitters.remove(clientId);
});
emitter.onError(e -> emitters.remove(clientId));
return emitter;
}
// 向指定客户端推送消息
public void sendToClient(String clientId, String eventName, Object data) {
SseEmitter emitter = emitters.get(clientId);
if (emitter == null) return;
try {
emitter.send(SseEmitter.event()
.name(eventName)
.data(data, MediaType.APPLICATION_JSON)
.id(UUID.randomUUID().toString())
.reconnectTime(3000));
} catch (IOException e) {
emitters.remove(clientId);
emitter.completeWithError(e);
}
}
// 广播给所有客户端
public void broadcast(String eventName, Object data) {
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event().name(eventName).data(data));
} catch (IOException e) {
emitters.remove(id);
}
});
}
}SSE 事件格式
id: 550e8400-e29b\n
event: order-update\n
data: {"orderId":123,"status":"SHIPPED"}\n
retry: 3000\n
\n
| 字段 | 说明 |
|---|---|
id | 事件 ID,断线重连时通过 Last-Event-ID 请求头发送,用于消息重放 |
event | 事件类型名,客户端可按类型监听 |
data | 数据,多行则拼接(每行一个 data: 字段) |
retry | 断线后重连间隔(毫秒) |
实际场景示例
任务进度推送
@Service
@RequiredArgsConstructor
public class TaskProgressService {
private final SseController sseController;
@Async
public void processTask(String taskId, String clientId) {
try {
for (int i = 0; i <= 100; i += 10) {
Thread.sleep(500);
sseController.sendToClient(clientId, "progress",
Map.of("taskId", taskId, "progress", i));
}
sseController.sendToClient(clientId, "complete",
Map.of("taskId", taskId, "result", "SUCCESS"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}配合 @Async 在线程池中执行,避免阻塞。
结合 Spring 事件机制
Spring ApplicationEvent 与 SSE 结合,实现解耦推送:
// 1. 定义业务事件
public record OrderShippedEvent(Long orderId, String clientId) {}
// 2. 业务层发布事件
@Service
@RequiredArgsConstructor
public class OrderService {
private final ApplicationEventPublisher publisher;
public void shipOrder(Long orderId, String clientId) {
// ... 业务逻辑
publisher.publishEvent(new OrderShippedEvent(orderId, clientId));
}
}
// 3. 监听事件,推送 SSE
@Component
@RequiredArgsConstructor
public class SseEventHandler {
private final SseController sseController;
@EventListener
public void onOrderShipped(OrderShippedEvent event) {
sseController.sendToClient(event.clientId(), "order-shipped",
Map.of("orderId", event.orderId()));
}
}Spring WebFlux 实现(响应式)
WebFlux 天然支持 SSE,用 Flux<ServerSentEvent<T>> 表示无限数据流:
@RestController
@RequestMapping("/sse")
public class ReactiveSseController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Map<String, Object>>> stream() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<Map<String, Object>>builder()
.id(String.valueOf(seq))
.event("tick")
.data(Map.of("seq", seq, "time", Instant.now()))
.build());
}
// 与业务 Flux 绑定
@GetMapping(value = "/prices/{symbol}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PriceUpdate>> priceStream(@PathVariable String symbol) {
return priceService.getPriceFlux(symbol)
.map(price -> ServerSentEvent.<PriceUpdate>builder()
.event("price")
.data(price)
.build());
}
}前端接入
// 建立 SSE 连接
const es = new EventSource('/sse/subscribe/client-001');
// 监听默认消息
es.onmessage = (event) => {
console.log('message:', event.data);
};
// 监听命名事件
es.addEventListener('order-shipped', (event) => {
const data = JSON.parse(event.data);
console.log('Order shipped:', data.orderId);
});
es.addEventListener('progress', (event) => {
const { progress } = JSON.parse(event.data);
updateProgressBar(progress);
});
// 错误处理(浏览器会自动重连)
es.onerror = (error) => {
if (es.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
}
};
// 主动关闭
// es.close();携带认证 token(EventSource 不支持自定义请求头,需改用 URL 参数或 fetch polyfill):
// 方案 1:Token 写在 URL 参数(不推荐,会出现在日志中)
const es = new EventSource(`/sse/subscribe?token=${token}`);
// 方案 2:使用 fetch + ReadableStream polyfill(推荐)
import { fetchEventSource } from '@microsoft/fetch-event-source';
fetchEventSource('/sse/subscribe', {
headers: { Authorization: `Bearer ${token}` },
onmessage(event) { /* ... */ },
});集群环境处理
SseEmitter 存储在内存中,多实例部署时需将消息路由到持有该连接的节点:
方案 1:Redis Pub/Sub 广播
@Configuration
public class SseRedisConfig {
@Bean
public MessageListenerAdapter sseMessageListener(SseController controller) {
return new MessageListenerAdapter(controller, "onRedisMessage");
}
@Bean
public RedisMessageListenerContainer container(
RedisConnectionFactory factory,
MessageListenerAdapter listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(listener, new PatternTopic("sse:*"));
return container;
}
}
// 发布时广播到 Redis,由持有连接的节点推送
public void sendToClient(String clientId, String event, Object data) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
// 本节点有连接,直接发送
doSend(emitter, event, data);
} else {
// 广播到 Redis,其他节点处理
redisTemplate.convertAndSend("sse:" + clientId,
JsonUtils.toJson(Map.of("event", event, "data", data)));
}
}详见 Redis集成。
方案 2:WebFlux + Reactive Streams
使用 Sinks.Many 实现跨节点事件总线,结合 响应式编程 中的背压机制管理流量。
超时与心跳
Nginx 等代理默认 60 秒超时会断开长连接,需定期发送心跳:
@Scheduled(fixedRate = 25_000) // 每 25 秒发送心跳
public void sendHeartbeat() {
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event()
.comment("heartbeat") // 注释行,客户端不触发事件
.build());
} catch (IOException e) {
emitters.remove(id);
}
});
}同时配置 Nginx:
location /sse/ {
proxy_pass http://backend;
proxy_read_timeout 3600s;
proxy_buffering off; # 关键:禁用缓冲才能实时推送
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;
}注意事项
- HTTP/1.1 同域名最多 6 个并发 SSE 连接(浏览器限制),HTTP/2 无此限制
SseEmitter默认 30 秒超时,生产环境建议设为0L(永不超时)或业务合适值- 服务端重启会断开所有连接,客户端通过
Last-Event-ID+ 重连实现消息补偿 - 内存中持有大量
SseEmitter时注意 GC 压力,清理已完成/超时的 emitter - 与 全局异常处理 配合:SSE 请求中抛出的异常需在
@ExceptionHandler中以SseEmitter.completeWithError()处理,避免挂起连接