Spring Framework - WebSocket

API 快速开始

涉及 Spring WebSocket 的主要 API

WebSocketHandler

第一步首先实现 WebSocketHandler ,或者直接继承 TextWebSocketHandlerBinaryWebSocketHandler

1
2
3
4
5
6
7
8
@Component
public class MyTextWebSocketHandler extends TextWebSocketHandler {

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// do something
}
}

Configure a WebSocketHandler

将上面实现的 WebSocketHandler 映射到特定的 URL

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {

@Autowired
private MyTextWebSocketHandler myTextWebSocketHandler;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myTextWebSocketHandler, "/ws/test");
}
}

WebSocket Handshake

自定义初始 HTTP WebSocket 握手请求的最简单方法是通过 HandshakeInterceptor,它暴露了握手相关的 before 和 after 方法,例如有一个内置的拦截器用于将 HTTP session 属性传递给WebSocket session

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyHandler(), "/myHandler")
.addInterceptors(new HttpSessionHandshakeInterceptor());
}

}

或者我们实现一个基于用户鉴权的拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class UserVerifyHandshakeInterceptor implements HandshakeInterceptor {

@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
ServletServerHttpRequest httpRequest = (ServletServerHttpRequest) request;
List<String> headerUserId = httpRequest.getHeaders().get("USER-ID");
if (headerUserId == null || headerUserId.isEmpty()) {
return false;
}
attributes.put("USER-ID", headerUserId.get(0));
return true;
}

@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
// do nothing
}
}

拦截器返回 false,表示拒绝连接,在 HTTP Response 的 Status Code 是 200,但不会在 Header 返回 Sec-WebSocket-Accept 等进行 WebSocket 协议的相关属性

成功连接

拦截器拒绝

WebSocketHandler Decoration

WebSocketHandler 也提供了一系列装饰器,比如 ExceptionWebSocketHandlerDecorator ,功能是如果在处理 WebSocket Message 过程中遇到异常,则尝试关闭连接

1
session.close(CloseStatus.SERVER_ERROR);

Configuring the WebSocket Engine

每个底层 WebSocket 引擎都公开了控制运行时特性的配置属性,如消息缓冲区大小、空闲超时等

对于 Tomcat、WildFly、GlassFish,将 ServletServerContainerFactoryBean 添加到 WebSocket Java config 中

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}

}

Configuring allowed origins

Spring Framework 4.1.5 开始,WebSocket 和 SockJS 的默认行为是只接受同源(same origin)请求,也可以允许所有或指定的来源列表

此检查主要是为浏览器客户端设计的,即

  1. 这里同源检查是一个服务端行为
  2. 为浏览器客户端设计,如果自定义客户端修改了 Header 中的 Origin ,该策略也无法感知

三种可能的行为是:

  • 只允许同源请求(默认)
  • 允许指定的源列表:每个提供的允许源必须以 http://https:// 开头
  • 允许所有源:要启用此模式,应该提供 * 作为允许的源配置,在此模式下所有传输都可用

SockJS Fallback

SockJS 是 WebSocket 的替代方案

STOMP

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,设计的目的是为了面向消息中间件 MOM(Message Oriented Middleware),面向消息的中间件)的简单文本协议

所以相对 WebSocket 来说,STOMP 是一个子协议(High-Level

消息流程

STOMP 端点暴露,Spring 应用程序就成为连接客户端的 STOMP Broker

spring-messaging 提供了对 messaging 应用程序的基础支持,提供了如下抽象:

  • Message:消息的简单定义,包括 header 和负载
  • MessageHandler:消息处理器
  • MessageChannel:用于发送消息的接口,解耦生产者和消费者
  • SubscribableChannel:带有 MessageHandler 订阅者的 MessageChannel
  • ExecutorSubscribableChannel:使用 Executor 传递消息的 SubscriableChannel

启用简单内置消息代理时使用的组件

上图中有三个 Channel:

  • clientInboundChannel:用于传递从 WebSocket 客户端接收到的消息
  • clientOutboundChannel:用于向 WebSocket 客户端发送服务器消息
  • brokerChannel:用于从服务器端应用程序代码中向消息代理发送消息

下图显示了当配置了外部 Broker(如 RabbitMQ)来管理订阅和广播消息时使用的组件

当从 WebSocket 连接接收到消息时,它们将 STOMP frames 转换为 Spring Message 表示,并发送到 clientInboundChannel 进行进一步处理

  • 例如,目标标头以 /app 开头的 STOMP 消息可能会被路由到带注释的控制器中的 @MessageMapping 方法,而 /topic/queue 消息可能会直接路由到 Message Broker
  • 处理来自客户端的 STOMP 消息的带注释的 @Controller 可以通过 brokerChannel 向消息代理发送消息,代理通过 clientOutboundChannel 将消息广播给匹配的订阅者
  • 同一个 Controller 也可以对 HTTP 请求做出同样的响应,因此客户端可以执行 HTTP POST,然后 @PostMapping 方法可以向消息代理发送消息,以向订阅的客户端广播

Simple Broker 示例

如下 set up 一个服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}

