package com.yc.open.deli.schedule;

import com.yc.entity.AttachmentConfig;
import com.yc.entity.DataSourceEntity;
import com.yc.factory.FactoryBean;
import com.yc.multiData.MultiDataSource;
import com.yc.multiData.SpObserver;
import com.yc.open.deli.entity.*;
import com.yc.open.deli.service.DeLiIfc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Starts the DeLi push job once the Spring context has been refreshed.
 *
 * <p>The gsystem table carries a field that says which third-party system a
 * data source pushes to. For every data source docked to "deli", this job
 * periodically pushes follow-up records, sales orders, purchase orders,
 * dispatch orders and audited measurement orders to the DeLi marketing system.
 */
@Service
public class ProdutDataSchedule implements ApplicationListener<ContextRefreshedEvent> {

    /** Capacity of each hand-off queue between a producer and its consumer. */
    private static final int QUEUE_CAPACITY = 5;

    /** Delay between application start-up and the first push run. */
    private static final Duration INITIAL_DELAY = Duration.ofMinutes(3);

    /** Delay between the end of one push run and the start of the next. */
    private static final Duration REPEAT_DELAY = Duration.ofMinutes(10);

    /** Thread-safe formatter for the console timestamp printed on every run. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // NOTE(review): the queues below are shared by every data source handled in a
    // run, and their element type is not visible in this file (hence the raw type).
    // Confirm that ProductThread and the consumers tolerate interleaved tenants.

    /** Queue for customer follow-up records (t170139H). */
    private static final ArrayBlockingQueue followUpQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** Queue for sales orders (t120201H). */
    private static final ArrayBlockingQueue orderQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** Queue for dispatch orders (t640201H). */
    private static final ArrayBlockingQueue dispatchQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** Queue for purchase orders (t130301H). */
    private static final ArrayBlockingQueue purchaseQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** Queue for measurement orders (t640109H). */
    private static final ArrayBlockingQueue t640109Queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    @Autowired
    ThreadPoolTaskExecutor threadPoolExecutor;

    @Autowired
    ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /** Ensures the push job is scheduled at most once per bean, however many refresh events arrive. */
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    /**
     * Schedules the recurring push job when the configuration enables it.
     *
     * @param contextRefreshedEvent the refresh event published by Spring
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // Events from a context without a parent are ignored; only a child context starts the job.
        // NOTE(review): many applications do the opposite (react to the root context only) -
        // confirm this condition is intentional.
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
            return;
        }

        // Config switch that selects which service instance runs the job ("1" = run here).
        // In a cluster every instance would otherwise repeat the push; a Redis-based lock
        // ("isRunSchedule" key with an expiry) was sketched earlier but is not active.
        String isDeliRunSchedule = AttachmentConfig.get("isDeliRunSchedule");
        if (!"1".equals(isDeliRunSchedule)) {
            return;
        }

        // Several refresh events may reach this listener; schedule only on the first one.
        if (!scheduled.compareAndSet(false, true)) {
            return;
        }

        // First run 3 minutes after start-up, then 10 minutes after each run completes.
        threadPoolTaskScheduler.scheduleWithFixedDelay(
                new PushData(), Instant.now().plus(INITIAL_DELAY), REPEAT_DELAY);
    }

    /** One push run: walks every configured data source and pushes its pending documents to DeLi. */
    private class PushData implements Runnable {

        @Override
        public void run() {
            System.out.println(LocalDateTime.now().format(TIMESTAMP_FORMAT) + "德立推送运行中...");

            Map<?, DataSourceEntity> dataSources = MultiDataSource.getDataSourceMaps();
            for (DataSourceEntity dataSourceEntity : dataSources.values()) {
                if (!isDeliTarget(dataSourceEntity)) {
                    continue;
                }
                DeLiIfc deLiIfc = (DeLiIfc) FactoryBean.getBean("deLiImpl");
                try {
                    // Route the following queries to this tenant's database.
                    SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId());
                    // 1. customer follow-up records
                    this.pushT170139(deLiIfc, dataSourceEntity);
                    // 2. sales orders (sales order plus customer data)
                    this.pushT120201(deLiIfc, dataSourceEntity);
                    // 3. purchase orders
                    this.pushT130301(deLiIfc, dataSourceEntity);
                    // 4. dispatch orders
                    this.pushT640201(deLiIfc, dataSourceEntity);
                    // 5. audited measurement orders
                    this.pushT640109(deLiIfc, dataSourceEntity);
                } catch (Exception e) {
                    // Best effort: a failure for one data source must not stop the others.
                    System.err.println("DeLi push failed for dbId=" + dataSourceEntity.getDbId());
                    e.printStackTrace();
                } finally {
                    // Always reset the data-source selection for this thread.
                    SpObserver.setDBtoInstance();
                }
            }
        }

