提交 | 用户 | age
|
a6a76f
|
1 |
package com.yc.open.deli.schedule; |
F |
2 |
|
|
3 |
import com.yc.entity.AttachmentConfig; |
|
4 |
import com.yc.entity.DataSourceEntity; |
|
5 |
import com.yc.factory.FactoryBean; |
|
6 |
import com.yc.multiData.MultiDataSource; |
|
7 |
import com.yc.multiData.SpObserver; |
|
8 |
import com.yc.open.deli.entity.*; |
|
9 |
import com.yc.open.deli.service.DeLiIfc; |
f4e40d
|
10 |
import org.springframework.beans.factory.annotation.Autowired; |
a6a76f
|
11 |
import org.springframework.context.ApplicationListener; |
F |
12 |
import org.springframework.context.event.ContextRefreshedEvent; |
f4e40d
|
13 |
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
F |
14 |
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
a6a76f
|
15 |
import org.springframework.stereotype.Service; |
F |
16 |
|
|
17 |
import java.text.SimpleDateFormat; |
f4e40d
|
18 |
import java.time.Duration; |
F |
19 |
import java.time.Instant; |
a6a76f
|
20 |
import java.util.Date; |
F |
21 |
import java.util.List; |
|
22 |
import java.util.Map; |
|
23 |
import java.util.concurrent.*; |
|
24 |
|
|
25 |
/** |
|
26 |
* spring加载完成后启动 |
|
27 |
*gsystem 表增加一个所属字段表示向哪一个第三方推送信息 |
|
28 |
* 定时把初测,复测,销售单推送到德立营销系统 |
|
29 |
*/ |
|
30 |
@Service |
|
31 |
public class ProdutDataSchedule implements ApplicationListener<ContextRefreshedEvent> { |
|
32 |
|
f4e40d
|
33 |
//private static ScheduledExecutorService mScheduledExecutorService; |
F |
34 |
@Autowired |
|
35 |
ThreadPoolTaskExecutor threadPoolExecutor; |
|
36 |
@Autowired |
|
37 |
ThreadPoolTaskScheduler threadPoolTaskScheduler; |
a6a76f
|
38 |
//测量单队列 |
F |
39 |
private static ArrayBlockingQueue<String> followUpQueue; |
|
40 |
//销售单队列 |
|
41 |
private static ArrayBlockingQueue<String> orderQueue; |
|
42 |
//派工单队列 |
|
43 |
private static ArrayBlockingQueue<String> dispatchQueue; |
|
44 |
//采购单队列 |
|
45 |
private static ArrayBlockingQueue<String> purchaseQueue; |
|
46 |
//测量单队列 |
|
47 |
private static ArrayBlockingQueue<String> t640109Queue; |
|
48 |
|
|
49 |
//CPU线程数量 |
|
50 |
private static int THREADCOUNT=Runtime.getRuntime().availableProcessors(); |
|
51 |
@Override |
|
52 |
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { |
|
53 |
if (contextRefreshedEvent.getApplicationContext().getParent() == null) { |
|
54 |
return; |
|
55 |
} |
|
56 |
String isDeliRunSchedule = AttachmentConfig.get("isDeliRunSchedule");//指定在哪个服务端口运行(9001,9010) |
|
57 |
|
|
58 |
if ("1".equals(isDeliRunSchedule)) { |
|
59 |
//在集群下,会出现任务重复执行问题,暂时方法是只有一个实例运行,其他实例跳过 |
|
60 |
///RedisTemplate redisTemplate = (RedisTemplate) FactoryBean.getBean("redisTemplate"); |
|
61 |
// Object object = redisTemplate.opsForValue().get("isRunSchedule"); |
|
62 |
// if (object == null) { |
|
63 |
//不存在加到redis里,过期时间设置为15分钟 |
|
64 |
// final int OVERTIME = 300000; |
|
65 |
// redisTemplate.opsForValue().set("isRunSchedule", 1, OVERTIME, TimeUnit.MILLISECONDS); |
|
66 |
|
|
67 |
followUpQueue = new ArrayBlockingQueue<>(5); |
|
68 |
orderQueue = new ArrayBlockingQueue<>(5); |
|
69 |
dispatchQueue = new ArrayBlockingQueue<>(5); |
|
70 |
purchaseQueue = new ArrayBlockingQueue<>(5); |
|
71 |
t640109Queue = new ArrayBlockingQueue<>(5); |
|
72 |
//定时执行 |
f4e40d
|
73 |
//mScheduledExecutorService = new ScheduledThreadPoolExecutor(THREADCOUNT * 2); |
a6a76f
|
74 |
//按分钟(TimeUnit.MINUTES)执行:程序启动后3分种开始执行,每隔5分钟重复调用一次 |
f4e40d
|
75 |
//threadPoolTaskScheduler.scheduleWithFixedDelay(new PushData(), 2, 10, TimeUnit.MINUTES); |
F |
76 |
threadPoolTaskScheduler.scheduleWithFixedDelay(new PushData(), Instant.now().plusMillis(TimeUnit.MINUTES.toMillis(3)), Duration.ofMinutes(10)); |
|
77 |
|
a6a76f
|
78 |
// } |
F |
79 |
} |
|
80 |
|
|
81 |
} |
|
82 |
|
|
83 |
private class PushData implements Runnable { |
|
84 |
|
|
85 |
@Override |
|
86 |
public void run() { |
|
87 |
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date()) + "德立推送运行中..."); |
|
88 |
Map<String, DataSourceEntity> infoList = MultiDataSource.getDataSourceMaps(); |
|
89 |
for (Map.Entry<String, DataSourceEntity> entry : infoList.entrySet()) { |
|
90 |
DataSourceEntity dataSourceEntity = entry.getValue(); |
|
91 |
//德立系统 |
|
92 |
if ("deli".equalsIgnoreCase(dataSourceEntity.getDockingSystem()) |
|
93 |
&& dataSourceEntity.getSystemAccessKey() != null |
|
94 |
//&& dataSourceEntity.getDbId() ==497 //本地测试用,正式时候要去掉 |
|
95 |
&& !"".equalsIgnoreCase(dataSourceEntity.getSystemAccessKey())) { |
|
96 |
DeLiIfc deLiIfc = (DeLiIfc) FactoryBean.getBean("deLiImpl"); |
|
97 |
try { |
|
98 |
SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId()); |
|
99 |
//---------1,推送客户跟进记录--------- |
|
100 |
this.pushT170139(deLiIfc, dataSourceEntity); |
|
101 |
//---------2,推送销售单(销售单,客户资料)------ |
|
102 |
this.pushT120201(deLiIfc, dataSourceEntity); |
|
103 |
//---------3,推送采购单--------- |
|
104 |
this.pushT130301(deLiIfc, dataSourceEntity); |
|
105 |
//---------4,推送派工单--------- |
|
106 |
this.pushT640201(deLiIfc, dataSourceEntity); |
|
107 |
//---------5,推送已审核的测量单--------- |
|
108 |
this.pushT640109(deLiIfc, dataSourceEntity); |
|
109 |
} catch (Exception e) { |
|
110 |
e.printStackTrace(); |
|
111 |
} finally { |
|
112 |
SpObserver.setDBtoInstance(); |
|
113 |
} |
|
114 |
} |
|
115 |
} |
|
116 |
} |
|
117 |
/** |
|
118 |
* 推送派工单 |
|
119 |
* @param deLiIfc |
|
120 |
* @param dataSourceEntity |
|
121 |
*/ |
|
122 |
private void pushT640201(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){ |
|
123 |
List<T640201Entity> result = deLiIfc.getT640201List(); |
|
124 |
if(result!=null&&result.size()>0) { |
f4e40d
|
125 |
|
a6a76f
|
126 |
//执行生产,消费线程 |
f4e40d
|
127 |
threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", dispatchQueue,"t640201H")); |
F |
128 |
threadPoolExecutor.execute(new T640201Consumer(result, dataSourceEntity.getDbId() + "", dispatchQueue,"t640201H","派工单")); |
a6a76f
|
129 |
} |
F |
130 |
} |
|
131 |
/** |
|
132 |
* 推送销售订单,客户资料 |
|
133 |
* @param deLiIfc |
|
134 |
* @param dataSourceEntity |
|
135 |
*/ |
|
136 |
private void pushT120201(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){ |
|
137 |
|
|
138 |
List<T120201Entity> result = deLiIfc.getT120201List(); |
|
139 |
if(result!=null&&result.size()>0) { |
f4e40d
|
140 |
|
a6a76f
|
141 |
//执行生产,消费线程 |
f4e40d
|
142 |
threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", orderQueue,"t120201H")); |
F |
143 |
threadPoolExecutor.execute(new T120201Consumer(result, dataSourceEntity.getDbId() + "", orderQueue,"t120201H","销售订单")); |
a6a76f
|
144 |
} |
F |
145 |
} |
|
146 |
/** |
|
147 |
* 推送采购订单 |
|
148 |
* @param deLiIfc |
|
149 |
* @param dataSourceEntity |
|
150 |
*/ |
|
151 |
private void pushT130301(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){ |
|
152 |
List<T130301Entity> result = deLiIfc.getT130301List(); |
|
153 |
if(result!=null&&result.size()>0) { |
|
154 |
//执行生产,消费线程 |
f4e40d
|
155 |
threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "",purchaseQueue ,"t130301H")); |
F |
156 |
threadPoolExecutor.execute(new T130301Consumer(result, dataSourceEntity.getDbId() + "", purchaseQueue,"t130301H","采购订单")); |
a6a76f
|
157 |
} |
F |
158 |
} |
|
159 |
/** |
|
160 |
* 推送客户跟进记录 |
|
161 |
* @param deLiIfc |
|
162 |
* @param dataSourceEntity |
|
163 |
*/ |
|
164 |
private void pushT170139(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){ |
|
165 |
List<T170139Entity> result = deLiIfc.getT170139List(); |
|
166 |
if(result!=null&&result.size()>0) { |
|
167 |
//执行生产,消费线程 |
f4e40d
|
168 |
threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", followUpQueue,"t170139H")); |
F |
169 |
threadPoolExecutor.execute(new T170139Consumer(result, dataSourceEntity.getDbId() + "", followUpQueue,"t170139H","客户跟进记录")); |
a6a76f
|
170 |
} |
F |
171 |
|
|
172 |
} |
|
173 |
/** |
|
174 |
* 推送测量单 |
|
175 |
* @param deLiIfc |
|
176 |
* @param dataSourceEntity |
|
177 |
*/ |
|
178 |
private void pushT640109(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity){ |
|
179 |
List<T640109Entity> result = deLiIfc.getT640109(); |
|
180 |
if(result!=null&&result.size()>0) { |
|
181 |
//执行生产,消费线程 |
f4e40d
|
182 |
threadPoolExecutor.execute(new ProductThread(result, dataSourceEntity.getDbId() + "", t640109Queue,"t640109H")); |
F |
183 |
threadPoolExecutor.execute(new T640109Consumer(result, dataSourceEntity.getDbId() + "", t640109Queue,"t640109H","测量单")); |
a6a76f
|
184 |
} |
F |
185 |
|
|
186 |
} |
|
187 |
|
|
188 |
} |
|
189 |
|
|
190 |
|
|
191 |
} |