package com.yc.sdk.WebSocketMessage.action;
|
|
import com.alibaba.fastjson.JSON;
|
import com.yc.action.grid.GridUtils;
|
import com.yc.factory.FactoryBean;
|
import com.yc.im.entity.BaseMsg;
|
import com.yc.im.service.RedisDAO;
|
import com.yc.im.util.RedisSocket;
|
import com.yc.sdk.WebSocketMessage.entity.*;
|
import com.yc.sdk.jedis.RedisKey;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.web.bind.annotation.RestController;
|
|
import javax.websocket.*;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
import java.io.IOException;
|
import java.io.Serializable;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
@RestController
|
@ServerEndpoint(value = "/ws/websocketMessage/{usercode}/{dbid}/{sessionid}/{userfromtype}")
|
public class WebSocketMessageServer implements Serializable {
|
/**
|
*
|
*/
|
private static final long serialVersionUID = -1581777096026790075L;
|
//websocket 在线用户
|
private static Map<String, WsMessageUserEntity> onlineUserList = new ConcurrentHashMap<String, WsMessageUserEntity>();
|
|
final Logger log = LoggerFactory.getLogger(this.getClass());
|
public static Map<String, WsMessageUserEntity> getOnlineUserList() {
|
return onlineUserList;
|
}
|
|
|
/***
|
* 打开连接
|
* @param userCode
|
* @param dbId
|
* @param sessionId
|
* @param userType 用户来源分类:1.ERP 网页端用户,2.ERP APP端用户,3.联盟商城版小程序(旧版),4.活动版小程序(新版)
|
* @param session
|
*/
|
@OnOpen
|
public void onOpen(
|
@PathParam("usercode") String userCode,
|
@PathParam("dbid") Integer dbId,
|
@PathParam("sessionid") String sessionId,
|
@PathParam("userfromtype") String userFromType ,
|
Session session) {
|
session.getUserProperties().put("org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", 30000L);//30秒
|
session.getUserProperties().put("org.apache.tomcat.websocket.textBufferSize", 10240);//10K
|
try {
|
WsMessageUserEntity wsMessageUserEntity = getOnlineUser(dbId, userFromType, userCode);
|
WsUserEntity wsUserEntity = WsUserEntity.builder().dbId(dbId).userCode(userCode).userFromType(userFromType).sessionId(sessionId).build();
|
//log.error("onOpen>>>"+dbId+":usercode:"+userCode);
|
//log.error("onlineUserList.size>>>"+onlineUserList.size());
|
if (wsMessageUserEntity == null ) {
|
wsMessageUserEntity = new WsMessageUserEntity(userCode, dbId, sessionId, userFromType, session);
|
addOnlineUser(dbId, userCode, userFromType, wsMessageUserEntity);
|
session.getUserProperties().put("wsUserEntity", wsUserEntity);
|
}
|
//----取自己的留言信息
|
handleOffLineMessage(wsUserEntity);
|
} catch (Exception e) {
|
e.printStackTrace();
|
printError(e, session);
|
}
|
|
}
|
|
/**
|
* 取自己的留言信息
|
* @param wsUserEntity
|
*/
|
private void handleOffLineMessage(WsUserEntity wsUserEntity) {
|
RedisTemplate redisTemplate = (RedisTemplate) FactoryBean.getBean("redisTemplate");
|
Object object = redisTemplate.opsForValue().get(RedisKey.MUTUAL_QRCODE + ":" + wsUserEntity.getDbId() + ":offline:" + wsUserEntity.getUserCode());
|
if (object != null) {
|
MessageInfo messageInfo = new MessageInfo();
|
messageInfo.setUserCode(wsUserEntity.getUserCode());
|
messageInfo.setDbId(wsUserEntity.getDbId());
|
messageInfo.setUserFromType(wsUserEntity.getUserFromType()) ;
|
messageInfo.setSessionId(wsUserEntity.getSessionId()) ;
|
messageInfo.setMsg(String.valueOf(object));
|
messageInfo.setMsgType(MessageType.MUTUAL_AUTH);
|
WebSocketMessageServer.publishMessageToRedis(messageInfo);
|
}
|
|
}
|
|
|
/**
|
* 断开连接
|
*
|
* @param dbId
|
* @param userCode
|
* @param userFromType 用户来源分类:1.ERP 网页端用户,2.ERP APP端用户,3.联盟商城版小程序(旧版),4.活动版小程序(新版)
|
* @param sessionid
|
*/
|
@OnClose
|
public void onClose(@PathParam("usercode") String userCode,
|
@PathParam("dbid") Integer dbId,
|
@PathParam("userfromtype") String userFromType ,
|
@PathParam("sessionid") String sessionId) {
|
removeOnlineUser(dbId, userCode, sessionId,userFromType);
|
}
|
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
error.printStackTrace();
|
}
|
|
/**
|
* 新增在线用户
|
*
|
* @param dbId
|
* @param userFromType 用户来源分类:1.ERP 网页端用户,2.ERP APP端用户,3.联盟商城版小程序(旧版),4.活动版小程序(新版)
|
* @param userCode
|
* @param wsMessageUserEntity
|
*/
|
public void addOnlineUser(Integer dbId, String userCode,String userFromType, WsMessageUserEntity wsMessageUserEntity) {
|
onlineUserList.put(RedisSocket.CHANEL_WS_MESSAGES + ":" + dbId + ":" + userFromType +":" + userCode, wsMessageUserEntity);
|
}
|
|
/**
|
* 获取在线用户
|
*
|
* @param dbId
|
* @param userFromType 用户来源分类:1.ERP 网页端用户,2.ERP APP端用户,3.联盟商城版小程序(旧版),4.活动版小程序(新版)
|
* @param userCode
|
* @return
|
*/
|
public static WsMessageUserEntity getOnlineUser(Integer dbId, String userFromType,String userCode) {
|
return onlineUserList.get(RedisSocket.CHANEL_WS_MESSAGES + ":" + dbId + ":" + userFromType + ":" + userCode);
|
}
|
|
|
public static List<WsMessageUserEntity> getOnlineUser(Integer dbId, String userFromType) {
|
List<WsMessageUserEntity> wsMessageUserList = new ArrayList<WsMessageUserEntity>();
|
for(Map.Entry<String, WsMessageUserEntity> entry:onlineUserList.entrySet()){
|
if (entry.getKey().startsWith(RedisSocket.CHANEL_WS_MESSAGES + ":" + dbId + ":" + userFromType + ":")) {
|
wsMessageUserList.add(entry.getValue()) ;
|
}
|
//System.out.println(entry.getKey()+"--->"+entry.getValue());
|
}
|
// return onlineUserList.get(RedisSocket.CHANEL_WS_MESSAGES + ":" + dbId + ":" + userFromType + ":" + userCode);
|
return wsMessageUserList;
|
}
|
|
/**
|
* 删除在线用户
|
*
|
* @param dbId
|
* @param userCode
|
* @param sessionId
|
* @param userFromType 用户来源分类:1.ERP 网页端用户,2.ERP APP端用户,3.联盟商城版小程序(旧版),4.活动版小程序(新版)
|
*/
|
public void removeOnlineUser(Integer dbId, String userCode, String sessionId,String userFromType) {
|
onlineUserList.remove(RedisSocket.CHANEL_WS_MESSAGES + ":" + dbId + ":" +userFromType+":"+ userCode);
|
}
|
|
protected String getError(Exception e) {
|
String error = e.getCause() == null ? e.getMessage() : e.getCause().getMessage();
|
CallBackMessage info = new CallBackMessage();
|
info.setState(-1);
|
info.setInfo(new BaseMsg(error));
|
return JSON.toJSONString(info);
|
|
}
|
|
/**
|
* 接收客户端传过来的消息
|
*
|
* @param message
|
* @throws IOException
|
*/
|
@OnMessage
|
public void onMessage(String message,Session session) throws IOException {
|
MessageInfo messageInfo = MessageInfo.build(message);
|
WsUserEntity wsUserEntity = (WsUserEntity)session.getUserProperties().get("wsUserEntity");
|
if (wsUserEntity != null) {
|
//System.out.println(getClass()+",wsUserEntity:"+wsUserEntity.toString());
|
messageInfo.setUserCode(wsUserEntity.getUserCode());
|
messageInfo.setDbId(wsUserEntity.getDbId());
|
messageInfo.setUserFromType(wsUserEntity.getUserFromType()) ;
|
messageInfo.setSessionId(wsUserEntity.getSessionId()) ;
|
}
|
WebSocketMessageServer.publishMessageToRedis(messageInfo);
|
}
|
|
/**
|
* 输出出错信息
|
*
|
* @param e
|
*/
|
public static void printError(Exception e, Session session) {
|
String error = (e.getCause() == null ? e.getMessage() : e.getCause().getMessage());
|
CallBackMessage info = new CallBackMessage();
|
info.setState(-1);
|
info.setInfo(new BaseMsg(error));
|
session.getAsyncRemote().sendText(JSON.toJSONString(info));
|
}
|
|
/**
|
* 通过redis发布消息到指定频道,redis客户端通过订阅指定频道来接收消息,通过这种方法解决集群带来的问题
|
* 接收监听器: com.yc.sdk.WebSocketMessage.api.messagelistener.WebSocketMessageListener.java
|
*/
|
public static void publishMessageToRedis(MessageInfo message) {
|
//发送消息
|
RedisDAO redisDAO = (RedisDAO) FactoryBean.getBean("redisDAO");
|
redisDAO.publishMessage(RedisSocket.CHANEL_WS_MESSAGES, message);
|
//System.out.println(">>>publishMessage:"+RedisSocket.CHANEL_WS_MESSAGES+"|"+message);
|
}
|
|
|
}
|