        /** Returns true when the data source is docked to "deli" and has a non-empty access key. */
        private boolean isDeliTarget(DataSourceEntity dataSourceEntity) {
            if (!"deli".equalsIgnoreCase(dataSourceEntity.getDockingSystem())) {
                return false;
            }
            String accessKey = dataSourceEntity.getSystemAccessKey();
            return accessKey != null && !accessKey.isEmpty();
        }

        /** Returns true when the query produced at least one row to push. */
        private boolean hasRows(List rows) {
            return rows != null && !rows.isEmpty();
        }

        /**
         * Pushes dispatch orders.
         *
         * @param deLiIfc          service used to load the pending rows
         * @param dataSourceEntity data source whose rows are pushed
         */
        private void pushT640201(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity) {
            List result = deLiIfc.getT640201List();
            if (hasRows(result)) {
                String dbId = String.valueOf(dataSourceEntity.getDbId());
                // Start the producer and its matching consumer.
                threadPoolExecutor.execute(new ProductThread(result, dbId, dispatchQueue, "t640201H"));
                threadPoolExecutor.execute(new T640201Consumer(result, dbId, dispatchQueue, "t640201H", "派工单"));
            }
        }

        /**
         * Pushes sales orders together with customer data.
         *
         * @param deLiIfc          service used to load the pending rows
         * @param dataSourceEntity data source whose rows are pushed
         */
        private void pushT120201(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity) {
            List result = deLiIfc.getT120201List();
            if (hasRows(result)) {
                String dbId = String.valueOf(dataSourceEntity.getDbId());
                // Start the producer and its matching consumer.
                threadPoolExecutor.execute(new ProductThread(result, dbId, orderQueue, "t120201H"));
                threadPoolExecutor.execute(new T120201Consumer(result, dbId, orderQueue, "t120201H", "销售订单"));
            }
        }

        /**
         * Pushes purchase orders.
         *
         * @param deLiIfc          service used to load the pending rows
         * @param dataSourceEntity data source whose rows are pushed
         */
        private void pushT130301(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity) {
            List result = deLiIfc.getT130301List();
            if (hasRows(result)) {
                String dbId = String.valueOf(dataSourceEntity.getDbId());
                // Start the producer and its matching consumer.
                threadPoolExecutor.execute(new ProductThread(result, dbId, purchaseQueue, "t130301H"));
                threadPoolExecutor.execute(new T130301Consumer(result, dbId, purchaseQueue, "t130301H", "采购订单"));
            }
        }

        /**
         * Pushes customer follow-up records.
         *
         * @param deLiIfc          service used to load the pending rows
         * @param dataSourceEntity data source whose rows are pushed
         */
        private void pushT170139(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity) {
            List result = deLiIfc.getT170139List();
            if (hasRows(result)) {
                String dbId = String.valueOf(dataSourceEntity.getDbId());
                // Start the producer and its matching consumer.
                threadPoolExecutor.execute(new ProductThread(result, dbId, followUpQueue, "t170139H"));
                threadPoolExecutor.execute(new T170139Consumer(result, dbId, followUpQueue, "t170139H", "客户跟进记录"));
            }
        }

        /**
         * Pushes measurement orders.
         *
         * @param deLiIfc          service used to load the pending rows
         * @param dataSourceEntity data source whose rows are pushed
         */
        private void pushT640109(DeLiIfc deLiIfc, DataSourceEntity dataSourceEntity) {
            List result = deLiIfc.getT640109();
            if (hasRows(result)) {
                String dbId = String.valueOf(dataSourceEntity.getDbId());
                // Start the producer and its matching consumer.
                threadPoolExecutor.execute(new ProductThread(result, dbId, t640109Queue, "t640109H"));
                threadPoolExecutor.execute(new T640109Consumer(result, dbId, t640109Queue, "t640109H", "测量单"));
            }
        }
    }
}