fs-danaus
2021-04-14 f4e40dee92d6e025176637fafb5621c9ba08739f
提交 | 用户 | age
a6a76f 1 package com.yc.open.deli.schedule;
F 2
3 import com.yc.entity.AttachmentConfig;
4 import com.yc.entity.DataSourceEntity;
5 import com.yc.factory.FactoryBean;
6 import com.yc.multiData.MultiDataSource;
7 import com.yc.multiData.SpObserver;
8 import com.yc.open.deli.entity.*;
9 import com.yc.open.deli.service.DeLiIfc;
f4e40d 10 import org.springframework.beans.factory.annotation.Autowired;
a6a76f 11 import org.springframework.context.ApplicationListener;
F 12 import org.springframework.context.event.ContextRefreshedEvent;
f4e40d 13 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
F 14 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
a6a76f 15 import org.springframework.stereotype.Service;
F 16
17 import java.text.SimpleDateFormat;
f4e40d 18 import java.time.Duration;
F 19 import java.time.Instant;
a6a76f 20 import java.util.Date;
F 21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.*;
24
25 /**
26  * spring加载完成后启动
27  *gsystem 表增加一个所属字段表示向哪一个第三方推送信息
28  * 定时把初测,复测,销售单推送到德立营销系统
29  */
30 @Service
31 public class ProdutDataSchedule implements ApplicationListener<ContextRefreshedEvent> {
32
f4e40d 33     //private static ScheduledExecutorService mScheduledExecutorService;
F 34     @Autowired
35     ThreadPoolTaskExecutor threadPoolExecutor;
36     @Autowired
37     ThreadPoolTaskScheduler threadPoolTaskScheduler;
a6a76f 38     //测量单队列
F 39     private  static ArrayBlockingQueue<String> followUpQueue;
40     //销售单队列
41     private  static ArrayBlockingQueue<String> orderQueue;
42     //派工单队列
43     private  static ArrayBlockingQueue<String> dispatchQueue;
44     //采购单队列
45     private  static ArrayBlockingQueue<String> purchaseQueue;
46     //测量单队列
47     private  static ArrayBlockingQueue<String> t640109Queue;
48
49     //CPU线程数量
50     private static  int THREADCOUNT=Runtime.getRuntime().availableProcessors();
51     @Override
52     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
53         if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
54             return;
55         }
56         String isDeliRunSchedule = AttachmentConfig.get("isDeliRunSchedule");//指定在哪个服务端口运行(9001,9010)
57
58         if ("1".equals(isDeliRunSchedule)) {
59             //在集群下,会出现任务重复执行问题,暂时方法是只有一个实例运行,其他实例跳过
60             ///RedisTemplate redisTemplate = (RedisTemplate) FactoryBean.getBean("redisTemplate");
61             // Object object = redisTemplate.opsForValue().get("isRunSchedule");
62             // if (object == null) {
63             //不存在加到redis里,过期时间设置为15分钟
64             // final int OVERTIME = 300000;
65             // redisTemplate.opsForValue().set("isRunSchedule", 1, OVERTIME, TimeUnit.MILLISECONDS);
66
67             followUpQueue = new ArrayBlockingQueue<>(5);
68             orderQueue = new ArrayBlockingQueue<>(5);
69             dispatchQueue = new ArrayBlockingQueue<>(5);
70             purchaseQueue = new ArrayBlockingQueue<>(5);
71             t640109Queue = new ArrayBlockingQueue<>(5);
72             //定时执行
f4e40d 73             //mScheduledExecutorService = new ScheduledThreadPoolExecutor(THREADCOUNT * 2);
a6a76f 74             //按分钟(TimeUnit.MINUTES)执行:程序启动后3分种开始执行,每隔5分钟重复调用一次
f4e40d 75             //threadPoolTaskScheduler.scheduleWithFixedDelay(new PushData(), 2, 10, TimeUnit.MINUTES);
F 76             threadPoolTaskScheduler.scheduleWithFixedDelay(new PushData(), Instant.now().plusMillis(TimeUnit.MINUTES.toMillis(3)), Duration.ofMinutes(10));
77
a6a76f 78             // }
F 79         }
80
81     }
82
83     private  class PushData implements Runnable {
84
85         @Override
86         public void run() {
87             System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date()) + "德立推送运行中...");
88             Map<String, DataSourceEntity> infoList = MultiDataSource.getDataSourceMaps();
89             for (Map.Entry<String, DataSourceEntity> entry : infoList.entrySet()) {
90                 DataSourceEntity dataSourceEntity = entry.getValue();
91                 //德立系统
92                 if ("deli".equalsIgnoreCase(dataSourceEntity.getDockingSystem())
93                         && dataSourceEntity.getSystemAccessKey() != null
94                         //&& dataSourceEntity.getDbId() ==497 //本地测试用,正式时候要去掉
95                         && !"".equalsIgnoreCase(dataSourceEntity.getSystemAccessKey())) {
96                     DeLiIfc deLiIfc = (DeLiIfc) FactoryBean.getBean("deLiImpl");
97                     try {
98                         SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId());
99                         //---------1,推送客户跟进记录---------
100                         this.pushT170139(deLiIfc, dataSourceEntity);
101                         //---------2,推送销售单(销售单,客户资料)------
102                         this.pushT120201(deLiIfc, dataSourceEntity);
103                         //---------3,推送采购单---------
104                         this.pushT130301(deLiIfc, dataSourceEntity);
105                         //---------4,推送派工单---------
106                         this.pushT640201(deLiIfc, dataSourceEntity);
107                         //---------5,推送已审核的测量单---------
108                         this.pushT640109(deLiIfc, dataSourceEntity);
109                     } catch (Exception e) {
110                         e.printStackTrace();
111                     } finally {
112                         SpObserver.setDBtoInstance();
113                     }
114                 }
115             }
116         }
117         /**
118          * 推送派工单
119          * @param deLiIfc
120          * @param dataSourceEntity
121          */
122         private void pushT640201(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){
123             List<T640201Entity>  result = deLiIfc.getT640201List();
124             if(result!=null&&result.size()>0) {
f4e40d 125
a6a76f 126                 //执行生产,消费线程
f4e40d 127                 threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", dispatchQueue,"t640201H"));
F 128                 threadPoolExecutor.execute(new T640201Consumer(result, dataSourceEntity.getDbId() + "", dispatchQueue,"t640201H","派工单"));
a6a76f 129             }
F 130         }
131         /**
132          * 推送销售订单,客户资料
133          * @param deLiIfc
134          * @param dataSourceEntity
135          */
136         private void pushT120201(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){
137
138             List<T120201Entity>  result = deLiIfc.getT120201List();
139             if(result!=null&&result.size()>0) {
f4e40d 140
a6a76f 141                 //执行生产,消费线程
f4e40d 142                 threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", orderQueue,"t120201H"));
F 143                 threadPoolExecutor.execute(new T120201Consumer(result, dataSourceEntity.getDbId() + "", orderQueue,"t120201H","销售订单"));
a6a76f 144             }
F 145         }
146         /**
147          * 推送采购订单
148          * @param deLiIfc
149          * @param dataSourceEntity
150          */
151         private void pushT130301(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){
152             List<T130301Entity>  result = deLiIfc.getT130301List();
153             if(result!=null&&result.size()>0) {
154                 //执行生产,消费线程
f4e40d 155                 threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "",purchaseQueue ,"t130301H"));
F 156                 threadPoolExecutor.execute(new T130301Consumer(result, dataSourceEntity.getDbId() + "", purchaseQueue,"t130301H","采购订单"));
a6a76f 157             }
F 158         }
159         /**
160          * 推送客户跟进记录
161          * @param deLiIfc
162          * @param dataSourceEntity
163          */
164         private void pushT170139(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){
165             List<T170139Entity>  result = deLiIfc.getT170139List();
166             if(result!=null&&result.size()>0) {
167                 //执行生产,消费线程
f4e40d 168                 threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", followUpQueue,"t170139H"));
F 169                 threadPoolExecutor.execute(new T170139Consumer(result, dataSourceEntity.getDbId() + "", followUpQueue,"t170139H","客户跟进记录"));
a6a76f 170             }
F 171
172         }
173         /**
174          * 推送测量单
175          * @param deLiIfc
176          * @param dataSourceEntity
177          */
178         private void pushT640109(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){
179             List<T640109Entity>  result = deLiIfc.getT640109();
180             if(result!=null&&result.size()>0) {
181                 //执行生产,消费线程
f4e40d 182                 threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", t640109Queue,"t640109H"));
F 183                 threadPoolExecutor.execute(new T640109Consumer(result, dataSourceEntity.getDbId() + "", t640109Queue,"t640109H","测量单"));
a6a76f 184             }
F 185
186         }
187
188     }
189
190
191 }