WebSocket STOMP

WebSocket是一个基于单tcp连接支持全双工双向通信的协议. 通俗来讲,就是用webscoket,服务端就可以主动给浏览器推消息了.
它支持文本字节两种类型的消息, 但是消息内容没有规定.
但 WebSocket支持client和server 协商一个子协议来发送消息, 子协议可以更好的约定消息内容的格式.

STOMP (Simple Text Oriented Messaging Protocol) , 就是我们要用到的子协议. 它旨在解决常用消息传递模式的最小子集, 可以在任何可靠的双工通信网络协议上使用, 如TCP和WebSocket.
虽然 STOMP是面向文本的协议, 但是也可以支持字节.

STOMP是一种基于帧的协议. 每帧的结构如下:

1
2
3
4
5
COMMAND
header1:value1
header2:value2

Body^@

stomp 常用的COMMOND 有:

  1. 连接: CONNECT,CONNECTED
  2. 客户端: SEND,SUBSCRIBE,UNSUBSCRIBE, 事务相关的: BEGIN,COMMIT,ABORT
  3. 服务端: MESSAGE,RECEIPT,ERROR

发消息 和 接收消息是两个独立的事件. 并不是说, client 给 server 发了消息, server 就一定要响应消息回来.
这和HTTP的 request/response 交互模式有明显的区别

springboot 中的 websocket stomp 服务端

spring对websocket做了很好的支持5,
先加依赖包

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置类,启用stomp和websocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
@EnableWebSocketMessageBroker // 启用基于 WebSocket 的 STOMP
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio"); //websocket endpoint. 建立websocket 连接的http地址
}

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
//消息destination以 /app 开关的,将分发给 @Controller类 对应的 @MessageMapping 方法处理
config.setApplicationDestinationPrefixes("/app");
//使用内置消息代理, 处理消息destination 前缀为 /topic /queue 的订阅广播分发操作.
config.enableSimpleBroker("/topic", "/queue");
}
}

接收消息(@Controller 和 @MessageMapping 注解)和服务端主动推送消息的逻辑, 这两个是可以分开的, demo为了简单, 写在了一起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Controller
public class WSController {

// 服务端可使用SimpMessagingTemplate 主动推送消息
@Resource(name = "brokerMessagingTemplate")
private SimpMessagingTemplate template;

private Timer timer = new Timer("服务端定时主动推送消息");

@PostConstruct
public void init(){
timer.schedule(new TimerTask() {
@Override
public void run() {
String text = "server time: [" + LocalDateTime.now().toString() + "]";
template.convertAndSend("/topic/greeting", text);
}
}, 1000, 5000);
}

/**
* 接收client 发送的消息, client访问此方法的uri是: ${ApplicationDestinationPrefix}/greeting
* 这里return的消息, 并不返回给发起消息的client的.
* 而是返回给消息代理, 消息代理广播给所有订阅了 ${destinationPrefix}/greeting Destination 的订阅者
*/
@MessageMapping("/greeting")
public String handle(String greeting) {
if("bye".equalsIgnoreCase(greeting)) return "bye";
return "[" + System.currentTimeMillis() + "]: " + greeting;
}
}

使用stompjs实现的客户端

stompjs2 支持 stomp v1.0, v1.1, v1.2. 而且是现在维护最活跃的 javascript 客户端. 所以我们决定采用它.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<html>
<head>
<title>stomp-js</title>
<script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs@5.0.0/bundles/stomp.umd.min.js"></script>
</head>
<body>
<input id="sendMsgInput" placeholder="输入要发送的消息"/>
<button id="sendMsgBtn" onclick="sendMsg();">send</button>
<button id="closeBtn" onclick="close();">closed</button>
<div id="receiveMsgDiv"></div>
<script type="text/javascript">
const client = new StompJs.Client();
client.brokerURL = 'ws://localhost:8080/portfolio'; // ws endpoint 路径
client.onConnect = function (frame) {
// 订阅,接收消息 必须写在 onConnect 里. 因为订阅必须在连接建立后才能进行
const subscription = client.subscribe('/topic/greeting', function (message){
var div = document.getElementById("receiveMsgDiv");
var res = document.createElement("p");
var content = message.body;
res.innerText = content;
div.appendChild(res);

if(content === "bye"){
client.deactivate(); //关闭ws client
}
});
};

