1 基础接入配置
/**
 * Minimal STOMP-over-WebSocket setup: a simple broker for {@code /topic/}
 * destinations plus a {@code /ws} endpoint that is registered twice, once with
 * the SockJS fallback enabled and once as a plain WebSocket endpoint.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /** Destination prefix handled by the simple broker. */
    private static final String BROKER_PREFIX = "/topic/";

    /** Path on which both STOMP endpoints are registered. */
    private static final String ENDPOINT_PATH = "/ws";

    /** Origin value passed to both endpoint registrations; "*" admits every origin. */
    private static final String ANY_ORIGIN = "*";

    /**
     * Registers the {@code /ws} endpoint with SockJS first, then again without it.
     *
     * @param registry registry that collects the STOMP endpoints
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(ENDPOINT_PATH)
                .setAllowedOrigins(ANY_ORIGIN)
                .withSockJS();
        registry.addEndpoint(ENDPOINT_PATH)
                .setAllowedOrigins(ANY_ORIGIN);
    }

    /**
     * Enables the simple broker for the {@code /topic/} prefix.
     *
     * @param config registry used to configure the message broker
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker(BROKER_PREFIX);
        // The application destination prefix stays disabled, as in the original snippet:
        // config.setApplicationDestinationPrefixes("/app/");
    }
}
/**
 * Minimal STOMP-over-WebSocket setup: a simple broker for {@code /topic/}
 * destinations plus a {@code /ws} endpoint that is registered twice, once with
 * the SockJS fallback enabled and once as a plain WebSocket endpoint.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /** Destination prefix handled by the simple broker. */
    private static final String BROKER_PREFIX = "/topic/";

    /** Path on which both STOMP endpoints are registered. */
    private static final String ENDPOINT_PATH = "/ws";

    /** Origin value passed to both endpoint registrations; "*" admits every origin. */
    private static final String ANY_ORIGIN = "*";

    /**
     * Registers the {@code /ws} endpoint with SockJS first, then again without it.
     *
     * @param registry registry that collects the STOMP endpoints
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(ENDPOINT_PATH)
                .setAllowedOrigins(ANY_ORIGIN)
                .withSockJS();
        registry.addEndpoint(ENDPOINT_PATH)
                .setAllowedOrigins(ANY_ORIGIN);
    }

    /**
     * Enables the simple broker for the {@code /topic/} prefix.
     *
     * @param config registry used to configure the message broker
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker(BROKER_PREFIX);
        // The application destination prefix stays disabled, as in the original snippet:
        // config.setApplicationDestinationPrefixes("/app/");
    }
}
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic/"); //config.setApplicationDestinationPrefixes("/app/"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); registry.addEndpoint("/ws").setAllowedOrigins("*"); } }
2 Controller
/**
 * Controller exposing a single STOMP message-handling method.
 */
@Controller
public class PoetrySofaController extends BaseController {

    /**
     * Handles messages sent to {@code /xxx/{code}} and publishes the returned
     * value to {@code /topic/xxx/{code}}, as declared by the annotations.
     *
     * @param code     value bound from the {@code {code}} destination variable
     * @param accessor header accessor of the incoming message (not read here)
     * @param message  payload of the incoming message (not read here)
     * @return a newly created, unmodified {@code XXXOpResultVO}
     * @throws Exception declared by the signature; nothing in this body throws
     */
    @MessageMapping("/xxx/{code}")
    @SendTo("/topic/xxx/{code}")
    public XXXOpResultVO sofaOp(
            @DestinationVariable String code,
            SimpMessageHeaderAccessor accessor,
            XXXOp message) throws Exception {
        XXXOpResultVO result = new XXXOpResultVO();
        return result;
    }
}
/**
 * Controller exposing a single STOMP message-handling method.
 */
@Controller
public class PoetrySofaController extends BaseController {

    /**
     * Handles messages sent to {@code /xxx/{code}} and publishes the returned
     * value to {@code /topic/xxx/{code}}, as declared by the annotations.
     *
     * @param code     value bound from the {@code {code}} destination variable
     * @param accessor header accessor of the incoming message (not read here)
     * @param message  payload of the incoming message (not read here)
     * @return a newly created, unmodified {@code XXXOpResultVO}
     * @throws Exception declared by the signature; nothing in this body throws
     */
    @MessageMapping("/xxx/{code}")
    @SendTo("/topic/xxx/{code}")
    public XXXOpResultVO sofaOp(
            @DestinationVariable String code,
            SimpMessageHeaderAccessor accessor,
            XXXOp message) throws Exception {
        XXXOpResultVO result = new XXXOpResultVO();
        return result;
    }
}
/**
 * Controller exposing a single STOMP message-handling method.
 */
@Controller
public class PoetrySofaController extends BaseController {

