SSE 服务端推送

返回 Spring Boot 基础

SSE(Server-Sent Events)是基于 HTTP 的服务端到客户端单向实时推送协议。相比 WebSocket,SSE 更轻量、复用 HTTP/2 多路复用、断线自动重连,适合通知、进度、实时数据流等单向推送场景。


SSE vs WebSocket

特性SSEWebSocket
方向单向(服务端 → 客户端)双向
协议HTTP/HTTPSws/wss
断线重连浏览器自动需手动实现
消息格式纯文本(UTF-8)文本 / 二进制
代理穿透更好(标准 HTTP)需代理支持
适用场景通知、进度、日志流聊天、游戏、协同编辑

STOMP协议 构建在 WebSocket 之上,提供更丰富的消息路由能力。


Spring MVC 实现

SseEmitter

@RestController
@RequestMapping("/sse")
public class SseController {
 
    // 保存所有活跃连接
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
 
    @GetMapping(value = "/subscribe/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@PathVariable String clientId) {
        // 超时时间设为 0 表示永不超时;也可设置具体毫秒数
        SseEmitter emitter = new SseEmitter(0L);
 
        emitters.put(clientId, emitter);
 
        // 连接关闭 / 超时时清理
        emitter.onCompletion(() -> emitters.remove(clientId));
        emitter.onTimeout(() -> {
            emitter.complete();
            emitters.remove(clientId);
        });
        emitter.onError(e -> emitters.remove(clientId));
 
        return emitter;
    }
 
    // 向指定客户端推送消息
    public void sendToClient(String clientId, String eventName, Object data) {
        SseEmitter emitter = emitters.get(clientId);
        if (emitter == null) return;
        try {
            emitter.send(SseEmitter.event()
                .name(eventName)
                .data(data, MediaType.APPLICATION_JSON)
                .id(UUID.randomUUID().toString())
                .reconnectTime(3000));
        } catch (IOException e) {
            emitters.remove(clientId);
            emitter.completeWithError(e);
        }
    }
 
    // 广播给所有客户端
    public void broadcast(String eventName, Object data) {
        emitters.forEach((id, emitter) -> {
            try {
                emitter.send(SseEmitter.event().name(eventName).data(data));
            } catch (IOException e) {
                emitters.remove(id);
            }
        });
    }
}

SSE 事件格式

id: 550e8400-e29b\n
event: order-update\n
data: {"orderId":123,"status":"SHIPPED"}\n
retry: 3000\n
\n
字段说明
id事件 ID,断线重连时通过 Last-Event-ID 请求头发送,用于消息重放
event事件类型名,客户端可按类型监听
data数据,多行则拼接(每行一个 data: 字段)
retry断线后重连间隔(毫秒)

实际场景示例

任务进度推送

@Service
@RequiredArgsConstructor
public class TaskProgressService {
 
    private final SseController sseController;
 