client.onStompError = function (frame) {
console.log('Broker reported error: ' + frame.headers['message']);
console.log('Additional details: ' + frame.body);
};

client.activate();

//发送消息
function sendMsg() {
var msg = document.getElementById("sendMsgInput").value;
// 发送消息
client.publish({
destination: '/app/greeting',
body: msg,
skipContentLengthHeader: true,
});
}
//关闭连接
function close(){
client.deactivate()
}
</script>
</body>
</html>

如果服务端重启,客户端会自动重接.

websocket 身份认证

WebSocket是通过HTTP建立连接的, 所以可以直接使用HTTP的身份认证信息.
有问题的地方在于, 如果ws地址需要登录才能访问, 在未登录时试图建立 ws连接, 这时候不能像正常的 HTTP请求一样被重定向到登录页面.

stomp 处理spring server实现的 session 过期问题

禁用广播

同一用户多客户端订阅同一地址时, 默认情况下会给所有客户端发送消息, 如果只想给一个客户端发送, 则需要指定sessionId, 此时分为两种情况:

  1. 接收消息处理器在返回消息时, 使用 @SendToUser(destinations="/queue/errors", broadcast=false) 即可, 其中broadcast=false 表示不广播, 只发一个客户端

  2. 服务端主动推送消息. 本身不支持. 需要自行编码处理, 难点在于如何根据用户名获取到websocket的sessionId. @SendToUser 本身是在Http session环境下 通过WebSocketRegistryListener 可找到HttpSession对应的WebsocketSession.
    目前有两种解决方案:

    1. 经过漫长的Debug, 找到了spring websocket 体系一个自带的Bean: SimpUserRegistry , 这里面维护了当前连接的用户SimpUser, 然后通过SimpUser就可以获取到这个用户当前有效的session集合. SimpUserRegistry@EnableWebSocketMessageBroker注解引入, Bean实体在AbstractMessageBrokerConfiguration中配置.

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
        @Autowired
      private SimpUserRegistry simpUserRegistry;

      private MessageHeaders createHeaders(String sessionId) {
      SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
      headerAccessor.setSessionId(sessionId);
      headerAccessor.setLeaveMutable(true);
      return headerAccessor.getMessageHeaders();
      }

      public void pushMsgToUser(String username){
      String destination = "/queue/greeting";
      String text = "server time: [" + LocalDateTime.now().toString() + "]";
      SimpUser aaa = simpUserRegistry.getUser(username);
      if(aaa != null){ // 实现broadcast = false. 对于同一个用户的多个session, 只其中一个发送
      Optional<SimpSession> session = aaa.getSessions().stream()
      .filter(s -> s.getSubscriptions().stream().anyMatch(sub -> sub.getDestination().equals(destination)))
      .findAny(); // 找到任意一个满足条件的session
      if(session.isEmpty()) return;
      // 一定要使用MessageHeaders, 并setLeaveMutable(true). 不然我们设置的header 会被整合到 nativeHeader里, 失去作用
      MessageHeaders headers = createHeaders(session.get().getId());
      template.convertAndSendToUser(userName, destination, "itemId:" + System.currentTimeMillis(), headers); //用convertAndSendToUser 方法向指定用户发送消息, session通过header控制
      }

      }
    2. 实现SmartApplicationListener接口, 监听websocket session 相关ApplicationEvent 自己手工维护用户和session的关系.
      WebSocketConnectHandlerDecoratorFactory 在 ws session 连接时, 会触发SessionConnectedEvent

nginx 代理 websocket

nginx 1.3.134 版本开始支持websocket. 利用隧道在client和后端server建立的连接. client请求时通过”Upgrade” header 来切换到websocket 协议.

Since version 1.3.13, nginx implements special mode of operation that allows setting up a tunnel between a client and proxied server if the proxied server returned a response with the code 101 (Switching Protocols), and the client asked for a protocol switch via the “Upgrade” header in a request.

“Upgrade” 和 “Connection” 这两个逐跳header , nginx默认不会传到后端服务器. 为了让后端服务器切换协议,需要显示的带上这俩header.

1
2
3
4
5
6
location /chat/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}

这样就可以了, 有点缺点是 “Connection”是写死的. 更完整的配置方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
http {
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}

server {
...

location /chat/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
}

参考资料