前端:
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8" %><% String path = request.getContextPath(); String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/"; String ppp = request.getServerName() + ":" + request.getServerPort() + path + "/";%>My JSP 'MyJsp.jsp' starting page
后端需要三个类:注册类、握手类、处理类(终端类)
握手类:
import java.net.InetSocketAddress;import java.net.URI;import java.util.Map;import javax.servlet.http.HttpSession;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.http.server.ServletServerHttpRequest;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.HandshakeInterceptor;public class WebSocketHandshakeInterceptor implements HandshakeInterceptor { private static Logger logger = LoggerFactory .getLogger(HandshakeInterceptor.class); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Mapattributes) throws Exception { if (request instanceof ServletServerHttpRequest) {// ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;// HttpSession session = servletRequest.getServletRequest()// .getSession(true); // 保存session中已登录的用户到websocket的上下文环境中。在推送消息的时候,需要根据当前登录用户获取点位权限 final IbmsUser user = IbmsUserHolder.getUser(); attributes.put(IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER, user); // if (session != null) { // // 使用userName区分WebSocketHandler,以便定向发送消息 // String userName = (String) session // .getAttribute(Constants.SESSION_USERNAME); // if(userName==null){ // userName = "qianshihua"; // } // attributes.put(Constants.WEBSOCKET_USERNAME, userName); // } } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { URI uri = request.getURI(); InetSocketAddress remoteAddress = request.getRemoteAddress(); String msg = "afterHandshake*******************\r\nuri:" + uri + ";\r\nremoteAddress;" + remoteAddress; System.err.println(msg); logger.debug(msg); }}
websocket注册类,注册类依赖握手类,可以编码实现,也可以直接通过spring配置实现:
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.EnableWebMvc;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.config.annotation.EnableWebSocket;import org.springframework.web.socket.config.annotation.WebSocketConfigurer;import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration@EnableWebMvc@EnableWebSocketpublic class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(systemWebSocketHandler(),"/point/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor())// .setAllowedOrigins("http://localhost:8087","http://10.16.38.21:8087","http://localhost:63342") ; registry.addHandler(systemWebSocketHandler(), "/point/sockjs/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor()) .withSockJS(); registry.addHandler(indexConfigWebSocketHandler(),"/app/indexConfig/indexConfigWebSocket.do").addInterceptors(new WebSocketHandshakeInterceptor()); } @Bean public WebSocketHandler systemWebSocketHandler(){ return new IbmsWebSocketHandler(); } @Bean public WebSocketHandler indexConfigWebSocketHandler(){ return new IndexConfigWebSocketHandler(); }}
后端可以注册多个handler,如上图配置。
handler:
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.web.socket.CloseStatus;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.WebSocketMessage;import org.springframework.web.socket.WebSocketSession;import java.util.List;/** * 数据订阅处理类 */public class IndexConfigWebSocketHandler implements WebSocketHandler { private static final Logger logger = LoggerFactory.getLogger(IndexConfigWebSocketHandler.class); @Override public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception { logger.debug("indexconfig connect to the websocket success......"); } /** * 处理前端发起的订阅信息 * 订阅列表中的id包含fmt前缀 * * @param session * @param message * @throws Exception */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { String jsontext = (String) message.getPayload(); logger.info("收到统计项订阅:::" + jsontext); Object objUser = session.getAttributes().get( IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER); if (objUser == null) { // 取不到session中的用户信息 throw new RuntimeException("会话中无用户信息"); } JSONObject jsonObject = JSONObject.parseObject(jsontext); Object optType = jsonObject.get("optType");//状态字段 String data = jsonObject.getString("data");//数据字段 //将数据字段解析成SubscribeBO列表 ListsubscribeBOs = JSON.parseArray(data, SubscribeBO.class); boolean ignoreSession = false; if (subscribeBOs == null || subscribeBOs.size() == 0) { if ("pausePush".equals(optType)) { //如果data为空,并且optType==pausePush,关闭该session的所有推送 this.removeReader(session); } return; } if (optType != null && "hb".equals(optType)) { //心跳 return; } if (optType != null && "pausePush".equals(optType)) { //暂时关闭推送 ignoreSession = true; } for (int i = 0; i < subscribeBOs.size(); i++) { SubscribeBO item = subscribeBOs.get(i); String id = item.getSubscribeId(); String type = item.getSubscribeTypeId(); if (StringUtils.isBlank(id) || StringUtils.isBlank(type)) { continue; } /*if(SubscribeType.WEATHER.getCode().equals(type)){ //如果是天气预报,构造唯一的天气订阅 item.setSubscribeData(JOBDATA_KEY_WEATHER); item.setSubscribeId(JOBDATA_KEY_WEATHER); }*/ //根据类型不同,选择不同的存储空间 BaseWSSHolder holder = this.getHolderByType(type); //根据SubscribeBO获取已订阅的session列表 List sessions = holder.getSessionBySubscribe(item); boolean exists = false; for (WebSocketSession wss : sessions) { //将本次session与session列表进行比对,已存在则 exists = true; if (wss.equals(session)) { exists = true; } } String msg = "关注"; if (ignoreSession == true) { msg = "取消关注"; } logger.info("websocket会话:" + session + msg + "了:" + SubscribeType.getDes(item.getSubscribeTypeId()) + " " + item.getSubscribeData()); //如果session列表中不存在本次session,则加入 if (exists == false && ignoreSession == false) { holder.putSession(session, item); } if (exists == true && ignoreSession == true) { //暂时关闭推送 sessions.remove(session); } } } @Override public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception { if (webSocketSession.isOpen()) { webSocketSession.close(); } logger.debug("indexconfig websocket connection closed......"); } @Override public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception { logger.debug("indexconfig websocket connection closed......"); } @Override public boolean supportsPartialMessages() { return false; } /** * 根据类型获取session holder * * @param type * @return * @throws Exception */ private BaseWSSHolder getHolderByType(String type) throws Exception { SubscribeType subscribeType = SubscribeType.getByCode(type); BaseWSSHolder holder = null; if (subscribeType == null) { throw new Exception("数据传入错误"); } switch (subscribeType) { case DEVICE_COUNT: //设备数量 holder = DeviceWSSHolder.getInstance(); break; case ALARM_DEVICE_COUNT: //报警数量 holder = AlarmDeviceWSSHolder.getInstance(); break; case STATE_DEVICE_COUNT: //某状态设备数量 holder = StateDeviceWSSHolder.getInstance(); break; case POINT_COUNT: //点位值 holder = PointWSSHolder.getInstance(); break; case WEATHER: //点位值 holder = WeatherWSSHolder.getInstance(); break; } if (holder == null) { logger.error("不存在对应的存储:" + type); throw new Exception("不存在对应的存储:" + type); } return holder; } private void removeReader(WebSocketSession session) { AlarmDeviceWSSHolder.getInstance().removeReader(session, null); DeviceWSSHolder.getInstance().removeReader(session, null); PointWSSHolder.getInstance().removeReader(session, null); StateDeviceWSSHolder.getInstance().removeReader(session, null); WeatherWSSHolder.getInstance().removeReader(session, null); }}
订阅的信息存储在内存中,形式为<session,List<data>>的键值对
存储工具类:
import com.google.common.collect.Lists;import org.springframework.web.socket.WebSocketSession;import java.util.*;/** * 保存设备数量的订阅(哪些session订阅了设备数量) * 不存储统计数据值 */public class DeviceWSSHolder implements BaseWSSHolder { /** * key值为统计,value值回哪些session关心这个点位 */ private Map> sessions; private DeviceWSSHolder() { } private static class SingletonHolder { public final static DeviceWSSHolder holder = new DeviceWSSHolder(); } public static DeviceWSSHolder getInstance() { return SingletonHolder.holder; } /** * 保存统计ID和websocket会话的关系 * * @param s * @param subscribeBO */ @Override public void putSession(WebSocketSession s, SubscribeBO subscribeBO) { if (getInstance().sessions == null) { getInstance().sessions = new HashMap >(); } if (getInstance().sessions.get(subscribeBO) == null) { getInstance().sessions.put(subscribeBO, new ArrayList ()); } final List list = getInstance().sessions.get(subscribeBO); list.add(s); } @Override public void removeReader(WebSocketSession reader, SubscribeBO subscribeBO) { if (getInstance().sessions != null && reader != null) { if (subscribeBO != null) { //移除该session的某个具体订阅 List readers = this.getSessionBySubscribe(subscribeBO); if (readers.size() > 0 && readers.contains(reader)) { readers.remove(reader); } } else { //移除该session的所有订阅 for (Map.Entry > entry : getInstance().sessions.entrySet()) { List readers = entry.getValue(); //确定有session订阅 if (readers.size() > 0 && readers.contains(reader)) { readers.remove(reader); break; } } } } } /** * 根据统计ID获取websocket的会话信息 * * @param subscribeBO * @return */ @Override public List getSessionBySubscribe(SubscribeBO subscribeBO) { if (getInstance().sessions == null) { getInstance().sessions = new HashMap >(); } if (getInstance().sessions.get(subscribeBO) == null) { getInstance().sessions.put(subscribeBO, new ArrayList ()); } return getInstance().sessions.get(subscribeBO); } /** * 获取所有有session订阅的业务ID * 业务ID带de前缀 * @return */ public List getEffectDataIds() { List ids = Lists.newArrayList(); if (getInstance().sessions != null) { for (Map.Entry > entry : getInstance().sessions.entrySet()) { List readers = entry.getValue(); //确定有session订阅 if (readers != null && readers.size() > 0) { SubscribeBO bo = entry.getKey(); ids.add(bo.getSubscribeData());//真正的业务id } } } //String idsStr = Joiner.on(",").join(ids); return ids; } /** * 根据SubscribeBO获取一条订阅信息 * @param condition * @return */ public Map.Entry > getItemBySubscribeBO(SubscribeBO condition) { if (getInstance().sessions != null && condition != null) { for (Map.Entry > entry : getInstance().sessions.entrySet()) { if (entry.getKey().equals(condition)) { return entry; } } } return null; } /*public SubscribeBO getSubscribeByData(Long data) { Set boSet = getInstance().sessions.keySet(); for (SubscribeBO bo : boSet) { System.out.println(str); } List ids = Lists.newArrayList(); if (getInstance().sessions != null) { for (Map.Entry > entry : getInstance().sessions.entrySet()) { List readers = entry.getValue(); //确定有session订阅 if (readers != null && readers.size() > 0) { SubscribeBO bo = entry.getKey(); ids.add(Long.parseLong(bo.getData()));//真正的业务id } } } //String idsStr = Joiner.on(",").join(ids); return ids; }*/}
订阅工具类(subscribeBO):主要就是将接收到的websocket信息转成java对象
import com.alibaba.fastjson.annotation.JSONField;/** * 订阅的对象 */public class SubscribeBO { @JSONField(name="statisticsId") private String subscribeId; @JSONField(name="statisticsTypeId") private String subscribeTypeId; @JSONField(name="statisticsData") private String subscribeData; @JSONField(name="statisticsValue") private String subscribeValue; public SubscribeBO() { } public SubscribeBO(String subscribeTypeId, String subscribeData) { this.subscribeTypeId = subscribeTypeId; this.subscribeData = subscribeData; } public SubscribeBO(String subscribeId, String subscribeTypeId, String subscribeData) { this.subscribeId = subscribeId; this.subscribeTypeId = subscribeTypeId; this.subscribeData = subscribeData; } public String getSubscribeId() { return subscribeId; } public void setSubscribeId(String subscribeId) { this.subscribeId = subscribeId; } public String getSubscribeTypeId() { return subscribeTypeId; } public void setSubscribeTypeId(String subscribeTypeId) { this.subscribeTypeId = subscribeTypeId; } public String getSubscribeData() { return subscribeData; } public void setSubscribeData(String subscribeData) { this.subscribeData = subscribeData; } public String getSubscribeValue() { return subscribeValue; } public void setSubscribeValue(String subscribeValue) { this.subscribeValue = subscribeValue; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SubscribeBO bo = (SubscribeBO) o; if (!subscribeTypeId.equals(bo.subscribeTypeId)) return false; return subscribeData != null ? subscribeData.equals(bo.subscribeData) : bo.subscribeData == null; } @Override public int hashCode() { int result = subscribeTypeId.hashCode(); result = 31 * result + (subscribeData != null ? subscribeData.hashCode() : 0); return result; }}
推送代码太多,主要是通过spring+Quartz进行后台运算,运算完毕之后将值按照订阅(DeviceWSSHolder)反查session,发送到客户端
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.google.common.base.Joiner;import com.google.common.base.Splitter;import com.google.common.collect.Lists;import org.apache.commons.lang.math.RandomUtils;import org.quartz.DisallowConcurrentExecution;import org.quartz.JobDataMap;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.scheduling.quartz.QuartzJobBean;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.web.socket.TextMessage;import org.springframework.web.socket.WebSocketSession;import java.io.IOException;import java.util.*;/** * 设备数量推送任务 */@DisallowConcurrentExecutionpublic class DevicePushJob extends QuartzJobBean { private Logger log = LoggerFactory.getLogger(DevicePushJob.class); /* @Autowired private DeviceService deviceService;*/ @SuppressWarnings("unused") @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { final JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); final Object object = jobDataMap.get(JOBDATA_KEY_APPCTX); final ApplicationContext appCtx = (ApplicationContext) object; final DeviceWSSHolder deviceWSSHolder = (DeviceWSSHolder) jobDataMap .get(JOBDATA_KEY_INDEXCONFIG_DEVICE); Listids = deviceWSSHolder.getEffectDataIds(); if(ids.size()==0){ return; } //String idsStr = Joiner.on(",").join(ids); //System.out.println("××××××××××××××××××要查询的设备类别是:"+idsStr); //log.info("××××××××××××××××××要查询的设备类别是:"+idsStr); //查询数据 id,type,value 把数据都装到新的List 中发送,DeviceWSSHolder数据仅作为字典查询用 List integers = Lists.newArrayList(); for (String typeIdFmt : ids) { List result = Splitter.onPattern(PORTLET_TREE_DEVICETYPE_FORMAT) .omitEmptyStrings().splitToList(typeIdFmt); Integer id = Integer.parseInt(result.get(0)); integers.add(id); } List subscribeBOs = Lists.newArrayList(); DeviceService deviceService = appCtx.getBean(DeviceService.class); Map deviceCounts = deviceService.countDeviceByDeviceType(integers,false); if (deviceCounts == null || deviceCounts.size()==0) { return; } for (Map.Entry deviceCount : deviceCounts.entrySet()) { Integer deviceTypeId = deviceCount.getKey(); Integer count = deviceCount.getValue(); SubscribeBO sb = new SubscribeBO(SubscribeType.DEVICE_COUNT .getCode(),PORTLET_TREE_DEVICETYPE_FORMAT+deviceTypeId.toString()); sb.setSubscribeValue(""+count); subscribeBOs.add(sb); } for(SubscribeBO bo:subscribeBOs){ Map.Entry > entry = DeviceWSSHolder .getInstance().getItemBySubscribeBO(bo); if(entry !=null){ SubscribeBO temp = entry.getKey(); bo.setSubscribeId(temp.getSubscribeId()); List sessions = entry.getValue(); Iterator iterator = sessions.iterator(); while (iterator.hasNext()) { WebSocketSession session = iterator.next(); if (session != null && session.isOpen()) { try { JSONObject ret = new JSONObject(); ret.put("success", true); List retSbo = Lists.newArrayList(bo); ret.put("data", retSbo); String jsonString = JSON.toJSONString(ret); //System.err.println(jsonString); log.info(jsonString); session.sendMessage(new TextMessage(jsonString)); } catch (IOException e) { log.error(e.getMessage()); } }else{ iterator.remove(); } } } } }}
附:seajs封装的前端js工具类
define(function (require, exports, module) { var devWebSocket = {}; var indexConfigSocket = function (opt) { if ('WebSocket' in window) { devWebSocket = new WebSocket("ws://" + window.location.host + basePath + "/app/indexConfig/indexConfigWebSocket.do"); } else if ('MozWebSocket' in window) { devWebSocket = new MozWebSocket( "ws://" + window.location.host + basePath + "/ws/point/webSocketServer.do"); } else { devWebSocket = new SockJS( "http://" + window.location.host + basePath + "/ws/point/webSocketServer.do"); } devWebSocket.onopen = function (evnt) { }; devWebSocket.onmessage = function (evnt) { //console.log("onMessage:"+"(" + evnt.data + ")") window.PubSub.publish('indexConfigSocket-onMessage', evnt); }; devWebSocket.onerror = function (e) { console.log('indexConfig webSocket error...'); for (var p in e) { //alert(p + "=" + e[p]); } }; devWebSocket.onclose = function (evnt) { console.log('indexConfig webSocket error...'); }; }; indexConfigSocket.prototype.send = function (indexConfigIdsStr) { var indexConfig = {}; indexConfig.data = indexConfigIdsStr; indexConfig.optType = ''; var t = JSON.stringify(indexConfig); console.log("请求报文:" + t + "") devWebSocket.send(t); }; indexConfigSocket.prototype.close = function (indexConfigIdsStr) { var indexConfig = {}; indexConfig.data = indexConfigIdsStr == null ? [] : indexConfigIdsStr; indexConfig.optType = 'pausePush'; var t = JSON.stringify(indexConfig); console.log("关闭报文:" + t); devWebSocket.send(t); }; module.exports = indexConfigSocket;})