fs-danaus
2024-03-16 a2ecbf17d6ba1c7b135fe10bb4cdfefa05b75add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package com.yc.ionic.schedule;
 
 
import cn.jiguang.common.resp.APIConnectionException;
import cn.jiguang.common.resp.APIRequestException;
import com.yc.api.schedule.ScheduleUtils;
import com.yc.api.utils.PushMessageType;
import com.yc.entity.AttachmentConfig;
import com.yc.entity.DataSourceEntity;
import com.yc.factory.FactoryBean;
import com.yc.ionic.service.AppIfc;
import com.yc.jpush.JpushAction;
import com.yc.multiData.MultiDataSource;
import com.yc.multiData.SpObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
 
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 *读取待办事宜,通过极光推送到手机端
 * 只取未读且未推送过的信息
 * */
@Service
public class JPushTimer implements ApplicationListener<ContextRefreshedEvent> {
    final Logger log = LoggerFactory.getLogger(this.getClass());
 
   private static JpushAction jpushAction =JpushAction.getInstance();
 
    /**
     * 处理极光推送信息
     */
    private  class ProductThread implements Runnable {
        ArrayBlockingQueue<JpushMsg> queue;  //队列,保存需要推送的消息
 
        public ProductThread(ArrayBlockingQueue<JpushMsg> queue) {
            this.queue = queue;
        }
 
        @Override
        public void run() {
 
                    while (!queue.isEmpty()) {
                        JpushMsg msg = null;
                        try {
                            msg = queue.take();
                            log.info(Thread.currentThread().getName()+"...进行处理[数据源:"+msg.getDbid()+"]的推送信息");
 
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        try {
                        boolean flg = jpushAction.sendPush(
                                JpushAction.buildPushObject_with_extra(
                                        msg.getAlais(),"["+msg.getFormname()+"/"+msg.getDoccode()+"]"+ msg.getTopic(), msg.getExtras())
                        );
                        //更新状态为已推送,下次就不需要再推送
                        if (flg) {
                            try {
                                SpObserver.setDBtoInstance("_" + msg.getDbid());
                                AppIfc jpush = (AppIfc) FactoryBean.getBean("ionicApp");
                                int count = jpush.updatePush(msg.getUnid());
                                if (count < 1) log.info("待办事宜-" + msg.getUnid() + "-更新isPush出错");
                            } finally {
                                SpObserver.setDBtoInstance();
                            }
                        } else {
                            System.out.println(">>>>........等待返回");
                        }
                        } catch (APIConnectionException e) {
                            // Connection error, should retry later
                            System.out.println("Connection error, should retry later" + e.getMessage());
 
                        } catch (APIRequestException e) {
                            // Should review the error, and fix the request
                            System.out.println("dbid: " + msg.getDbid());
                            System.out.println("HTTP Status: " + e.getStatus());
                            System.out.println("Error Code: " + e.getErrorCode());
                            System.out.println("Error Message: " + e.getErrorMessage());
 
                        }  catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
 
 
        }
    }
    private  class PushData implements Runnable {
        @Override
        public void run() {
            //取得所有数据源的待办事宜,加到队列
            AppIfc jpush = (AppIfc) FactoryBean.getBean("ionicApp");
            Map<String, DataSourceEntity> infoList = MultiDataSource.getDataSourceMaps();
            ThreadPoolTaskExecutor threadPoolExecutor= (ThreadPoolTaskExecutor) FactoryBean.getBean("threadPoolExecutor");
            for (Map.Entry<String, DataSourceEntity> entry : infoList.entrySet()) {
 
                DataSourceEntity dataSourceEntity = entry.getValue();
                if(!dataSourceEntity.isUseAPP()) continue;//未启用APP功能,所以推送功能也不需要
                if(ScheduleUtils.isOnbusPlatform(dataSourceEntity)) continue;
                try {
                    log.info("极光...."+dataSourceEntity.getDbId());
                    List<Map<String, Object>> list = null;
                    try {
                        SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId());
                        list = jpush.getMessage();
                    } finally {
                        SpObserver.setDBtoInstance();
                    }
                    //调用极光输出
                    if (list != null&&list.size()>0) {
                        log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date())+">>>>" + dataSourceEntity.getDbId() + "-" + list.size());
                        ArrayBlockingQueue<JpushMsg> queue = new ArrayBlockingQueue<>(list.size());
                        for (Map<String, Object> map : list) {
                            String formid = map.get("formid") + "";
                            String formtype = map.get("formtype") + "";
                            String doccode = (String) map.get("doccode");
                            String topic = (String) map.get("topic");
                            String UNID = (String) map.get("UNID");
                            String telephone = (String) map.get("telephone");
                            Object msgCount = map.get("msgCount");
                            String usercode = (String) map.get("usercode");
                            String companyMemo = (String) map.get("companyMemo");
                            Map<String, String> extras = new HashMap<>();
                            extras.put("formid", formid);
                            extras.put("formtype", formtype);
                            extras.put("where", "doccode='" + doccode + "'");
                            extras.put("UNID", UNID);
                            extras.put("msgCount", msgCount + "");
                            extras.put("dbid", dataSourceEntity.getDbId() + "");
                            extras.put("userCode", usercode);
                            extras.put("tel", telephone);
                            extras.put("actionType", PushMessageType.TODO);//待办事宜类型
                            extras.put("title", companyMemo);//公司名称
                            List<String> alais = new ArrayList<String>();
                            alais.add(telephone);
                            alais.add(dataSourceEntity.getDbId() + "_" + usercode);//TODO 兼容处理,app所有版本都更新后,再移除
                            queue.add(new JpushMsg(alais, topic, extras, dataSourceEntity.getDbId(), UNID, map.get("formname") + "", doccode));
                        }
 
                        //放在线程池里执行一个系统的推送
                        threadPoolExecutor.execute(new ProductThread(queue));
 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
            return;
        }
            String isJpushRunSchedule = AttachmentConfig.get("isJpushRunSchedule");//指定在哪个服务端口运行(9001,9010)
            //System.out.println(">>>>>>>isJpushRunSchedule="+object);
            if ("1".equals(isJpushRunSchedule)) {
                System.out.println(new Date().toLocaleString() + ">>极光推送运行中...");
                log.info(new Date().toLocaleString() + "--极光推送运行中...");
                //定时执行
                ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) FactoryBean.getBean("threadPoolTaskScheduler");
                threadPoolTaskScheduler.scheduleWithFixedDelay(new PushData(), Instant.now().plusMillis(TimeUnit.MINUTES.toMillis(3)), Duration.ofMinutes(3));
            }
    }
  }