1 基础接入配置
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic/"); //config.setApplicationDestinationPrefixes("/app/"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); registry.addEndpoint("/ws").setAllowedOrigins("*"); } }
2 Controller
@Controller public class PoetrySofaController extends BaseController { @MessageMapping("/xxx/{code}") @SendTo("/topic/xxx/{code}") public XXXOpResultVO sofaOp( @DestinationVariable String code, SimpMessageHeaderAccessor accessor, XXXOp message) throws Exception { return new XXXOpResultVO(); } }
3 鉴权(Cookie)
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer { @Bean public HandshakeInterceptor handshakeInterceptor() { return new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerRequest = (ServletServerHttpRequest) request; HttpServletRequest servletRequest = servletServerRequest.getServletRequest(); // 这里解析cookie到userId变量 attributes.put(USER_ID, userId); if (userId < 0) { // 未登陆,直接websocket拒绝连接(不让升级) return false; } } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }; } @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic/"); //config.setApplicationDestinationPrefixes("/app/"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS().setInterceptors(handshakeInterceptor()); registry.addEndpoint("/ws").setAllowedOrigins("*").addInterceptors(handshakeInterceptor()); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.setInterceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { // 读取并设置到accessor中,后续Controller中就可以直接用了,注意一旦Websocket建立连接,这个字端就不变了 Map<String, Object> sessionAttributes = accessor.getSessionAttributes(); int userId = (int) sessionAttributes.getOrDefault(USER_ID, 0); accessor.setUser(() -> String.valueOf(userId)); } return message; } }); } }
4 检测客户端断线事件
@EventListener public void on(SessionDisconnectEvent event) { int userId = SimpMessagUtils.getUserId(event.getUser()); String sessionId = event.getSessionId(); // do something }
5 优化broker性能
@Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic/"); config.setCacheLimit(100000); }