@Controller
public class GreetingController {

@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
}

支持以下流程:

  1. 客户端连接 localhost:8080/portfolio,一旦建立了 WebSocket 连接,STOMP frame 就开始进行流程
  2. 客户端发送 SUBSCRIBE frame,其中 header 中的 destination/topic/greeting,一旦被接收和解码,消息就会被发送到 clientInboundChannel,然后被路由到存储客户端订阅的消息代理
  3. 客户端向 /app/greeting 发送一个 SEND frame,/app 前缀将其路由到带注释的 Controller,去掉 /app 前缀后,destination 剩余 /agreeting 部分被映射到 GreetingController 中的 @MessageMapping 方法
  4. GreetingController 返回的值将转换为 Spring Message,其有效载荷基于返回值和默认的 destination header /topic/heading(从输入 destination 派生,将 /app 替换为 /topic),生成的消息被发送到 brokerChannel,并由消息 Broker 处理
  5. 消息代理会找到所有匹配的订阅者,并通过 clientOutboundChannel 发送一帧 MESSAGE 给每个订阅者,消息会被编码为 STOMP frame 并通过 WebSocket 连接发送

Controller 注解

可以使用带注释的 @Controller 类来处理来自客户端的消息,这些类可以声明@MessageMapping@SubscribeMapping@MessageExceptionHandler 方法

  • MessageMapping:映射消息处理
  • SubscribeMapping:映射订阅
  • MessageExceptionHandler:异常处理;全局可以使用 @ControllerAdvice,和 MVC 其他支持

MessageMapping

@MessageMapping 用于根据消息地址路由消息,配置在类上时表示类下所有方法都共享的 mapping

默认情况下,映射值是 Ant 风格的路径模式,路径参数可以通过 @DestinationVariable 方法参数引用,还可以切换到 dot-separated 模式,见下文

支持的方法参数:

  • Message:完整消息
  • @DestinationVariableMessageHeaders:消息的 headers
  • MessageHeaderAccessor SimpMessageHeaderAccessor StompHeaderAccessor:通过类型访问器获取 headers
  • @Payload:访问消息的 payload
  • @Header:访问特定的标头值;有必要可以使用 org.springframework.core.convert.converter.Converter
  • @Headers:用于访问消息中的所有标头
  • @DestinationVariable:路径参数
  • java.security.Principal:WebSocket HTTP 握手时登录的用户

返回值默认拼接 /topic 前缀,可以使用 @SendTo@SendToUser 覆盖默认路径和类级别上的对应配置

消息返回值可以异步处理,ListenableFutureCompletableFutureCompletionStage

对于更高级的场景,@MessageMapping 方法可以直接使用 SimpMessagingTemplate ,可以代替返回值,也可以作为返回值的补充

SubscribeMapping

用来订阅消息时立即返回数据,常用于初始化消息或一次性消息;返回值会被放入 clientOutboundChannel 直接返回给当前客户端(一般用于确认订阅)而不是发送给 Broker

但也可以使用 @SendTo@SendToUser 覆盖默认操作,使其发送给代理

  • 什么时候应该用?
    • 消息代理和控制器映射到不同的前缀(例如 /app/topic/queue),为了重复广播而设计的
    • 订阅某个 /app 地址,控制器可以在不涉及消息代理的情况下直接返回值,用于初始化消息
  • 什么时候不该用?
    • 消息代理和控制器映射相同的前缀,他们会共同处理消息导致出现问题

发送消息

使用 SimpMessagingTemplate 可以实现任何应用程序组件向 brokerChannel 发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Controller
public class GreetingController {

private SimpMessagingTemplate template;

@Autowired
public GreetingController(SimpMessagingTemplate template) {
this.template = template;
}

@RequestMapping(path="/greetings", method=POST)
public void greet(String greeting) {
String text = "[" + getTimestamp() + "]:" + greeting;
this.template.convertAndSend("/topic/greetings", text);
}

}

如果存在另一个相同类型的 bean 可以通过其名称(brokerMessagingTemplate)对其进行限定

Broker

Spring WebSocket STOMP 可以使用简单 Broker 和外部 Broker

  • 简单 Broker 基于服务的内存
  • 外部 Broker 即功能齐全的 Broker(例如 RabbitMQ),可以实现 ACK、集群等机制

