fs-danaus
2023-07-10 4849078e3450b8d3b3030a658a34dd58b0630fc5
提交 | 用户 | age
a6a76f 1 package com.yc.sdk.WebSocketMessage.api.messagelistener;
F 2
3
484907 4 import com.alibaba.fastjson.JSON;
F 5 import com.esotericsoftware.minlog.Log;
55d506 6 import com.yc.sdk.WebSocketMessage.action.WebSocketMessageServer;
f959ba 7 import com.yc.sdk.WebSocketMessage.entity.MessageInfo;
349b07 8 import com.yc.sdk.WebSocketMessage.entity.MessageType;
55d506 9 import com.yc.sdk.WebSocketMessage.entity.WsMessageUserEntity;
251add 10 import com.yc.sdk.WebSocketMessage.service.*;
484907 11 import lombok.extern.slf4j.Slf4j;
a6a76f 12 import org.springframework.beans.factory.annotation.Autowired;
F 13 import org.springframework.data.redis.connection.Message;
14 import org.springframework.data.redis.connection.MessageListener;
15 import org.springframework.data.redis.core.RedisTemplate;
16 import org.springframework.data.redis.serializer.RedisSerializer;
17
55d506 18 import java.util.List;
a6a76f 19
F 20 /**
21  * 接收订阅消息
55d506 22  * 发布订阅消息程序: com.yc.sdk.WebSocketMessage.action.WebSocketMessageServer.publishMessageToRedis(Message message)
a6a76f 23  *
F 24  * @author johnswang
25  */
484907 26 @Slf4j
a6a76f 27 public class WebSocketMessageListener implements MessageListener {
F 28     @Autowired
29     private RedisTemplate<String, String> redisTemplate;
55d506 30     @Autowired
f959ba 31     IMMessageIfc imMessageIfc;
55d506 32     @Autowired
ad9fb8 33     HeXiaoMessageIfc heXiaoMessageIfc;
6990b5 34     @Autowired
565fe8 35     ApiAuthMessageIfc apiAuthMessageIfc;
6990b5 36     @Autowired
J 37     RefreshPanicBuyingIfc refreshPanicBuyingIfc;
251add 38     @Autowired
F 39     AbcPayMessageIfc abcPayMessageIfc;
9cd417 40     @Autowired
F 41     TasksMessageIfc tasksMessageIfc;
5fc8e3 42     @Autowired
F 43     VersionMessageIfc versionMessageIfc;
6fe1e6 44     @Autowired
F 45     RefreshDataSourceMessageIfc refreshDataSourceMessageIfc;
3c1697 46     @Autowired
F 47     SingleAccountMessageIfc singleAccountMessageIfc;
a6a76f 48     @Override
484907 49
a6a76f 50     public void onMessage(Message message, byte[] pattern) {
55d506 51         try {
F 52             RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
53             Object body = serializer.deserialize(message.getBody());
54             //String channel = new String(pattern);   //取订阅频道 主题 channel
6fe1e6 55             MessageInfo msg = null;
F 56             if (body instanceof MessageInfo) {
284c55 57                 msg = (MessageInfo) body;
6fe1e6 58             } else {
284c55 59                 msg = MessageInfo.build(String.valueOf(body));
F 60             }
484907 61             //log.info("订阅接收到的消息:" + msg.toString());
9cd417 62
6fe1e6 63             //群发所有小程序用户
F 64             if ((msg.getMsgType().equals(MessageType.REFRESH_PANICBUYING) && (msg.getUserCode() == null || msg.getUserCode().equals(""))) ||
65                     MessageType.NOTICE_SYSTEM_STOP.equals(msg.getMsgType())||
66                     MessageType.NOTICE_SYSTEM_START.equals(msg.getMsgType())) {
484907 67                 //msg.getUserFromType()==null,表示取系统的所有用户
6fe1e6 68                 List<WsMessageUserEntity> wsMessageUserEntityList = WebSocketMessageServer.getOnlineUser(msg.getDbId(), msg.getUserFromType());
F 69                 //只有是存在于当前的用户信息才需要处理,否则不需要处理
484907 70                 //log.info("OnlineUserList:"+JSON.toJSONString(WebSocketMessageServer.getOnlineUserList()));
6fe1e6 71                 if (wsMessageUserEntityList != null && wsMessageUserEntityList.size() != 0) {
F 72                     for (WsMessageUserEntity wsMessageUserEntity : wsMessageUserEntityList) {
73                         handlerMessage(msg, wsMessageUserEntity);
74                     }
75                 }
c86295 76             } else if (MessageType.REFRESH_TASK.equals(msg.getMsgType()) ||
F 77                     MessageType.REFRESH_LOCALVERSION_TASK.equals(msg.getMsgType())||
78                     MessageType.CHECK_SYSTEM_DISABLED.equals(msg.getMsgType())
79             ) {
6fe1e6 80                 //对于没用websocket接收的情况,直接处理相关消息
F 81                 handlerMessage(msg, null);
82
83             } else {
84                 List<WsMessageUserEntity> wsMessageUserEntityList = WebSocketMessageServer.getOnlineUser(msg.getDbId(), msg.getUserFromType(), msg.getUserCode());
da7436 85                 //只有是存在于当前的用户信息才需要处理,否则不需要处理
F 86                 if (wsMessageUserEntityList != null && wsMessageUserEntityList.size() != 0) {
87                     for (WsMessageUserEntity wsMessageUserEntity : wsMessageUserEntityList) {
88                         handlerMessage(msg, wsMessageUserEntity);
89                     }
90                 }
55d506 91             }
F 92         } catch (Exception e) {
93             e.printStackTrace();
94         }
a6a76f 95     }
55d506 96
f959ba 97     /**
F 98      * 根据消息类型调用不同的类处理
55d506 99      *
f959ba 100      * @return
F 101      */
55d506 102     protected void handlerMessage(MessageInfo message, WsMessageUserEntity userEntity) {
6fe1e6 103         MessageIfc messageIfc = null;
F 104         if (message != null && message.getMsgType() != null) {
55d506 105             switch (message.getMsgType()) {
c61217 106 //                case 6001:  //APP扫码登录
F 107 //                    // messageIfc=imMessageIfc;
108 //                    break;
109 //                case 6002:  //待办事宜(PC端右下角)
110 //                case 6003:  //系统级弹窗信息(要避免高密度重复弹出)
111 //                    messageIfc=imMessageIfc;
112 //                    break;
113 //                case 6004:  //收取服务费续费通知
114 //                    //messageIfc=imMessageIfc;
115 //                    break;
6fe1e6 116                 case 6005:  //检查系统是否停用,代替maintaince.do
F 117                 case 6014: //关闭系统通知
118                 case 6015: //启用系统通知
484907 119                 case 6018: //切换手机号删除会话
6fe1e6 120                     messageIfc = refreshDataSourceMessageIfc;
f959ba 121                     break;
74f517 122                 case 6006:   //小程序扫码核销(扫码了,等待按“核销”按钮)
6fe1e6 123                     messageIfc = heXiaoMessageIfc;
f959ba 124                     break;
ad9fb8 125                 case 6007:  //发送即时消息 im 
55d506 126                     messageIfc = imMessageIfc;
565fe8 127                 case 6008:  //API认证消息
F 128                     messageIfc = apiAuthMessageIfc;
f959ba 129                     break;
6990b5 130                 case 6009:  //刷新小程序端活动内容  MessageType.REFRESH_PANICBUYING
6fe1e6 131                     messageIfc = refreshPanicBuyingIfc;
F 132                     break;
74f517 133                 case 6010:   //小程序扫码核销(点了“核销”按钮)
6fe1e6 134                     messageIfc = heXiaoMessageIfc;
74f517 135                     break;
251add 136                 case 6011:   //农行支付结果通知回调
6fe1e6 137                     messageIfc = abcPayMessageIfc;
251add 138                     break;
9cd417 139                 case 6012:   //刷新任务作业
6fe1e6 140                     messageIfc = tasksMessageIfc;
9cd417 141                     break;
5fc8e3 142                 case 6013:   //刷新内存版本号
6fe1e6 143                     messageIfc = versionMessageIfc;
5fc8e3 144                     break;
0099c9 145                 case 6016:   //只能存在一个账号登录
F 146                 case 6017:   //会话失效
3c1697 147                     messageIfc = singleAccountMessageIfc;
F 148                     break;
f959ba 149                 default:
F 150             }
ad9fb8 151             if (messageIfc != null) {
6fe1e6 152                 messageIfc.handlerMessage(message, userEntity);
ad9fb8 153             }
f959ba 154         }
F 155     }
55d506 156
a6a76f 157     public RedisTemplate<String, String> getRedisTemplate() {
F 158         return redisTemplate;
159     }
160
161     public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
162         this.redisTemplate = redisTemplate;
163     }
164 }