WebSocket

返回 Spring Boot 基础

WebSocket 是基于 TCP 的全双工通信协议,一次握手后保持长连接,服务端和客户端均可主动推送消息。Spring Boot 提供两种使用方式:原生 WebSocket(直接操作连接)和 STOMP over WebSocket(消息路由协议,详见 STOMP协议)。

单向推送场景(服务端 → 客户端)可优先考虑更轻量的 SSE服务端推送


WebSocket vs SSE

特性WebSocketSSE
通信方向双向单向(服务端 → 客户端)
协议ws / wssHTTP/HTTPS
消息格式文本 / 二进制纯文本
断线重连需手动实现浏览器自动
适用场景聊天、游戏、协同编辑通知、进度、实时数据

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

原生 WebSocket

实现 WebSocketHandler

@Component
public class ChatHandler extends TextWebSocketHandler {
 
    // 线程安全的连接管理
    private final Set<WebSocketSession> sessions =
        Collections.newSetFromMap(new ConcurrentHashMap<>());
 
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.add(session);
        log.info("新连接: {}, 当前在线: {}", session.getId(), sessions.size());
    }
 
    @Override
    protected void handleTextMessage(WebSocketSession session,
                                     TextMessage message) throws Exception {
        String payload = message.getPayload();
        log.info("收到消息: {} from {}", payload, session.getId());
 
        // 广播给所有在线客户端
        broadcast(payload, session);
    }
 
    @Override
    public void handleTransportError(WebSocketSession session,
                                     Throwable ex) throws Exception {
        log.error("连接异常: {}", session.getId(), ex);
        sessions.remove(session);
        if (session.isOpen()) session.close();
    }
 
    @Override
    public void afterConnectionClosed(WebSocketSession session,
                                      CloseStatus status) {
        sessions.remove(session);
        log.info("连接关闭: {}, 状态: {}", session.getId(), status);
    }
 
    private void broadcast(String message, WebSocketSession sender) {
        sessions.forEach(s -> {
            if (s.isOpen()) {
                try {
                    s.sendMessage(new TextMessage(message));
                } catch (IOException e) {
                    sessions.remove(s);
                }
            }
        });
    }
 
    // 主动推送(业务层调用)
    public void sendToSession(String sessionId, String message) {
        sessions.stream()
            .filter(s -> s.getId().equals(sessionId) && s.isOpen())
            .findFirst()
            .ifPresent(s -> {
                try {
                    s.sendMessage(new TextMessage(message));
                } catch (IOException e) {
                    log.error("推送失败: {}", sessionId, e);
                }
            });
    }
}

二进制消息处理

@Component
public class BinaryHandler extends BinaryWebSocketHandler {
 
    @Override
    protected void handleBinaryMessage(WebSocketSession session,
                                       BinaryMessage message) throws Exception {
        ByteBuffer payload = message.getPayload();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        // 处理二进制数据(图片、音视频帧等)
        session.sendMessage(new BinaryMessage(processBytes(bytes)));
    }
}

注册与配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
 
    private final ChatHandler chatHandler;
    private final AuthHandshakeInterceptor authInterceptor;
 
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatHandler, "/ws/chat")
            .addInterceptors(authInterceptor)
            .setAllowedOriginPatterns("*");   // 生产环境配置具体域名
 
        // SockJS 降级(兼容不支持 WebSocket 的旧浏览器)
        registry.addHandler(chatHandler, "/ws/chat-sockjs")
            .addInterceptors(authInterceptor)
            .withSockJS()
            .setStreamBytesLimit(512 * 1024)
            .setHttpMessageCacheSize(1000)
            .setDisconnectDelay(30_000);
    }
}

跨域配置也可统一在 跨域处理 中处理,但 WebSocket 握手的 Origin 检查需在此单独配置。


HandshakeInterceptor(握手拦截器)

在 WebSocket 握手阶段获取 HTTP 请求信息(如 Session、Token),传递给 WebSocketSession 的 attributes:

@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
 
    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
                                   ServerHttpResponse response,
                                   WebSocketHandler wsHandler,
                                   Map<String, Object> attributes) {
        // 从 URL 参数获取 token
        if (request instanceof ServletServerHttpRequest servletRequest) {
            String token = servletRequest.getServletRequest()
                                         .getParameter("token");
            if (token == null || !tokenService.validate(token)) {
                response.setStatusCode(HttpStatus.UNAUTHORIZED);
                return false;   // 拒绝握手
            }
            String userId = tokenService.getUserId(token);
            attributes.put("userId", userId);
        }
        return true;   // 允许握手
    }
 
    @Override
    public void afterHandshake(ServerHttpRequest request,
                               ServerHttpResponse response,
                               WebSocketHandler wsHandler,
                               Exception exception) {}
}
 
