WebSocket
WebSocket 是基于 TCP 的全双工通信协议,一次握手后保持长连接,服务端和客户端均可主动推送消息。Spring Boot 提供两种使用方式:原生 WebSocket(直接操作连接)和 STOMP over WebSocket(消息路由协议,详见 STOMP协议)。
单向推送场景(服务端 → 客户端)可优先考虑更轻量的 SSE服务端推送。
WebSocket vs SSE
| 特性 | WebSocket | SSE |
|---|---|---|
| 通信方向 | 双向 | 单向(服务端 → 客户端) |
| 协议 | ws / wss | HTTP/HTTPS |
| 消息格式 | 文本 / 二进制 | 纯文本 |
| 断线重连 | 需手动实现 | 浏览器自动 |
| 适用场景 | 聊天、游戏、协同编辑 | 通知、进度、实时数据 |
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>原生 WebSocket
实现 WebSocketHandler
@Component
public class ChatHandler extends TextWebSocketHandler {
// 线程安全的连接管理
private final Set<WebSocketSession> sessions =
Collections.newSetFromMap(new ConcurrentHashMap<>());
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.add(session);
log.info("新连接: {}, 当前在线: {}", session.getId(), sessions.size());
}
@Override
protected void handleTextMessage(WebSocketSession session,
TextMessage message) throws Exception {
String payload = message.getPayload();
log.info("收到消息: {} from {}", payload, session.getId());
// 广播给所有在线客户端
broadcast(payload, session);
}
@Override
public void handleTransportError(WebSocketSession session,
Throwable ex) throws Exception {
log.error("连接异常: {}", session.getId(), ex);
sessions.remove(session);
if (session.isOpen()) session.close();
}
@Override
public void afterConnectionClosed(WebSocketSession session,
CloseStatus status) {
sessions.remove(session);
log.info("连接关闭: {}, 状态: {}", session.getId(), status);
}
private void broadcast(String message, WebSocketSession sender) {
sessions.forEach(s -> {
if (s.isOpen()) {
try {
s.sendMessage(new TextMessage(message));
} catch (IOException e) {
sessions.remove(s);
}
}
});
}
// 主动推送(业务层调用)
public void sendToSession(String sessionId, String message) {
sessions.stream()
.filter(s -> s.getId().equals(sessionId) && s.isOpen())
.findFirst()
.ifPresent(s -> {
try {
s.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("推送失败: {}", sessionId, e);
}
});
}
}二进制消息处理
@Component
public class BinaryHandler extends BinaryWebSocketHandler {
@Override
protected void handleBinaryMessage(WebSocketSession session,
BinaryMessage message) throws Exception {
ByteBuffer payload = message.getPayload();
byte[] bytes = new byte[payload.remaining()];
payload.get(bytes);
// 处理二进制数据(图片、音视频帧等)
session.sendMessage(new BinaryMessage(processBytes(bytes)));
}
}注册与配置
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final ChatHandler chatHandler;
private final AuthHandshakeInterceptor authInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatHandler, "/ws/chat")
.addInterceptors(authInterceptor)
.setAllowedOriginPatterns("*"); // 生产环境配置具体域名
// SockJS 降级(兼容不支持 WebSocket 的旧浏览器)
registry.addHandler(chatHandler, "/ws/chat-sockjs")
.addInterceptors(authInterceptor)
.withSockJS()
.setStreamBytesLimit(512 * 1024)
.setHttpMessageCacheSize(1000)
.setDisconnectDelay(30_000);
}
}跨域配置也可统一在 跨域处理 中处理,但 WebSocket 握手的 Origin 检查需在此单独配置。
HandshakeInterceptor(握手拦截器)
在 WebSocket 握手阶段获取 HTTP 请求信息(如 Session、Token),传递给 WebSocketSession 的 attributes:
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// 从 URL 参数获取 token
if (request instanceof ServletServerHttpRequest servletRequest) {
String token = servletRequest.getServletRequest()
.getParameter("token");
if (token == null || !tokenService.validate(token)) {
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return false; // 拒绝握手
}
String userId = tokenService.getUserId(token);
attributes.put("userId", userId);
}
return true; // 允许握手
}
@Override
public void afterHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Exception exception) {}
}
// Handler 中通过 attributes 获取
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String userId = (String) session.getAttributes().get("userId");
userSessionMap.put(userId, session);
}结构化消息(JSON)
实际项目通常将消息体序列化为 JSON:
// 消息类型定义
public record WsMessage(String type, String from, Object data) {}
@Override
protected void handleTextMessage(WebSocketSession session,
TextMessage message) throws Exception {
WsMessage msg = objectMapper.readValue(message.getPayload(), WsMessage.class);
switch (msg.type()) {
case "chat" -> handleChat(session, msg);
case "join" -> handleJoin(session, msg);
case "leave" -> handleLeave(session, msg);
default -> log.warn("未知消息类型: {}", msg.type());
}
}
private void send(WebSocketSession session, WsMessage msg) throws IOException {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(msg)));
}房间 / 频道管理
@Component
public class RoomManager {
// roomId -> 该房间的 sessions
private final Map<String, Set<WebSocketSession>> rooms = new ConcurrentHashMap<>();
public void joinRoom(String roomId, WebSocketSession session) {
rooms.computeIfAbsent(roomId,
k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))
.add(session);
}
public void leaveRoom(String roomId, WebSocketSession session) {
Set<WebSocketSession> room = rooms.get(roomId);
if (room != null) {
room.remove(session);
if (room.isEmpty()) rooms.remove(roomId);
}
}
public void sendToRoom(String roomId, String message) {
Set<WebSocketSession> room = rooms.getOrDefault(roomId, Set.of());
room.stream().filter(WebSocketSession::isOpen).forEach(s -> {
try {
s.sendMessage(new TextMessage(message));
} catch (IOException e) {
room.remove(s);
}
});
}
}集群方案
WebSocketSession 存储在本地 JVM,多实例部署时需将消息路由到持有对应连接的节点。
Redis Pub/Sub
@Component
@RequiredArgsConstructor
public class ClusteredChatHandler extends TextWebSocketHandler {
private final ChatHandler localHandler;
private final RedisTemplate<String, String> redisTemplate;
@Override
protected void handleTextMessage(WebSocketSession session,
TextMessage message) throws Exception {
// 发布到 Redis,让所有节点广播
redisTemplate.convertAndSend("ws:broadcast", message.getPayload());
}
// Redis 订阅回调(所有节点都会收到)
public void onRedisMessage(String message) {
localHandler.broadcast(message, null);
}
}详见 Redis集成。
STOMP + 外部消息代理(RabbitMQ/ActiveMQ)是更完善的集群方案,参见 STOMP协议。
心跳与断线检测
@Scheduled(fixedRate = 30_000)
public void pingAll() {
sessions.forEach(session -> {
if (!session.isOpen()) {
sessions.remove(session);
return;
}
try {
session.sendMessage(new PingMessage());
} catch (IOException e) {
sessions.remove(session);
}
});
}SockJS 有内置心跳机制(默认 25 秒);原生 WebSocket 需手动实现。
前端接入
const ws = new WebSocket('ws://localhost:8080/ws/chat?token=xxx');
ws.onopen = () => {
console.log('连接成功');
ws.send(JSON.stringify({ type: 'join', from: 'user1', data: null }));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
console.log('收到:', msg);
};
ws.onerror = (error) => console.error('错误:', error);
ws.onclose = (event) => {
console.log('连接关闭:', event.code, event.reason);
// 断线重连
setTimeout(() => reconnect(), 3000);
};
// 发送消息
ws.send(JSON.stringify({ type: 'chat', from: 'user1', data: 'Hello!' }));STOMP 消息协议
原生 WebSocket 需自行管理消息路由;若需发布/订阅、点对点通道、消息代理支持,使用 STOMP协议:
原生 WebSocket:连接 → 自定义协议 → 手动路由
STOMP:连接 → 帧格式 → /topic 订阅 / /queue 点对点 → 消息代理路由
注意事项
WebSocketSession非线程安全,并发发送时需加锁或使用队列- 生产环境通过 Nginx 反向代理时,需在 location 中添加
Upgrade和Connectionheader setAllowedOriginPatterns("*")仅用于开发,生产应配置具体源- 大量长连接会占用文件描述符,注意系统
ulimit -n配置 - 与 HandlerInterceptor 不同,WebSocket 握手前是 HTTP,握手后脱离 Spring MVC 生命周期,请求头处理应在
HandshakeInterceptor中完成