博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java web项目使用webSocket
阅读量:4688 次
发布时间:2019-06-09

本文共 21610 字,大约阅读时间需要 72 分钟。

前端:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8" %><%    String path = request.getContextPath();    String basePath = request.getScheme() + "://"            + request.getServerName() + ":" + request.getServerPort()            + path + "/";    String ppp = request.getServerName() + ":" + request.getServerPort()            + path + "/";%>        My JSP 'MyJsp.jsp' starting page    

后端需要三个类:注册类、握手类、处理类(终端类)

握手类:

import java.net.InetSocketAddress;import java.net.URI;import java.util.Map;import javax.servlet.http.HttpSession;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.http.server.ServletServerHttpRequest;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.HandshakeInterceptor;public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {    private static Logger logger = LoggerFactory            .getLogger(HandshakeInterceptor.class);    @Override    public boolean beforeHandshake(ServerHttpRequest request,            ServerHttpResponse response, WebSocketHandler wsHandler,            Map
attributes) throws Exception { if (request instanceof ServletServerHttpRequest) {// ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;// HttpSession session = servletRequest.getServletRequest()// .getSession(true); // 保存session中已登录的用户到websocket的上下文环境中。在推送消息的时候,需要根据当前登录用户获取点位权限 final IbmsUser user = IbmsUserHolder.getUser(); attributes.put(IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER, user); // if (session != null) { // // 使用userName区分WebSocketHandler,以便定向发送消息 // String userName = (String) session // .getAttribute(Constants.SESSION_USERNAME); // if(userName==null){ // userName = "qianshihua"; // } // attributes.put(Constants.WEBSOCKET_USERNAME, userName); // } } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { URI uri = request.getURI(); InetSocketAddress remoteAddress = request.getRemoteAddress(); String msg = "afterHandshake*******************\r\nuri:" + uri + ";\r\nremoteAddress;" + remoteAddress; System.err.println(msg); logger.debug(msg); }}

 

websocket注册类,注册类依赖握手类,可以编码实现,也可以直接通过spring配置实现:

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.EnableWebMvc;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.config.annotation.EnableWebSocket;import org.springframework.web.socket.config.annotation.WebSocketConfigurer;import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration@EnableWebMvc@EnableWebSocketpublic class WebSocketConfig  implements WebSocketConfigurer {    @Override    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {        registry.addHandler(systemWebSocketHandler(),"/point/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor())//        .setAllowedOrigins("http://localhost:8087","http://10.16.38.21:8087","http://localhost:63342")        ;        registry.addHandler(systemWebSocketHandler(), "/point/sockjs/webSocketServer.do").addInterceptors(new WebSocketHandshakeInterceptor())                .withSockJS();        registry.addHandler(indexConfigWebSocketHandler(),"/app/indexConfig/indexConfigWebSocket.do").addInterceptors(new WebSocketHandshakeInterceptor());    }    @Bean    public WebSocketHandler systemWebSocketHandler(){        return new IbmsWebSocketHandler();    }    @Bean    public WebSocketHandler indexConfigWebSocketHandler(){        return new IndexConfigWebSocketHandler();    }}

后端可以注册多个handler,如上图配置。

handler:

import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.web.socket.CloseStatus;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.WebSocketMessage;import org.springframework.web.socket.WebSocketSession;import java.util.List;/** * 数据订阅处理类 */public class IndexConfigWebSocketHandler implements WebSocketHandler {    private static final Logger logger = LoggerFactory.getLogger(IndexConfigWebSocketHandler.class);    @Override    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {        logger.debug("indexconfig connect to the websocket success......");    }    /**     * 处理前端发起的订阅信息     * 订阅列表中的id包含fmt前缀     *     * @param session     * @param message     * @throws Exception     */    @Override    public void handleMessage(WebSocketSession session, WebSocketMessage
message) throws Exception { String jsontext = (String) message.getPayload(); logger.info("收到统计项订阅:::" + jsontext); Object objUser = session.getAttributes().get( IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER); if (objUser == null) { // 取不到session中的用户信息 throw new RuntimeException("会话中无用户信息"); } JSONObject jsonObject = JSONObject.parseObject(jsontext); Object optType = jsonObject.get("optType");//状态字段 String data = jsonObject.getString("data");//数据字段 //将数据字段解析成SubscribeBO列表 List
subscribeBOs = JSON.parseArray(data, SubscribeBO.class); boolean ignoreSession = false; if (subscribeBOs == null || subscribeBOs.size() == 0) { if ("pausePush".equals(optType)) { //如果data为空,并且optType==pausePush,关闭该session的所有推送 this.removeReader(session); } return; } if (optType != null && "hb".equals(optType)) { //心跳 return; } if (optType != null && "pausePush".equals(optType)) { //暂时关闭推送 ignoreSession = true; } for (int i = 0; i < subscribeBOs.size(); i++) { SubscribeBO item = subscribeBOs.get(i); String id = item.getSubscribeId(); String type = item.getSubscribeTypeId(); if (StringUtils.isBlank(id) || StringUtils.isBlank(type)) { continue; } /*if(SubscribeType.WEATHER.getCode().equals(type)){ //如果是天气预报,构造唯一的天气订阅 item.setSubscribeData(JOBDATA_KEY_WEATHER); item.setSubscribeId(JOBDATA_KEY_WEATHER); }*/ //根据类型不同,选择不同的存储空间 BaseWSSHolder holder = this.getHolderByType(type); //根据SubscribeBO获取已订阅的session列表 List
sessions = holder.getSessionBySubscribe(item); boolean exists = false; for (WebSocketSession wss : sessions) { //将本次session与session列表进行比对,已存在则 exists = true; if (wss.equals(session)) { exists = true; } } String msg = "关注"; if (ignoreSession == true) { msg = "取消关注"; } logger.info("websocket会话:" + session + msg + "了:" + SubscribeType.getDes(item.getSubscribeTypeId()) + " " + item.getSubscribeData()); //如果session列表中不存在本次session,则加入 if (exists == false && ignoreSession == false) { holder.putSession(session, item); } if (exists == true && ignoreSession == true) { //暂时关闭推送 sessions.remove(session); } } } @Override public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception { if (webSocketSession.isOpen()) { webSocketSession.close(); } logger.debug("indexconfig websocket connection closed......"); } @Override public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception { logger.debug("indexconfig websocket connection closed......"); } @Override public boolean supportsPartialMessages() { return false; } /** * 根据类型获取session holder * * @param type * @return * @throws Exception */ private BaseWSSHolder getHolderByType(String type) throws Exception { SubscribeType subscribeType = SubscribeType.getByCode(type); BaseWSSHolder holder = null; if (subscribeType == null) { throw new Exception("数据传入错误"); } switch (subscribeType) { case DEVICE_COUNT: //设备数量 holder = DeviceWSSHolder.getInstance(); break; case ALARM_DEVICE_COUNT: //报警数量 holder = AlarmDeviceWSSHolder.getInstance(); break; case STATE_DEVICE_COUNT: //某状态设备数量 holder = StateDeviceWSSHolder.getInstance(); break; case POINT_COUNT: //点位值 holder = PointWSSHolder.getInstance(); break; case WEATHER: //点位值 holder = WeatherWSSHolder.getInstance(); break; } if (holder == null) { logger.error("不存在对应的存储:" + type); throw new Exception("不存在对应的存储:" + type); } return holder; } private void removeReader(WebSocketSession session) { AlarmDeviceWSSHolder.getInstance().removeReader(session, null); DeviceWSSHolder.getInstance().removeReader(session, null); PointWSSHolder.getInstance().removeReader(session, null); StateDeviceWSSHolder.getInstance().removeReader(session, null); WeatherWSSHolder.getInstance().removeReader(session, null); }}

订阅的信息存储在内存中,形式为<session,List<data>>的键值对

存储工具类:

import com.google.common.collect.Lists;import org.springframework.web.socket.WebSocketSession;import java.util.*;/** * 保存设备数量的订阅(哪些session订阅了设备数量) * 不存储统计数据值 */public class DeviceWSSHolder implements BaseWSSHolder {    /**     * key值为统计,value值回哪些session关心这个点位     */    private Map
> sessions; private DeviceWSSHolder() { } private static class SingletonHolder { public final static DeviceWSSHolder holder = new DeviceWSSHolder(); } public static DeviceWSSHolder getInstance() { return SingletonHolder.holder; } /** * 保存统计ID和websocket会话的关系 * * @param s * @param subscribeBO */ @Override public void putSession(WebSocketSession s, SubscribeBO subscribeBO) { if (getInstance().sessions == null) { getInstance().sessions = new HashMap
>(); } if (getInstance().sessions.get(subscribeBO) == null) { getInstance().sessions.put(subscribeBO, new ArrayList
()); } final List
list = getInstance().sessions.get(subscribeBO); list.add(s); } @Override public void removeReader(WebSocketSession reader, SubscribeBO subscribeBO) { if (getInstance().sessions != null && reader != null) { if (subscribeBO != null) { //移除该session的某个具体订阅 List
readers = this.getSessionBySubscribe(subscribeBO); if (readers.size() > 0 && readers.contains(reader)) { readers.remove(reader); } } else { //移除该session的所有订阅 for (Map.Entry
> entry : getInstance().sessions.entrySet()) { List
readers = entry.getValue(); //确定有session订阅 if (readers.size() > 0 && readers.contains(reader)) { readers.remove(reader); break; } } } } } /** * 根据统计ID获取websocket的会话信息 * * @param subscribeBO * @return */ @Override public List
getSessionBySubscribe(SubscribeBO subscribeBO) { if (getInstance().sessions == null) { getInstance().sessions = new HashMap
>(); } if (getInstance().sessions.get(subscribeBO) == null) { getInstance().sessions.put(subscribeBO, new ArrayList
()); } return getInstance().sessions.get(subscribeBO); } /** * 获取所有有session订阅的业务ID * 业务ID带de前缀 * @return */ public List
getEffectDataIds() { List
ids = Lists.newArrayList(); if (getInstance().sessions != null) { for (Map.Entry
> entry : getInstance().sessions.entrySet()) { List
readers = entry.getValue(); //确定有session订阅 if (readers != null && readers.size() > 0) { SubscribeBO bo = entry.getKey(); ids.add(bo.getSubscribeData());//真正的业务id } } } //String idsStr = Joiner.on(",").join(ids); return ids; } /** * 根据SubscribeBO获取一条订阅信息 * @param condition * @return */ public Map.Entry
> getItemBySubscribeBO(SubscribeBO condition) { if (getInstance().sessions != null && condition != null) { for (Map.Entry
> entry : getInstance().sessions.entrySet()) { if (entry.getKey().equals(condition)) { return entry; } } } return null; } /*public SubscribeBO getSubscribeByData(Long data) { Set
boSet = getInstance().sessions.keySet(); for (SubscribeBO bo : boSet) { System.out.println(str); } List
ids = Lists.newArrayList(); if (getInstance().sessions != null) { for (Map.Entry
> entry : getInstance().sessions.entrySet()) { List
readers = entry.getValue(); //确定有session订阅 if (readers != null && readers.size() > 0) { SubscribeBO bo = entry.getKey(); ids.add(Long.parseLong(bo.getData()));//真正的业务id } } } //String idsStr = Joiner.on(",").join(ids); return ids; }*/}

订阅工具类(subscribeBO):主要就是将接收到的websocket信息转成java对象

import com.alibaba.fastjson.annotation.JSONField;/** * 订阅的对象 */public class SubscribeBO {    @JSONField(name="statisticsId")    private String subscribeId;    @JSONField(name="statisticsTypeId")    private String subscribeTypeId;    @JSONField(name="statisticsData")    private String subscribeData;    @JSONField(name="statisticsValue")    private String subscribeValue;    public SubscribeBO() {    }    public SubscribeBO(String subscribeTypeId, String subscribeData) {        this.subscribeTypeId = subscribeTypeId;        this.subscribeData = subscribeData;    }    public SubscribeBO(String subscribeId, String subscribeTypeId, String subscribeData) {        this.subscribeId = subscribeId;        this.subscribeTypeId = subscribeTypeId;        this.subscribeData = subscribeData;    }    public String getSubscribeId() {        return subscribeId;    }    public void setSubscribeId(String subscribeId) {        this.subscribeId = subscribeId;    }    public String getSubscribeTypeId() {        return subscribeTypeId;    }    public void setSubscribeTypeId(String subscribeTypeId) {        this.subscribeTypeId = subscribeTypeId;    }    public String getSubscribeData() {        return subscribeData;    }    public void setSubscribeData(String subscribeData) {        this.subscribeData = subscribeData;    }    public String getSubscribeValue() {        return subscribeValue;    }    public void setSubscribeValue(String subscribeValue) {        this.subscribeValue = subscribeValue;    }    @Override    public boolean equals(Object o) {        if (this == o) return true;        if (o == null || getClass() != o.getClass()) return false;        SubscribeBO bo = (SubscribeBO) o;        if (!subscribeTypeId.equals(bo.subscribeTypeId)) return false;        return subscribeData != null ? subscribeData.equals(bo.subscribeData) : bo.subscribeData == null;    }    @Override    public int hashCode() {        int result = subscribeTypeId.hashCode();        result = 31 * result + (subscribeData != null ? subscribeData.hashCode() : 0);        return result;    }}

推送代码太多,主要是通过spring+Quartz进行后台运算,运算完毕之后将值按照订阅(DeviceWSSHolder)反查session,发送到客户端

import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.google.common.base.Joiner;import com.google.common.base.Splitter;import com.google.common.collect.Lists;import org.apache.commons.lang.math.RandomUtils;import org.quartz.DisallowConcurrentExecution;import org.quartz.JobDataMap;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.scheduling.quartz.QuartzJobBean;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.web.socket.TextMessage;import org.springframework.web.socket.WebSocketSession;import java.io.IOException;import java.util.*;/** * 设备数量推送任务 */@DisallowConcurrentExecutionpublic class DevicePushJob extends QuartzJobBean {    private Logger log = LoggerFactory.getLogger(DevicePushJob.class);   /* @Autowired    private DeviceService deviceService;*/    @SuppressWarnings("unused")    @Override    protected void executeInternal(JobExecutionContext context)            throws JobExecutionException {        final JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();        final Object object = jobDataMap.get(JOBDATA_KEY_APPCTX);        final ApplicationContext appCtx = (ApplicationContext) object;        final DeviceWSSHolder deviceWSSHolder = (DeviceWSSHolder) jobDataMap                .get(JOBDATA_KEY_INDEXCONFIG_DEVICE);        List
ids = deviceWSSHolder.getEffectDataIds(); if(ids.size()==0){ return; } //String idsStr = Joiner.on(",").join(ids); //System.out.println("××××××××××××××××××要查询的设备类别是:"+idsStr); //log.info("××××××××××××××××××要查询的设备类别是:"+idsStr); //查询数据 id,type,value 把数据都装到新的List
中发送,DeviceWSSHolder数据仅作为字典查询用 List
integers = Lists.newArrayList(); for (String typeIdFmt : ids) { List
result = Splitter.onPattern(PORTLET_TREE_DEVICETYPE_FORMAT) .omitEmptyStrings().splitToList(typeIdFmt); Integer id = Integer.parseInt(result.get(0)); integers.add(id); } List
subscribeBOs = Lists.newArrayList(); DeviceService deviceService = appCtx.getBean(DeviceService.class); Map
deviceCounts = deviceService.countDeviceByDeviceType(integers,false); if (deviceCounts == null || deviceCounts.size()==0) { return; } for (Map.Entry
deviceCount : deviceCounts.entrySet()) { Integer deviceTypeId = deviceCount.getKey(); Integer count = deviceCount.getValue(); SubscribeBO sb = new SubscribeBO(SubscribeType.DEVICE_COUNT .getCode(),PORTLET_TREE_DEVICETYPE_FORMAT+deviceTypeId.toString()); sb.setSubscribeValue(""+count); subscribeBOs.add(sb); } for(SubscribeBO bo:subscribeBOs){ Map.Entry
> entry = DeviceWSSHolder .getInstance().getItemBySubscribeBO(bo); if(entry !=null){ SubscribeBO temp = entry.getKey(); bo.setSubscribeId(temp.getSubscribeId()); List
sessions = entry.getValue(); Iterator
iterator = sessions.iterator(); while (iterator.hasNext()) { WebSocketSession session = iterator.next(); if (session != null && session.isOpen()) { try { JSONObject ret = new JSONObject(); ret.put("success", true); List
retSbo = Lists.newArrayList(bo); ret.put("data", retSbo); String jsonString = JSON.toJSONString(ret); //System.err.println(jsonString); log.info(jsonString); session.sendMessage(new TextMessage(jsonString)); } catch (IOException e) { log.error(e.getMessage()); } }else{ iterator.remove(); } } } } }}

 附:seajs封装的前端js工具类

define(function (require, exports, module) {    var devWebSocket = {};    var indexConfigSocket = function (opt) {        if ('WebSocket' in window) {            devWebSocket = new WebSocket("ws://" + window.location.host + basePath + "/app/indexConfig/indexConfigWebSocket.do");        } else if ('MozWebSocket' in window) {            devWebSocket = new MozWebSocket(                "ws://" + window.location.host + basePath + "/ws/point/webSocketServer.do");        } else {            devWebSocket = new SockJS(                "http://" + window.location.host + basePath + "/ws/point/webSocketServer.do");        }        devWebSocket.onopen = function (evnt) {        };        devWebSocket.onmessage = function (evnt) {            //console.log("onMessage:"+"
(" + evnt.data + ")") window.PubSub.publish('indexConfigSocket-onMessage', evnt); }; devWebSocket.onerror = function (e) { console.log('indexConfig webSocket error...'); for (var p in e) { //alert(p + "=" + e[p]); } }; devWebSocket.onclose = function (evnt) { console.log('indexConfig webSocket error...'); }; }; indexConfigSocket.prototype.send = function (indexConfigIdsStr) { var indexConfig = {}; indexConfig.data = indexConfigIdsStr; indexConfig.optType = ''; var t = JSON.stringify(indexConfig); console.log("
请求报文:" + t + "") devWebSocket.send(t); }; indexConfigSocket.prototype.close = function (indexConfigIdsStr) { var indexConfig = {}; indexConfig.data = indexConfigIdsStr == null ? [] : indexConfigIdsStr; indexConfig.optType = 'pausePush'; var t = JSON.stringify(indexConfig); console.log("关闭报文:" + t); devWebSocket.send(t); }; module.exports = indexConfigSocket;})

 

转载于:https://www.cnblogs.com/winkey4986/p/5478332.html

你可能感兴趣的文章
OVER(PARTITION BY)函数用法
查看>>
altium annotate 选项设置 complete existing packages
查看>>
【模式识别与机器学习】——SVM举例
查看>>
【转】IT名企面试:微软笔试题(1)
查看>>
IO流入门-第十章-DataInputStream_DataOutputStream
查看>>
DRF的分页
查看>>
Mysql 模糊匹配(字符串str中是否包含子字符串substr)
查看>>
IIS的ISAPI接口简介
查看>>
python:open/文件操作
查看>>
16 乘法口诀输出
查看>>
mac 常用地址
查看>>
鼠标经过切换图片
查看>>
流程控制 Day06
查看>>
Linux下安装Tomcat
查看>>
windows live writer 2012 0x80070643
查看>>
C程序的启动和终止
查看>>
tomcat 和MySQL的安装
查看>>
11.5 内部类
查看>>
Cosine Similarity
查看>>
halt和shutdown 的区别
查看>>