    /**
     * Handles messages sent to {@code /xxx/{code}} and publishes the returned
     * value to {@code /topic/xxx/{code}}, as declared by the annotations.
     *
     * @param code     value bound from the {@code {code}} destination variable
     * @param accessor header accessor of the incoming message (not read here)
     * @param message  payload of the incoming message (not read here)
     * @return a newly created, unmodified {@code XXXOpResultVO}
     * @throws Exception declared by the signature; nothing in this body throws
     */
    @MessageMapping("/xxx/{code}")
    @SendTo("/topic/xxx/{code}")
    public XXXOpResultVO sofaOp(
            @DestinationVariable String code,
            SimpMessageHeaderAccessor accessor,
            XXXOp message) throws Exception {
        XXXOpResultVO result = new XXXOpResultVO();
        return result;
    }
}
3 鉴权(Cookie)
/**
 * STOMP-over-WebSocket configuration with cookie-based authentication.
 *
 * <p>Authentication is split into two steps: a handshake interceptor stores the
 * user id in the WebSocket session attributes during the HTTP handshake, and an
 * inbound channel interceptor copies that id into the STOMP user on CONNECT.
 *
 * <p>NOTE(review): both endpoints allow every origin ("*") while the user is
 * identified through cookies; confirm that cross-site connections are intended.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Creates the interceptor that runs before the WebSocket handshake, records
     * the user id under {@code USER_ID} and rejects the handshake when the id is
     * negative.
     *
     * @return the handshake interceptor shared by both {@code /ws} registrations
     */
    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new HandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                    WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                if (request instanceof ServletServerHttpRequest) {
                    ServletServerHttpRequest servletServerRequest = (ServletServerHttpRequest) request;
                    HttpServletRequest servletRequest = servletServerRequest.getServletRequest();
                    // Parse the cookie from servletRequest into the userId variable here
                    // (the parsing code is elided in this snippet).
                    attributes.put(USER_ID, userId);
                    if (userId < 0) {
                        // Not logged in: refuse the WebSocket connection (no protocol upgrade).
                        return false;
                    }
                }
                // NOTE(review): a request that is not a ServletServerHttpRequest is accepted
                // without any user id being stored; confirm this fail-open path is intended.
                return true;
            }

            @Override
            public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                    WebSocketHandler wsHandler, Exception exception) {
                // Nothing to do after the handshake.
            }
        };
    }

    /**
     * Enables the simple broker for the {@code /topic/} prefix.
     *
     * @param config registry used to configure the message broker
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic/");
        // The application destination prefix stays disabled, as in the original snippet:
        // config.setApplicationDestinationPrefixes("/app/");
    }

    /**
     * Registers {@code /ws} with SockJS and again as a plain WebSocket endpoint,
     * attaching the handshake interceptor to both registrations.
     *
     * @param registry registry that collects the STOMP endpoints
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS().setInterceptors(handshakeInterceptor());
        registry.addEndpoint("/ws").setAllowedOrigins("*").addInterceptors(handshakeInterceptor());
    }

    /**
     * Installs an inbound interceptor that, on STOMP CONNECT, sets the message's
     * user to a principal whose name is the user id from the session attributes.
     *
     * @param registration registration of the client inbound channel
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                // getAccessor can return null; pass such messages through untouched
                // instead of failing with a NullPointerException.
                if (accessor == null || !StompCommand.CONNECT.equals(accessor.getCommand())) {
                    return message;
                }
                // Read the id once and set it on the accessor so controllers can use it
                // directly; it is only set here, on CONNECT.
                Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
                // Missing session attributes fall back to the same default as a missing key.
                int userId = sessionAttributes == null
                        ? 0
                        : (int) sessionAttributes.getOrDefault(USER_ID, 0);
                accessor.setUser(() -> String.valueOf(userId));
                return message;
            }
        });
    }
}
/**
 * STOMP-over-WebSocket configuration with cookie-based authentication.
 *
 * <p>Authentication is split into two steps: a handshake interceptor stores the
 * user id in the WebSocket session attributes during the HTTP handshake, and an
 * inbound channel interceptor copies that id into the STOMP user on CONNECT.
 *
 * <p>NOTE(review): both endpoints allow every origin ("*") while the user is
 * identified through cookies; confirm that cross-site connections are intended.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Creates the interceptor that runs before the WebSocket handshake, records
     * the user id under {@code USER_ID} and rejects the handshake when the id is
     * negative.
     *
     * @return the handshake interceptor shared by both {@code /ws} registrations
     */
    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new HandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                    WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                if (request instanceof ServletServerHttpRequest) {
                    ServletServerHttpRequest servletServerRequest = (ServletServerHttpRequest) request;
                    HttpServletRequest servletRequest = servletServerRequest.getServletRequest();
                    // Parse the cookie from servletRequest into the userId variable here
                    // (the parsing code is elided in this snippet).
                    attributes.put(USER_ID, userId);
                    if (userId < 0) {
                        // Not logged in: refuse the WebSocket connection (no protocol upgrade).
                        return false;
                    }
                }
                // NOTE(review): a request that is not a ServletServerHttpRequest is accepted
                // without any user id being stored; confirm this fail-open path is intended.
                return true;
            }

            @Override
            public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                    WebSocketHandler wsHandler, Exception exception) {
                // Nothing to do after the handshake.
            }
        };
    }

    /**
     * Enables the simple broker for the {@code /topic/} prefix.
     *
     * @param config registry used to configure the message broker
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic/");
        // The application destination prefix stays disabled, as in the original snippet:
        // config.setApplicationDestinationPrefixes("/app/");
    }

    /**
     * Registers {@code /ws} with SockJS and again as a plain WebSocket endpoint,
     * attaching the handshake interceptor to both registrations.
     *
     * @param registry registry that collects the STOMP endpoints
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS().setInterceptors(handshakeInterceptor());
        registry.addEndpoint("/ws").setAllowedOrigins("*").addInterceptors(handshakeInterceptor());
    }

    /**
     * Installs an inbound interceptor that, on STOMP CONNECT, sets the message's
     * user to a principal whose name is the user id from the session attributes.
     *
     * @param registration registration of the client inbound channel
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                // getAccessor can return null; pass such messages through untouched
                // instead of failing with a NullPointerException.
                if (accessor == null || !StompCommand.CONNECT.equals(accessor.getCommand())) {
                    return message;
                }
                // Read the id once and set it on the accessor so controllers can use it
                // directly; it is only set here, on CONNECT.
                Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
                // Missing session attributes fall back to the same default as a missing key.
                int userId = sessionAttributes == null
                        ? 0
                        : (int) sessionAttributes.getOrDefault(USER_ID, 0);
                accessor.setUser(() -> String.valueOf(userId));
                return message;
            }
        });
    }
}
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer { @Bean public HandshakeInterceptor handshakeInterceptor() { return new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerRequest = (ServletServerHttpRequest) request; HttpServletRequest servletRequest = servletServerRequest.getServletRequest(); // 这里解析cookie到userId变量 attributes.put(USER_ID, userId); if (userId < 0) { // 未登陆,直接websocket拒绝连接(不让升级) return false; } } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }; } @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic/"); //config.setApplicationDestinationPrefixes("/app/"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS().setInterceptors(handshakeInterceptor()); registry.addEndpoint("/ws").setAllowedOrigins("*").addInterceptors(handshakeInterceptor()); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.setInterceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { // 读取并设置到accessor中,后续Controller中就可以直接用了,注意一旦Websocket建立连接,这个字端就不变了 Map<String, Object> sessionAttributes = accessor.getSessionAttributes(); int userId = (int) sessionAttributes.getOrDefault(USER_ID, 0); accessor.setUser(() -> String.valueOf(userId)); } 
return message; } }); } }
4 检测客户端断线事件
/**
 * Listener invoked when a {@code SessionDisconnectEvent} is published.
 * Extracts the session id and the user id so disconnect handling can use them.
 *
 * @param event the disconnect event; its user is passed to the project helper
 *              {@code SimpMessagUtils.getUserId} to obtain the user id
 */
