基于 DWR + ActiveMQ 的 Web 端订阅消息推送

基于 Web 的消息订阅/发布系统可以分两部分考虑:一是使用什么工具搭建订阅/发布系统,二是如何进行 Web 前后端交互。

对于前者,业界很多 MQ 产品可以满足需求:RabbitMQ、ActiveMQ、ZeroMQ 等,本项目采用 ActiveMQ。关于 Web 前后端交互,目前最流行的技术是 HTML5 的 WebSocket,然而项目方要求兼容 IE8,因此只能寻求旧一点的技术,DWR 完全能满足项目的需求。

接下来介绍如何把这两种技术整合进基于 Spring + SpringMVC 的 Java Web 后台。

ActiveMQ 整合

  1. 引入 jar 包 org.apache.activemq:activemq-spring:5.14.5
  2. 由于 Spring 驱动的 ActiveMQ 连接方式无法满足项目特定需求,我手动管理 ActiveMQConnectionFactory。在 applicationContext-spring.xml 定义 ActiveMQ 的连接工厂:
1
2
3
4
5
<!-- ActiveMQ connection factory. Broker URL and credentials are resolved from
     property placeholders, so the properties file must define amq.brokerURL,
     amq.username and amq.password. -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${amq.brokerURL}" />
    <property name="userName" value="${amq.username}" />
    <!-- The password needs its own placeholder; binding it to the user name
         only works when both happen to be identical. -->
    <property name="password" value="${amq.password}" />
</bean>