// Handler 中通过 attributes 获取
@Override
public void afterConnectionEstablished(WebSocketSession session) {
    String userId = (String) session.getAttributes().get("userId");
    userSessionMap.put(userId, session);
}

结构化消息(JSON)

实际项目通常将消息体序列化为 JSON:

// 消息类型定义
public record WsMessage(String type, String from, Object data) {}
 
@Override
protected void handleTextMessage(WebSocketSession session,
                                 TextMessage message) throws Exception {
    WsMessage msg = objectMapper.readValue(message.getPayload(), WsMessage.class);
 
    switch (msg.type()) {
        case "chat"    -> handleChat(session, msg);
        case "join"    -> handleJoin(session, msg);
        case "leave"   -> handleLeave(session, msg);
        default        -> log.warn("未知消息类型: {}", msg.type());
    }
}
 
private void send(WebSocketSession session, WsMessage msg) throws IOException {
    session.sendMessage(new TextMessage(objectMapper.writeValueAsString(msg)));
}

房间 / 频道管理

@Component
public class RoomManager {
 
    // roomId -> 该房间的 sessions
    private final Map<String, Set<WebSocketSession>> rooms = new ConcurrentHashMap<>();
 
    public void joinRoom(String roomId, WebSocketSession session) {
        rooms.computeIfAbsent(roomId,
            k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))
             .add(session);
    }
 
    public void leaveRoom(String roomId, WebSocketSession session) {
        Set<WebSocketSession> room = rooms.get(roomId);
        if (room != null) {
            room.remove(session);
            if (room.isEmpty()) rooms.remove(roomId);
        }
    }
 
    public void sendToRoom(String roomId, String message) {
        Set<WebSocketSession> room = rooms.getOrDefault(roomId, Set.of());
        room.stream().filter(WebSocketSession::isOpen).forEach(s -> {
            try {
                s.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                room.remove(s);
            }
        });
    }
}

集群方案

WebSocketSession 存储在本地 JVM,多实例部署时需将消息路由到持有对应连接的节点。

Redis Pub/Sub

@Component
@RequiredArgsConstructor
public class ClusteredChatHandler extends TextWebSocketHandler {
 
    private final ChatHandler localHandler;
    private final RedisTemplate<String, String> redisTemplate;
 
    @Override
    protected void handleTextMessage(WebSocketSession session,
                                     TextMessage message) throws Exception {
        // 发布到 Redis,让所有节点广播
        redisTemplate.convertAndSend("ws:broadcast", message.getPayload());
    }
 
    // Redis 订阅回调(所有节点都会收到)
    public void onRedisMessage(String message) {
        localHandler.broadcast(message, null);
    }
}

详见 Redis集成

STOMP + 外部消息代理(RabbitMQ/ActiveMQ)是更完善的集群方案,参见 STOMP协议


心跳与断线检测

@Scheduled(fixedRate = 30_000)
public void pingAll() {
    sessions.forEach(session -> {
        if (!session.isOpen()) {
            sessions.remove(session);
            return;
        }
        try {
            session.sendMessage(new PingMessage());
        } catch (IOException e) {
            sessions.remove(session);
        }
    });
}

SockJS 有内置心跳机制(默认 25 秒);原生 WebSocket 需手动实现。


前端接入

const ws = new WebSocket('ws://localhost:8080/ws/chat?token=xxx');
 
ws.onopen = () => {
    console.log('连接成功');
    ws.send(JSON.stringify({ type: 'join', from: 'user1', data: null }));
};
 
ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    console.log('收到:', msg);
};
 
ws.onerror = (error) => console.error('错误:', error);
 
ws.onclose = (event) => {
    console.log('连接关闭:', event.code, event.reason);
    // 断线重连
    setTimeout(() => reconnect(), 3000);
};
 
// 发送消息
ws.send(JSON.stringify({ type: 'chat', from: 'user1', data: 'Hello!' }));

STOMP 消息协议

原生 WebSocket 需自行管理消息路由;若需发布/订阅、点对点通道、消息代理支持,使用 STOMP协议

原生 WebSocket:连接 → 自定义协议 → 手动路由
STOMP:连接 → 帧格式 → /topic 订阅 / /queue 点对点 → 消息代理路由

注意事项

  • WebSocketSession 非线程安全,并发发送时需加锁或使用队列
  • 生产环境通过 Nginx 反向代理时,需在 location 中添加 UpgradeConnection header
  • setAllowedOriginPatterns("*") 仅用于开发,生产应配置具体源
  • 大量长连接会占用文件描述符,注意系统 ulimit -n 配置
  • HandlerInterceptor 不同,WebSocket 握手前是 HTTP,握手后脱离 Spring MVC 生命周期,请求头处理应在 HandshakeInterceptor 中完成

相关链接