    @Async
    public void processTask(String taskId, String clientId) {
        try {
            for (int i = 0; i <= 100; i += 10) {
                Thread.sleep(500);
                sseController.sendToClient(clientId, "progress",
                    Map.of("taskId", taskId, "progress", i));
            }
            sseController.sendToClient(clientId, "complete",
                Map.of("taskId", taskId, "result", "SUCCESS"));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

配合 @Async 在线程池中执行,避免阻塞。

结合 Spring 事件机制

Spring ApplicationEvent 与 SSE 结合,实现解耦推送:

// 1. 定义业务事件
public record OrderShippedEvent(Long orderId, String clientId) {}
 
// 2. 业务层发布事件
@Service
@RequiredArgsConstructor
public class OrderService {
    private final ApplicationEventPublisher publisher;
 
    public void shipOrder(Long orderId, String clientId) {
        // ... 业务逻辑
        publisher.publishEvent(new OrderShippedEvent(orderId, clientId));
    }
}
 
// 3. 监听事件,推送 SSE
@Component
@RequiredArgsConstructor
public class SseEventHandler {
    private final SseController sseController;
 
    @EventListener
    public void onOrderShipped(OrderShippedEvent event) {
        sseController.sendToClient(event.clientId(), "order-shipped",
            Map.of("orderId", event.orderId()));
    }
}

Spring WebFlux 实现(响应式)

WebFlux 天然支持 SSE,用 Flux<ServerSentEvent<T>> 表示无限数据流:

@RestController
@RequestMapping("/sse")
public class ReactiveSseController {
 
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Map<String, Object>>> stream() {
        return Flux.interval(Duration.ofSeconds(1))
            .map(seq -> ServerSentEvent.<Map<String, Object>>builder()
                .id(String.valueOf(seq))
                .event("tick")
                .data(Map.of("seq", seq, "time", Instant.now()))
                .build());
    }
 
    // 与业务 Flux 绑定
    @GetMapping(value = "/prices/{symbol}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceUpdate>> priceStream(@PathVariable String symbol) {
        return priceService.getPriceFlux(symbol)
            .map(price -> ServerSentEvent.<PriceUpdate>builder()
                .event("price")
                .data(price)
                .build());
    }
}

前端接入

// 建立 SSE 连接
const es = new EventSource('/sse/subscribe/client-001');
 
// 监听默认消息
es.onmessage = (event) => {
    console.log('message:', event.data);
};
 
// 监听命名事件
es.addEventListener('order-shipped', (event) => {
    const data = JSON.parse(event.data);
    console.log('Order shipped:', data.orderId);
});
 
es.addEventListener('progress', (event) => {
    const { progress } = JSON.parse(event.data);
    updateProgressBar(progress);
});
 
// 错误处理(浏览器会自动重连)
es.onerror = (error) => {
    if (es.readyState === EventSource.CLOSED) {
        console.log('连接已关闭');
    }
};
 
// 主动关闭
// es.close();

携带认证 token(EventSource 不支持自定义请求头,需改用 URL 参数或 fetch polyfill):

// 方案 1:Token 写在 URL 参数(不推荐,会出现在日志中)
const es = new EventSource(`/sse/subscribe?token=${token}`);
 
// 方案 2:使用 fetch + ReadableStream polyfill(推荐)
import { fetchEventSource } from '@microsoft/fetch-event-source';
 
fetchEventSource('/sse/subscribe', {
    headers: { Authorization: `Bearer ${token}` },
    onmessage(event) { /* ... */ },
});

集群环境处理

SseEmitter 存储在内存中,多实例部署时需将消息路由到持有该连接的节点:

方案 1:Redis Pub/Sub 广播

@Configuration
public class SseRedisConfig {
 
    @Bean
    public MessageListenerAdapter sseMessageListener(SseController controller) {
        return new MessageListenerAdapter(controller, "onRedisMessage");
    }
 
    @Bean
    public RedisMessageListenerContainer container(
            RedisConnectionFactory factory,
            MessageListenerAdapter listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(listener, new PatternTopic("sse:*"));
        return container;
    }
}
 
// 发布时广播到 Redis,由持有连接的节点推送
public void sendToClient(String clientId, String event, Object data) {
    SseEmitter emitter = emitters.get(clientId);
    if (emitter != null) {
        // 本节点有连接,直接发送
        doSend(emitter, event, data);
    } else {
        // 广播到 Redis,其他节点处理
        redisTemplate.convertAndSend("sse:" + clientId,
            JsonUtils.toJson(Map.of("event", event, "data", data)));
    }
}

详见 Redis集成

方案 2:WebFlux + Reactive Streams

使用 Sinks.Many 实现跨节点事件总线,结合 响应式编程 中的背压机制管理流量。


超时与心跳

Nginx 等代理默认 60 秒超时会断开长连接,需定期发送心跳:

@Scheduled(fixedRate = 25_000)   // 每 25 秒发送心跳
public void sendHeartbeat() {
    emitters.forEach((id, emitter) -> {
        try {
            emitter.send(SseEmitter.event()
                .comment("heartbeat")   // 注释行,客户端不触发事件
                .build());
        } catch (IOException e) {
            emitters.remove(id);
        }
    });
}

同时配置 Nginx:

location /sse/ {
    proxy_pass http://backend;
    proxy_read_timeout 3600s;
    proxy_buffering off;          # 关键:禁用缓冲才能实时推送
    proxy_cache off;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
}

注意事项

  • HTTP/1.1 同域名最多 6 个并发 SSE 连接(浏览器限制),HTTP/2 无此限制
  • SseEmitter 默认 30 秒超时,生产环境建议设为 0L(永不超时)或业务合适值
  • 服务端重启会断开所有连接,客户端通过 Last-Event-ID + 重连实现消息补偿
  • 内存中持有大量 SseEmitter 时注意 GC 压力,清理已完成/超时的 emitter
  • 全局异常处理 配合:SSE 请求中抛出的异常需在 @ExceptionHandler 中以 SseEmitter.completeWithError() 处理,避免挂起连接