DWR 整合

  1. 引入 jar 包 org.directwebremoting:dwr:3.0.2-RELEASE
  2. 根据官方文档把 DWR 整合进 Spring 和 SpringMVC,在 web.xml 中把 DWR 的请求路径(/dwr/*,包括其生成的 JavaScript 文件)映射到 SpringMVC 的 DispatcherServlet:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- Spring MVC front controller. Its context is read from
     classpath:applicationContext-springmvc.xml and load-on-startup=1 asks the
     container to initialise it when the web application starts. -->
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext-springmvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<!-- Everything under /dwr/ goes to the same DispatcherServlet, so DWR requests
     are dispatched by Spring MVC rather than by a separate DWR servlet. -->
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<url-pattern>/dwr/*</url-pattern>
</servlet-mapping>
<!-- The DispatcherServlet is also mapped to "/" for the rest of the application. -->
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
  3. 在 applicationContext-spring.xml 添加 DWR 的控制器(Controller)与 URL 映射配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- DWR3: Spring-namespace configuration of DWR. -->
<dwr:configuration/>
<dwr:annotation-config/>
<!-- Scans project.vo for DWR annotations; scanDataTransferObject is switched on
     so data-transfer classes in that package are picked up as well. -->
<dwr:annotation-scan base-package="project.vo" scanDataTransferObject="true"/>

<!-- The DWR controller bean that the URL mapping below points at.
     NOTE(review): debug="true", allowScriptTagRemoting="true" and
     crossDomainSessionSecurity="false" look like development-friendly settings;
     confirm against the DWR security documentation before going to production.
     NOTE(review): maxWaitAfterWrite is presumably in milliseconds; confirm. -->
<dwr:controller id="dwrController" debug="true">
<dwr:config-param name="allowScriptTagRemoting" value="true"/>
<dwr:config-param name="activeReverseAjaxEnabled" value="true"/>
<dwr:config-param name="initApplicationScopeCreatorsAtStartup" value="true"/>
<dwr:config-param name="maxWaitAfterWrite" value="3000"/>
<dwr:config-param name="crossDomainSessionSecurity" value="false"/>
<!-- Replaces DWR's ScriptSessionManager with the project's own implementation. -->
<dwr:config-param name="org.directwebremoting.extend.ScriptSessionManager"
value="project.dwr.DWRScriptSessionManager"/>
</dwr:controller>

<!-- Hands every request whose full path matches /dwr/**/* to dwrController. -->
<bean class="org.springframework.web.servlet.handler.SimpleUrlHandlerMapping">
<property name="alwaysUseFullPath" value="true"/>
<property name="mappings">
<props>
<prop key="/dwr/**/*">dwrController</prop>
</props>
</property>
</bean>

业务需求实现

  1. 监控 DWR 的 ScriptSession 的生命周期:
1
2
3
4
5
6
7
8
9
10
11
12
package project.dwr;

import org.directwebremoting.impl.DefaultScriptSessionManager;

/**
 * Script session manager that registers a {@link DWRScriptSessionListener}
 * as soon as it is constructed, so script-session lifecycle events reach
 * that listener.
 */
public class DWRScriptSessionManager extends DefaultScriptSessionManager {

    /**
     * Creates the manager, attaches the lifecycle listener and prints a
     * confirmation line to standard output.
     */
    public DWRScriptSessionManager() {
        super();
        DWRScriptSessionListener lifecycleListener = new DWRScriptSessionListener();
        addScriptSessionListener(lifecycleListener);
        System.out.println("binded DWRScriptSessionListener.");
    }
}
  2. 为每个用户的连接绑定一个 ActiveMQ 的连接,用户离线后关闭连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package project.dwr;

import org.directwebremoting.ScriptSession;
import org.directwebremoting.WebContext;
import org.directwebremoting.WebContextFactory;
import org.directwebremoting.event.ScriptSessionEvent;
import org.directwebremoting.event.ScriptSessionListener;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Binds a pre-created JMS connection to each new DWR script session and
 * releases the JMS resources stored on a script session when it is destroyed.
 *
 * <p>Connections are handed over through a static pool keyed by HTTP session
 * id: some other component calls {@link #pourAmqConnection}, and the first
 * script session created for that HTTP session takes the connection.
 *
 * <p>NOTE(review): a connection poured for an HTTP session that never opens a
 * script session stays in the pool; confirm whether the caller cleans it up.
 */
public class DWRScriptSessionListener implements ScriptSessionListener {

    /** Connections waiting for their script session, keyed by HTTP session id. */
    private static final Map<String, Connection> amqConnectionPool = new ConcurrentHashMap<>();

    /**
     * Parks a connection until the script session of the given HTTP session is created.
     *
     * @param sessionId  HTTP session id the connection belongs to
     * @param connection connection to hand over; {@code null} clears any parked entry
     */
    public static void pourAmqConnection(String sessionId, Connection connection) {
        if (sessionId == null) {
            // Lookups always use a real session id, so such an entry could never be claimed.
            return;
        }
        if (connection == null) {
            // ConcurrentHashMap rejects null values; a null mapping means "nothing parked".
            amqConnectionPool.remove(sessionId);
            return;
        }
        amqConnectionPool.put(sessionId, connection);
    }

    /**
     * Moves the parked connection (if any) of the current HTTP session onto the
     * new script session under the {@code "connection"} attribute.
     */
    @Override
    public void sessionCreated(ScriptSessionEvent event) {
        ScriptSession scriptSession = event.getSession();
        String sessionId = currentHttpSessionId(true);
        System.out.println("session: " + sessionId + " scriptSession: " + scriptSession.getId() + "is created!");
        if (sessionId == null) {
            return;
        }
        // remove() claims the connection atomically, so two script sessions
        // created concurrently cannot both receive the same connection.
        Connection connection = amqConnectionPool.remove(sessionId);
        if (connection != null) {
            scriptSession.setAttribute("connection", connection);
        }
    }

    /**
     * Closes the consumer, JMS session and connection stored on the script
     * session. Each resource is closed independently so that a failure on one
     * does not leave the others open.
     */
    @Override
    public void sessionDestroyed(ScriptSessionEvent event) {
        ScriptSession scriptSession = event.getSession();
        // No web context may be available when destruction is not triggered by
        // a DWR request; cleanup must still run in that case.
        System.out.println("session: " + currentHttpSessionId(false) + " scriptSession: " + scriptSession.getId() + "is destroyed!");

        MessageConsumer consumer = (MessageConsumer) scriptSession.getAttribute("consumer");
        Session jSession = (Session) scriptSession.getAttribute("session");
        Connection connection = (Connection) scriptSession.getAttribute("connection");

        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (jSession != null) {
            try {
                jSession.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.stop();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Returns the id of the HTTP session of the current DWR request.
     *
     * @param create whether a missing HTTP session may be created
     * @return the session id, or {@code null} when there is no web context or no session
     */
    private static String currentHttpSessionId(boolean create) {
        WebContext webContext = WebContextFactory.get();
        if (webContext == null) {
            return null;
        }
        HttpSession session = webContext.getSession(create);
        return session == null ? null : session.getId();
    }
}
  3. 供前端调用的后端入口程序,由前端传入自己的用户名(客户端 ID)和订阅内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package project.dwr.service;

import org.directwebremoting.ScriptSession;
import org.directwebremoting.WebContextFactory;
import org.directwebremoting.annotations.RemoteMethod;
import org.directwebremoting.annotations.RemoteProxy;
import org.springframework.stereotype.Service;

import javax.jms.*;

/**
 * DWR-exposed service through which a browser page starts its topic
 * subscription on the JMS connection bound to its script session.
 */
@Service
@RemoteProxy
public class DWRService {

    /**
     * Starts the script session's JMS connection and subscribes it to a topic.
     *
     * <p>Does nothing when no connection is bound to the script session, when
     * {@code subscriptions} is null or empty, or when the script session
     * already has a consumer (repeated call from the same page).
     *
     * @param clientId      JMS client id to set on the connection
     * @param subscriptions topic name passed to {@code Session.createTopic}
     */
    @RemoteMethod
    public void connect(String clientId, String subscriptions) {
        ScriptSession scriptSession = WebContextFactory.get().getScriptSession();
        Connection connection = (Connection) scriptSession.getAttribute("connection");
        // The browser may pass null/undefined, which arrives here as null.
        if (connection == null || subscriptions == null || subscriptions.isEmpty()) {
            return;
        }
        // Already subscribed: setting the client id again on a used connection
        // would only fail, so treat a repeated call as a no-op.
        if (scriptSession.getAttribute("consumer") != null) {
            return;
        }

        Session session = null;
        try {
            connection.setClientID(clientId);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic(subscriptions);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new DWRMessageListener(scriptSession));
            System.out.println("[" + clientId + "] (" + subscriptions + ") connected.");
            // Stored so they can be closed when the script session is destroyed.
            scriptSession.setAttribute("consumer", consumer);
            scriptSession.setAttribute("session", session);
        } catch (JMSException e) {
            e.printStackTrace();
            // The session was never published on the script session, so close it here.
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }
}
  4. 前端入口程序为:
1
2
3
4
5
6
7
8
9
10
<!-- DWR runtime, DWR utilities and the generated proxy for the server-side DWRService. -->
<script src="dwr/engine.js"></script>
<script src="dwr/util.js"></script>
<script src="dwr/interface/DWRService.js"></script>
<script>
    $(document).ready(function () {
        // Enable reverse Ajax so the server can push scripts to this page, and
        // ask DWR to tell the server when the page unloads.
        dwr.engine.setActiveReverseAjax(true);
        dwr.engine.setNotifyServerOnPageUnload(true);

        // The value is used as the JMS client id, which must be unique per
        // connection. A locale-formatted date only has second resolution, so
        // two pages opened in the same second would collide; use milliseconds
        // plus a random component instead (both available in IE8).
        var uniqueSuffix = new Date().getTime() + "-" + Math.floor(Math.random() * 1000000);

        // NOTE(review): the user name is written into a JavaScript string
        // literal without escaping; confirm it cannot contain quotes or
        // backslashes, or escape it on the server side.
        DWRService.connect("<%=user.getUsername()%>-" + uniqueSuffix, generateTopicsDestination());
    });
</script>
  5. DWRMessageListener 获取到 MQ 发布的消息后,通过 DWR 的反向 Ajax 机制通知前端(传递消息并执行前端函数)。目前前端控制还没完成,暂用 console.log 函数替代。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package project.dwr.service;

import project.vo.TopicMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.directwebremoting.ScriptBuffer;
import org.directwebremoting.ScriptSession;

import javax.jms.*;

/**
 * JMS listener that relays text messages to the browser page behind a DWR
 * script session by queueing a {@code console.log} call on that session.
 */
public class DWRMessageListener implements MessageListener {

    /** Script session whose page receives the relayed messages. */
    private final ScriptSession scriptSession;

    /**
     * @param scriptSession script session to push scripts to; its
     *                      {@code "connection"} attribute is read on every message
     */
    public DWRMessageListener(ScriptSession scriptSession) {
        this.scriptSession = scriptSession;
    }

    /**
     * Logs an incoming text message and queues a {@code console.log} call
     * carrying a {@link TopicMessage} on the script session. Messages that are
     * not text messages are ignored.
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            Connection boundConnection = (Connection) scriptSession.getAttribute("connection");
            String clientId = boundConnection.getClientID();

            ActiveMQTextMessage amqMessage = (ActiveMQTextMessage) message;
            String topicName = amqMessage.getDestination().getPhysicalName();

            TextMessage textMessage = (TextMessage) message;
            String payload = textMessage.getText();

            System.out.println("[" + clientId + "] received [" + topicName + "] " + payload);

            ScriptBuffer pushScript = new ScriptBuffer();
            pushScript.appendCall("console.log", new TopicMessage(topicName, payload));
            scriptSession.addScript(pushScript);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
0%