package com.yc.sdk.WebSocketMessage.api.messagelistener; import com.alibaba.fastjson.JSON; import com.esotericsoftware.minlog.Log; import com.yc.sdk.WebSocketMessage.action.WebSocketMessageServer; import com.yc.sdk.WebSocketMessage.entity.MessageInfo; import com.yc.sdk.WebSocketMessage.entity.MessageType; import com.yc.sdk.WebSocketMessage.entity.WsMessageUserEntity; import com.yc.sdk.WebSocketMessage.service.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import java.util.List; /** * 接收订阅消息 * 发布订阅消息程序: com.yc.sdk.WebSocketMessage.action.WebSocketMessageServer.publishMessageToRedis(Message message) * * @author johnswang */ @Slf4j public class WebSocketMessageListener implements MessageListener { @Autowired private RedisTemplate redisTemplate; @Autowired IMMessageIfc imMessageIfc; @Autowired HeXiaoMessageIfc heXiaoMessageIfc; @Autowired ApiAuthMessageIfc apiAuthMessageIfc; @Autowired RefreshPanicBuyingIfc refreshPanicBuyingIfc; @Autowired AbcPayMessageIfc abcPayMessageIfc; @Autowired TasksMessageIfc tasksMessageIfc; @Autowired VersionMessageIfc versionMessageIfc; @Autowired RefreshDataSourceMessageIfc refreshDataSourceMessageIfc; @Autowired SingleAccountMessageIfc singleAccountMessageIfc; @Override public void onMessage(Message message, byte[] pattern) { try { RedisSerializer serializer = redisTemplate.getValueSerializer(); Object body = serializer.deserialize(message.getBody()); //String channel = new String(pattern); //取订阅频道 主题 channel MessageInfo msg = null; if (body instanceof MessageInfo) { msg = (MessageInfo) body; } else { msg = MessageInfo.build(String.valueOf(body)); } //log.info("订阅接收到的消息:" + msg.toString()); //群发所有小程序用户 if ((msg.getMsgType().equals(MessageType.REFRESH_PANICBUYING) && (msg.getUserCode() == null || msg.getUserCode().equals(""))) || MessageType.NOTICE_SYSTEM_STOP.equals(msg.getMsgType())|| MessageType.NOTICE_SYSTEM_START.equals(msg.getMsgType())) { //msg.getUserFromType()==null,表示取系统的所有用户 List wsMessageUserEntityList = WebSocketMessageServer.getOnlineUser(msg.getDbId(), msg.getUserFromType()); //只有是存在于当前的用户信息才需要处理,否则不需要处理 //log.info("OnlineUserList:"+JSON.toJSONString(WebSocketMessageServer.getOnlineUserList())); if (wsMessageUserEntityList != null && wsMessageUserEntityList.size() != 0) { for (WsMessageUserEntity wsMessageUserEntity : wsMessageUserEntityList) { handlerMessage(msg, wsMessageUserEntity); } } } else if (MessageType.REFRESH_TASK.equals(msg.getMsgType()) || MessageType.REFRESH_LOCALVERSION_TASK.equals(msg.getMsgType())|| MessageType.CHECK_SYSTEM_DISABLED.equals(msg.getMsgType()) ) { //对于没用websocket接收的情况,直接处理相关消息 handlerMessage(msg, null); } else { List wsMessageUserEntityList = WebSocketMessageServer.getOnlineUser(msg.getDbId(), msg.getUserFromType(), msg.getUserCode()); //只有是存在于当前的用户信息才需要处理,否则不需要处理 if (wsMessageUserEntityList != null && wsMessageUserEntityList.size() != 0) { for (WsMessageUserEntity wsMessageUserEntity : wsMessageUserEntityList) { handlerMessage(msg, wsMessageUserEntity); } } } } catch (Exception e) { e.printStackTrace(); } } /** * 根据消息类型调用不同的类处理 * * @return */ protected void handlerMessage(MessageInfo message, WsMessageUserEntity userEntity) { MessageIfc messageIfc = null; if (message != null && message.getMsgType() != null) { switch (message.getMsgType()) { // case 6001: //APP扫码登录 // // messageIfc=imMessageIfc; // break; // case 6002: //待办事宜(PC端右下角) // case 6003: //系统级弹窗信息(要避免高密度重复弹出) // messageIfc=imMessageIfc; // break; // case 6004: //收取服务费续费通知 // //messageIfc=imMessageIfc; // break; case 6005: //检查系统是否停用,代替maintaince.do case 6014: //关闭系统通知 case 6015: //启用系统通知 case 6018: //切换手机号删除会话 messageIfc = refreshDataSourceMessageIfc; break; case 6006: //小程序扫码核销(扫码了,等待按“核销”按钮) messageIfc = heXiaoMessageIfc; break; case 6007: //发送即时消息 im messageIfc = imMessageIfc; case 6008: //API认证消息 messageIfc = apiAuthMessageIfc; break; case 6009: //刷新小程序端活动内容 MessageType.REFRESH_PANICBUYING messageIfc = refreshPanicBuyingIfc; break; case 6010: //小程序扫码核销(点了“核销”按钮) messageIfc = heXiaoMessageIfc; break; case 6011: //农行支付结果通知回调 messageIfc = abcPayMessageIfc; break; case 6012: //刷新任务作业 messageIfc = tasksMessageIfc; break; case 6013: //刷新内存版本号 messageIfc = versionMessageIfc; break; case 6016: //只能存在一个账号登录 case 6017: //会话失效 messageIfc = singleAccountMessageIfc; break; default: } if (messageIfc != null) { messageIfc.handlerMessage(message, userEntity); } } } public RedisTemplate getRedisTemplate() { return redisTemplate; } public void setRedisTemplate(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } }