STOMP 协议

STOMP(Simple Text Oriented Messaging Protocol)是基于帧的消息协议,运行在 WebSocket 之上,提供发布/订阅点对点等消息语义。相比裸 WebSocket,STOMP 的优势在于:

  • 定义了统一的消息格式(destination、header、body)
  • 内置订阅管理,服务端无需手动维护连接映射
  • 客户端库(stomp.js)成熟,与 Spring 集成无缝

WebSocket 本身只提供全双工通信通道,STOMP 在此之上定义了消息路由规则。底层 WebSocket 配置详见 WebSocket


依赖

STOMP 支持包含在 Spring WebSocket 依赖中:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

服务端配置

消息代理配置

@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
 
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 客户端订阅前缀:/topic(广播)、/queue(点对点)
        registry.enableSimpleBroker("/topic", "/queue");
 
        // 客户端发送消息的目标前缀,路由到 @MessageMapping 方法
        registry.setApplicationDestinationPrefixes("/app");
 
        // 点对点消息前缀(默认已是 /user,可省略)
        registry.setUserDestinationPrefix("/user");
    }
 
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")          // WebSocket 握手端点
                .setAllowedOriginPatterns("*")
                .withSockJS();               // 降级支持(浏览器不支持 WS 时用 HTTP 长轮询)
    }
}

外部消息代理(RabbitMQ / ActiveMQ)

生产环境多实例部署时,简单内存代理无法跨节点广播,需接入外部代理:

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("rabbitmq")
            .setRelayPort(61613)            // STOMP over TCP 端口
            .setClientLogin("guest")
            .setClientPasscode("guest");
 
    registry.setApplicationDestinationPrefixes("/app");
}

消息处理

广播消息(@MessageMapping + @SendTo)

客户端发送到 /app/chat,服务端处理后广播到 /topic/messages

@Controller
public class ChatController {
 
    // 接收:客户端发往 /app/chat
    // 发送:广播到 /topic/messages(所有订阅者)
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage handleChat(ChatMessage message, Principal principal) {
        message.setSender(principal.getName());
        message.setTimestamp(Instant.now());
        return message;
    }
}
public record ChatMessage(String sender, String content, Instant timestamp) {}

点对点消息(@SendToUser)

只发送给指定用户,路由到 /user/{username}/queue/reply

@MessageMapping("/private")
@SendToUser("/queue/reply")
public ReplyMessage handlePrivate(PrivateMessage msg, Principal principal) {
    log.info("来自 {} 的私信: {}", principal.getName(), msg.content());
    return new ReplyMessage("已收到: " + msg.content());
}

主动推送(SimpMessagingTemplate)

不依赖请求触发,可在任意 Bean 中主动推送:

@Service
public class NotificationService {
 
    private final SimpMessagingTemplate messagingTemplate;
 
    // 广播给所有订阅 /topic/alert 的客户端
    public void broadcastAlert(String alert) {
        messagingTemplate.convertAndSend("/topic/alert", alert);
    }
 
    // 发给指定用户(/user/{username}/queue/notification)
    public void sendToUser(String username, Notification notification) {
        messagingTemplate.convertAndSendToUser(username,
                "/queue/notification", notification);
    }
 
    // 结合定时任务实时推送数据
    @Scheduled(fixedRate = 5000)
    public void pushMetrics() {
        SystemMetrics metrics = metricsCollector.collect();
        messagingTemplate.convertAndSend("/topic/metrics", metrics);
    }
}

鉴权与安全

连接握手时鉴权

@Configuration
public class StompSecurityConfig implements WebSocketMessageBrokerConfigurer {
 
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                    MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
 
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    String token = accessor.getFirstNativeHeader("Authorization");
                    if (token == null || !token.startsWith("Bearer ")) {
                        throw new AccessDeniedException("未携带 Token");
                    }
                    // 验证 Token,设置认证主体
                    Authentication auth = jwtService.authenticate(
                        token.substring(7));
                    accessor.setUser(auth);
                }
                return message;
            }
        });
    }
}

Spring Security 集成

@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
    http.authorizeHttpRequests(auth -> auth
            .requestMatchers("/ws/**").permitAll()   // WebSocket 握手端点放行
            .anyRequest().authenticated()
        )
        .csrf(csrf -> csrf
            .ignoringRequestMatchers("/ws/**")       // WebSocket 不需要 CSRF
        );
    return http.build();
}

JWT 集成详见 OAuth2与JWT,整体安全配置详见 安全


消息拦截与异常处理

@Controller
public class StompErrorHandler {
 
    // 处理 @MessageMapping 方法抛出的异常
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public ErrorMessage handleException(Exception e) {
        log.error("消息处理异常", e);
        return new ErrorMessage(e.getMessage());
    }
}

客户端(JavaScript)

<!-- 引入 SockJS + STOMP.js -->
<script src="https://cdn.jsdelivr.net/npm/sockjs-client/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs/bundles/stomp.umd.min.js"></script>
const client = new StompJs.Client({
    webSocketFactory: () => new SockJS('/ws'),
    connectHeaders: {
        Authorization: 'Bearer ' + getToken()
    },
    onConnect: () => {
        // 订阅广播频道
        client.subscribe('/topic/messages', (frame) => {
            const msg = JSON.parse(frame.body);
            renderMessage(msg);
        });
 
        // 订阅个人消息
        client.subscribe('/user/queue/reply', (frame) => {
            console.log('私信:', JSON.parse(frame.body));
        });
 
        // 发送消息
        client.publish({
            destination: '/app/chat',
            body: JSON.stringify({ content: '你好!' })
        });
    },
    onDisconnect: () => console.log('连接断开'),
    reconnectDelay: 3000    // 断线自动重连间隔(ms)
});
 
client.activate();

消息流转图

客户端 A                   Spring 服务端                  客户端 B
   │                            │                             │
   │── /app/chat ──────────────►│ @MessageMapping("/chat")    │
   │                            │── /topic/messages ─────────►│
   │                            │                             │
   │── /app/private ───────────►│ @MessageMapping("/private") │
   │      (to: userB)           │── /user/userB/queue/reply ──►│(userB)
   │                            │                             │

STOMP vs 裸 WebSocket vs SSE

特性裸 WebSocketSTOMP over WSSSE
方向全双工全双工单向(服务→客户端)
消息路由手动维护内置发布/订阅
客户端库原生 APIstomp.jsEventSource
适用场景自定义协议聊天、通知、协作数据推送、进度

单向推送场景详见 SSE服务端推送


相关链接