xinyb
2024-09-10 513b8e930844d01da9f74289771de1c1e1e15808
提交 | 用户 | age
05aec6 1 package com.yc.crm.schedule;
F 2
3 import com.yc.action.grid.GridUtils;
4 import com.yc.entity.DataSourceEntity;
5 import com.yc.exception.ApplicationException;
6 import com.yc.factory.FactoryBean;
7 import com.yc.multiData.SpObserver;
8 import com.yc.open.init.shcedule.BaseSchedule;
9 import com.yc.open.init.shcedule.MessageTipsEntity;
10 import com.yc.sdk.WebSocketMessage.action.WebSocketMessageServer;
11 import com.yc.sdk.WebSocketMessage.entity.MessageInfo;
12 import com.yc.sdk.WebSocketMessage.entity.MessageType;
13 import com.yc.sdk.WebSocketMessage.entity.WsMessageUserEntity;
14 import com.yc.service.impl.BaseDoIfc;
15
16 import java.util.List;
17 import java.util.StringJoiner;
18 import java.util.regex.Matcher;
19 import java.util.regex.Pattern;
20 import java.util.stream.Collectors;
21
22 /**
23  * 日程提醒
24  * 系统加载所有未推送日程,符合条件则推送日程
25  *
26  */
27 public class MessagePopSchedule extends BaseSchedule implements Runnable {
28
29     public MessagePopSchedule(DataSourceEntity dataSourceEntity) {
30         super(dataSourceEntity);
31     }
32
33     @Override
34     public void run() {
35         try {
36             if (Thread.interrupted()) {
37                 throw new InterruptedException();
38             }
39             //log.info(dataSourceEntity.getSystemID() + "右下角弹窗开始.....");
40             SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId());
41             BaseDoIfc doIfc = (BaseDoIfc) FactoryBean.getBean("baseDoImpl");
42             //BaseService baseService=(BaseService)FactoryBean.getBean("BaseService");
43             //获取当前已连接webscoket的用户
44             final List<WsMessageUserEntity> onlineUser = WebSocketMessageServer.getOnlineUser(dataSourceEntity.getDbId(), null);
45             StringJoiner joiner = new StringJoiner(",");
46             if (onlineUser != null && onlineUser.size() > 0){
47                 for(WsMessageUserEntity entity:onlineUser){
48                     joiner.add(entity.getUserCode());
49                 }
50                 //因为用户已下线,为了避免发送失败导致用户下次登录收不到通知的情况,取数及更新次数分开处理
51                 final List<MessageTipsEntity> list = doIfc.doQuery("set nocount on ; declare @UserCodes varchar(max) =" +GridUtils.prossSqlParm(joiner.toString())+
52                         "\n select a.messid,a.messagetxt,a.unvaliddate,a.createuser,a.createtime,\n" +
53                         " a.rejustsrvflag,a.Readers,a.UsrReaded,a.tipcount,a.formid,\n" +
54                         " a.formtype,a.origfields,a.linkfields,a.linkmode,a.self_datafields,\n" +
55                         " a.link_datafields,a.efilter , b.usercode ,a.url ,\n" +
56                         " a.createusername ,a.topic ,a.messagetype ,a.isPublicUser,"+dataSourceEntity.getDbId()+"as dbid\n" +
57                         "    from _sysMessageCount b  \n" +
58                         "   join _sysmessage a on a.messid = b.messid\n" +
59                         "       where b.usercode in (select list from getinstr(@UserCodes))  \n" +
60                         "       and b.tipcount < a.tipcount\n" +
61                         "       and a.unvaliddate >= getdate() \n" +
62                         "       and b.isRead = 0 ", MessageTipsEntity.class);
63             if (list != null && list.size() > 0) {
64                 final StringJoiner updateJoiner=new StringJoiner("\n");
65                 list.stream().unordered().distinct().forEach(x -> {
66                     //取每一个用户的所有消息
67                     String userCode = x.getUsercode();
68                     //--分二种消息类型,1右下角弹窗,2系统级消息显示
69                     final List<MessageTipsEntity> collect = list.stream().filter(y -> y.getUsercode().equalsIgnoreCase(x.getUsercode())).collect(Collectors.toList());
70                     if (collect != null && collect.size() > 0) {
71                         //----发送出去
72                         //---通知webscoket
73                         MessageInfo messageInfo = new MessageInfo();
74                         messageInfo.setDbId(dataSourceEntity.getDbId());
75                         messageInfo.setMsgType(MessageType.NOTICE_AND_TODO);
76                         messageInfo.setUserFromType("1");//TODO PC端
77                         messageInfo.setUserCode(userCode);
78                         messageInfo.setMsg(GridUtils.toJson(collect));
79                         //直接发送
80                         List<WsMessageUserEntity> wsMessageUserEntityList = WebSocketMessageServer.getOnlineUserByTips(dataSourceEntity.getDbId(), userCode);
81                         if (wsMessageUserEntityList != null && wsMessageUserEntityList.size() != 0) {
82                             for (WsMessageUserEntity wsMessageUserEntity : wsMessageUserEntityList) {
83                               boolean flag= wsMessageUserEntity.sendMessageV3(messageInfo);
84                               if(flag){
85                                   //成功才更新次数
86                                   collect.stream().forEach(z->{
87                                       updateJoiner.add(" if not exists(select 1 from @table where MessId = "+z.getMessid()+" and UserCode = "+GridUtils.prossSqlParm(userCode)+")\n" +
88                                               " begin\n" +
89                                               " insert into @table(MessId,UserCode) values ( "+z.getMessid()+","+GridUtils.prossSqlParm(userCode)+")\n" +
90                                               " end\n");
91                                   });
92                               }
93                             }
94                         }
95
96                     }
97                 });
98                 //---更新次数
99                 if(updateJoiner.length()>0) {
100                     String sql = "set nocount on \n declare @table table(MessId int,UserCode varchar(50), Primary Key(MessId,UserCode))\n" +
101                             updateJoiner.toString() +
102                             " \n update a  set tipcount = isnull(tipcount,0) + 1,LastPushTime=getDate()  \n" +
103                             "  from _sysMessageCount a \n" +
104                             "where exists ( select 1 from @table b where a.MessId = b.MessId and a.UserCode = b.UserCode and (a.LastPushTime is null or datediff(second,a.LastPushTime,getdate()) > 45 )) ";
105                     doIfc.doExecute(sql);
106                     //log.info(dataSourceEntity.getSystemID() + "右下角弹窗完成:"+sql);
107                 }
108             }
109         }
110         } catch (InterruptedException ex) {
111             log.info(dataSourceEntity.getSystemID() + "右下角弹窗任务已被终止");
112         }  catch (Exception ex) {
113             ex.printStackTrace();
114             log.error(dataSourceEntity.getSystemDescribe() + ":" + ex.getMessage());
115         }finally {
116         SpObserver.setDBtoInstance();
117     }
118     }
119     public String replaceBlank(String str) {
120         if (str == null || str == "") { return ""; }
121         Matcher m = null;
122         try {
123             Pattern p = Pattern.compile("\t|\r|\n");
124             m = p.matcher(str);
125         } catch (Exception e) {
126             e.printStackTrace();
127             throw new ApplicationException(str + "-解析出错,存在有特殊字符");
128         }
129         return m.replaceAll(" ").replaceAll("\\$", "\\\\\\$");
130     }
131     public static String toString(Object obj){
132         if(obj==null)return "";
133         return obj.toString();
134     }
135 }