这里使用 Simple Broker 演示

配置

  • endpoint:握手端点
  • applicationDestinationPrefixes:应用路径前缀
  • enableSimpleBroker:使用 SimpleBroker 并注册代理前缀
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.enableSimpleBroker("/topic", "/queue");
}
}

心跳

STOMP 也需要心跳支持 STOMP heartbeats

可以手动配置调度器,但是注意如果以使用 WebSocketMessageBrokerConfigurer 中声明,需要 @Lazy 来避免 WebSocket 配置和 WebSocketMessageBrokerConfigurer 循环依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

private TaskScheduler messageBrokerTaskScheduler;

@Autowired
public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(new long[] {10000, 20000})
.setTaskScheduler(this.messageBrokerTaskScheduler);

// ...
}
}

消息有序

Broker 会将消息发送至 clientOutboundChannel,Channel 基于线程池实现,所以处理消息时是并发的;同理消息接收会被并发发送至 clientInboundChannel

可以设置消息有序,代码如下

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {

@Override
protected void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
registry.setPreserveReceiveOrder(true);
}
}

拦截器 & 鉴权

可以注册 ChannelInterceptor 来拦截处理流程中的每一条消息

1
2
3
4
5
6
7
8
9
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new MyChannelInterceptor());
}
}

自定义的 ChannelInterceptor 可以使用 StompHeaderAccessorSimpMessageHeaderAccessor 来访问有关消息的信息

Invoked inside the {@link Runnable} submitted to the Executor just before calling the target MessageHandler to handle the message.

还可以实现 ExecutorChannelInterceptor,这是 ChannelInterceptor 的一个子接口,每当一条消息被发送到通道时,ChannelInterceptor 都会被触发一次

STOMP Headers 用于鉴权

在 STOMP 消息协议级别使用 Headers 进行认证,需要两个步骤:

  1. 使用 STOMP 客户端在连接时传递认证 Headers
  2. 使用 ChannelInterceptor 处理

注意拦截器只需要认证并在 CONNECT 消息上设置用户头部。Spring 会记录并保存认证用户,并将其与同一会话上的后续 STOMP 消息关联

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Authentication user = ... ; // 从 headers 获取
accessor.setUser(user);
}
return message;
}
});
}
}

此外需要注意当对消息使用 Spring Security 鉴权时,需要确保身份验证 ChannelInterceptor 顺序早于 Spring Security;最好在 WebSocketMessageBrokerConfigurer 的自身实现中声明自定义拦截器,该拦截器标记为 @Order(Ordered.HIGHEST_PRECEDENCE+99)

补充

委派模式

委派模式(Delegate Pattern)不属于常说的 Gang of Four 23 设计模式之一,但其实开发过程中经常使用

个人理解就是使用组合将受委托的实现放在委托对象内部,业务和委托对象交互,来达成解耦的目的;口语化表达就是:虽然调用的是我,但其实我再调用内部实例的方法来处理,双亲委派的委派也即是指的委派模式

1
业务 → 委托类 → 受委托类 1 ~ n

经常在策略模式中创建的类似 ManagerHolder 等角色其实就是委派模式的实现

委派角色提供以下能力:

  • 暴露统一入口方法
  • 管理受委托类

对于如今 Spring 的 IOC 机制来说,策略结合委派模式作用不大了,因为可以直接按类型注入集合

但是 SpringBoot 的 stater 往往在很多场景基于 IOC 和委派模式来实现灵活自动装配


例如 spring-websocket 中的 @EnableWebSocketMessageBrokerWebSocketMessageBrokerConfigurer 配合来实现对 WebSocketMessageBroker 的配置

  1. @EnableWebSocketMessageBroker import DelegatingWebSocketMessageBrokerConfiguration
  2. 从名字就可以看出 DelegatingWebSocketMessageBrokerConfiguration 是一个委派配置类
  3. registerStompEndpoints 方法为例,将其实现委派给 IOC 注入的 configurers 来执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration(proxyBeanMethods = false)
public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMessageBrokerConfigurationSupport {

// 受委托类
private final List<WebSocketMessageBrokerConfigurer> configurers = new ArrayList<>();

@Override
protected void registerStompEndpoints(StompEndpointRegistry registry) {
// 进行委派
for (WebSocketMessageBrokerConfigurer configurer : this.configurers) {
configurer.registerStompEndpoints(registry);
}
}

...
}

这样就可以通过注解启动 Broker 这一套流程,并且业务可以自定义配置实现类

参考

WebSockets :: Spring Framework