@EventListener
public void on(SessionDisconnectEvent event) {
    final String sessionId = event.getSessionId();
    final int userId = SimpMessagUtils.getUserId(event.getUser());
    // Application-specific disconnect handling goes here.
}
/**
 * Listener invoked when a {@code SessionDisconnectEvent} is published.
 * Extracts the session id and the user id so disconnect handling can use them.
 *
 * @param event the disconnect event; its user is passed to the project helper
 *              {@code SimpMessagUtils.getUserId} to obtain the user id
 */
@EventListener
public void on(SessionDisconnectEvent event) {
    final String sessionId = event.getSessionId();
    final int userId = SimpMessagUtils.getUserId(event.getUser());
    // Application-specific disconnect handling goes here.
}
@EventListener public void on(SessionDisconnectEvent event) { int userId = SimpMessagUtils.getUserId(event.getUser()); String sessionId = event.getSessionId(); // do something }
5 优化broker性能
/**
 * Enables the simple broker for {@code /topic/} destinations and sets the
 * registry's cache limit to 100000.
 *
 * @param config registry used to configure the message broker
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    final String brokerPrefix = "/topic/";
    final int cacheLimit = 100_000;

    config.enableSimpleBroker(brokerPrefix);
    // NOTE(review): per Spring's Javadoc this limit applies to the destination cache
    // of the subscription registry; confirm against the Spring version in use.
    config.setCacheLimit(cacheLimit);
}
/**
 * Enables the simple broker for {@code /topic/} destinations and sets the
 * registry's cache limit to 100000.
 *
 * @param config registry used to configure the message broker
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    final String brokerPrefix = "/topic/";
    final int cacheLimit = 100_000;

    config.enableSimpleBroker(brokerPrefix);
    // NOTE(review): per Spring's Javadoc this limit applies to the destination cache
    // of the subscription registry; confirm against the Spring version in use.
    config.setCacheLimit(cacheLimit);
}
/**
 * Enables the simple broker for {@code /topic/} destinations and sets the
 * registry's cache limit to 100000.
 *
 * @param config registry used to configure the message broker
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    final String brokerPrefix = "/topic/";
    final int cacheLimit = 100_000;

    config.enableSimpleBroker(brokerPrefix);
    // NOTE(review): per Spring's Javadoc this limit applies to the destination cache
    // of the subscription registry; confirm against the Spring version in use.
    config.setCacheLimit(cacheLimit);
}