基于 WebSocket 的 Web 端订阅消息推送

基于 DWR + ActiveMQ 的 Web 端订阅消息推送 里我实现了 Web 端订阅消息推送,然而,实际测试的时候发现速度不够,而且还不稳定,于是决定抛弃 DWRActiveMQ,使用 Socket.IO(封装了 WebSocket)做前端推送,自己写逻辑实现订阅逻辑。在前端使用 Socket.IO 提供的 JavaScript 客户端,在后端把 netty-socketio 整合进基于 Spring + SpringMVC 的 Java Web 后台,作为 Socket.IO 服务端。

接下来记录我的实现,比 DWR 配起来简单多了,感人😆。

Java 实现订阅逻辑

SocketioService 负责把消息推送到相应的前端。topicSubscribers 存储每个主题连接了哪些客户端,topicContent 存储每个主题的最新消息,最新消息是解析好的 JSON 字符串,publish 事件表示有新消息推送到前端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package project.socket.service;

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Service
public class SocketioService implements InitializingBean, DisposableBean {

private static Logger logger = Logger.getLogger(SocketioService.class);

@Value("${serverHost}")
private String serverHost;
@Value("${socketio.port}")
private Integer socketioPort;
private SocketIOServer server;

// 每个主题的订阅用户
// topic --> set of clients
private Map<String, Set<SocketIOClient>> topicSubscribers = new HashMap<>();
// 每个主题的最新消息
// topic --> latest content
private Map<String, String> topicContent = new HashMap<>();

@Override
public void afterPropertiesSet() {
Configuration config = new Configuration();
config.setHostname(serverHost);
config.setPort(socketioPort);
server = new SocketIOServer(config);
// 与前端建立连接
server.addConnectListener(client -> {
logger.info("user " + client.getHandshakeData().getSingleUrlParam("user") + " connected to socketio");
String subscriptions = client.getHandshakeData().getSingleUrlParam("subscriptions");
for (String topic : subscriptions.split(",")) {
if (topicSubscribers.containsKey(topic)) {
topicSubscribers.get(topic).add(client);
} else {
Set<SocketIOClient> s = new HashSet<>();
s.add(client);
topicSubscribers.put(topic, s);
}
if (topicContent.containsKey(topic)) {
client.sendEvent("publish", topic, topicContent.get(topic));
}
}
});
// 与前端断开连接
server.addDisconnectListener(client -> {
logger.info("user " + client.getHandshakeData().getSingleUrlParam("user") + " disconnected to socketio");
String subscriptions = client.getHandshakeData().getSingleUrlParam("subscriptions");
for (String topic : subscriptions.split(",")) {
topicSubscribers.get(topic).remove(client);
}
});
server.start();
logger.info("socketio started on port " + socketioPort);
}

@Override
public void destroy() {
if (null != server) {
server.stop();
logger.info("socketio stopped");
}
}

public void publish(String topic, String content) {
topicContent.put(topic, content);
if (topicSubscribers.containsKey(topic)) {
for (SocketIOClient client : topicSubscribers.get(topic)) {
client.sendEvent("publish", topic, content);
}
}
}
}

前端整合

前端入口程序为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<script src="js/socket.io.js"></script>
<script>
$(document).ready(function () {
const socket = io('http://<%=request.getServerName()%>:${socketioPort}', {
query: {
user: "<%=user.getUsername()%>",
subscriptions: "${subscriptionsString}"
}
});
socket.connect();
socket.on("publish", (topic, content) => {
// ...
});
});
</script>
0%