package com.yc.open.init; import com.yc.MaintenanceFee.schedule.MaintainPayMsgSchedule; import com.yc.api.bean.attendance.T180212; import com.yc.api.schedule.ScheduleUtils; import com.yc.entity.AttachmentConfig; import com.yc.entity.DataSourceEntity; import com.yc.exception.ApplicationException; import com.yc.factory.FactoryBean; import com.yc.multiData.MultiDataSource; import com.yc.multiData.SpObserver; import com.yc.open.init.shcedule.*; import com.yc.open.init.shcedule.log.DeleteSystemLogSchedule; import com.yc.open.init.shcedule.log.RsyncAppCatalogSchedule; import com.yc.open.lujn.controller.KingDeeSchedule; import com.yc.open.mutual.schedule.GateEntity; import com.yc.open.mutual.schedule.Pull120201Schedule; import com.yc.open.mutual.schedule.Pull140902Schedule; import com.yc.open.mutual.schedule.Push110503Schedule; import com.yc.open.service.SystemTaskIfc; import com.yc.open.ufida.schedule.MatCodeDataSchedule; import com.yc.open.ufida.schedule.OrderDataSchedule; import com.yc.open.ufida.schedule.StockDataSchedule; import com.yc.sdk.shopping.action.jiazhuang.RefreshGoodsInLiveThread; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Service; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; /** * 管理系统所有任务(定时,推送)的增,删,改,停止,启动 */ @Service public class InitSystemTaks implements ApplicationListener { protected Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired SystemTaskIfc systemTaskIfc; @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Autowired ThreadPoolTaskExecutor threadPoolExecutor; // 线程存储器,key=dbid+api编号+功能号 public static ConcurrentHashMap> scheduleTasksMap = new ConcurrentHashMap<>(); //key :dbid+"_"+formid public static ConcurrentHashMap pushTasksMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if (contextRefreshedEvent.getApplicationContext().getParent() == null) { return; } threadPoolTaskScheduler = (ThreadPoolTaskScheduler) FactoryBean.getBean("threadPoolTaskScheduler"); threadPoolExecutor = (ThreadPoolTaskExecutor) FactoryBean.getBean("threadPoolExecutor"); //加载各个系统的定时作业及推送任务 Map infoList = MultiDataSource.getDataSourceMaps(); log.info("初始化各系统作业(定时,推送)....."); systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); for (Map.Entry entry : infoList.entrySet()) { DataSourceEntity dataSourceEntity = entry.getValue(); if (ScheduleUtils.isOnbusPlatform(dataSourceEntity)||dataSourceEntity.getDbId()==1624) { //dbid=1624,巴士软件(佛山公司),不需要执行定时任务 continue; } // if (dataSourceEntity.getDbId() != 338) continue;//TODO 测试用 threadPoolExecutor.execute(new InitTaksThread(dataSourceEntity)); } String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行 String isStartUpOn9001 = AttachmentConfig.get("isStartUpOn9001");//只在设置了的服务器上运行 if ("1".equals(isStartUpSchedule)) { //---只需要执行一次的作业,不需要每个数据源系统都执行,放在这里调用 //---线程调用是为了避免数据库死锁的情况,每天早上3点执行一次删除系统日志 threadPoolTaskScheduler.schedule(new DeleteSystemLogSchedule(), new CronTrigger("0 10 3 * * ?")); } if ("1".equals(isStartUpOn9001)) { //---只需要执行一次的作业,不需要每个数据源系统都执行,放在这里调用 //---线程调用是为了避免数据库死锁的情况,每天早上3点执行一次删除系统日志 threadPoolTaskScheduler.schedule(new RsyncAppCatalogSchedule(), new CronTrigger("0 30 3 * * ?")); } //定时输出已加载的定时作业内容 TODO 测试用 // threadPoolTaskScheduler.schedule(new ListSchedule(), new CronTrigger("0 */20 * * * ?"));//20分钟一次 } private class InitTaksThread implements Runnable { DataSourceEntity dataSourceEntity; public InitTaksThread(DataSourceEntity dataSourceEntity) { this.dataSourceEntity = dataSourceEntity; } @Override public void run() { try { String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行 String isStartUpOn9001 = AttachmentConfig.get("isStartUpOn9001");//多tomcat实例部署只在设置了的服务器上运行 SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId()); final List list = systemTaskIfc.getOpenAPIInfo(true); if (list != null && list.size() > 0) { if ("1".equals(isStartUpOn9001)) { //先处理定时作业是多tomcat实例部署的情况 addScheduleTaks(list, dataSourceEntity, "multijob"); }else{ //OP0050是右下角弹窗,9010也需要执行,所以需要特殊处理 addScheduleTaks(list.stream().filter(x->"OP0050".equals(x.getApiCode())).collect(Collectors.toList()), dataSourceEntity, "multijob"); } if ("1".equals(isStartUpSchedule)) { //设置这个是为了避免,因为集群导致重复开启定时作业,所以只在开启的系统加载 addScheduleTaks(list, dataSourceEntity,"jobs"); } else { //每个系统初始化时都需要加载OpenAPI的推送服务 addPushTaks(list, dataSourceEntity); } } } catch (Exception e) { System.out.println("dbid="+dataSourceEntity.getDbId()+":"+e.getMessage()); e.printStackTrace(); } finally { SpObserver.setDBtoInstance(); } } } /** * 增加定时作业任务 * * @param list */ public void addScheduleTaks(List list, DataSourceEntity dataSourceEntity,String actipType) { if(list==null||list.size()==0){ return; } log.info(String.format("初始化%s定时作业(%s)条.....", dataSourceEntity.getSystemID(),list.size())); list.stream().forEach(task -> { if (actipType.equalsIgnoreCase(task.actionType)) { log.info("作业名称:"+task.getApiName()); //生成定时执行对象 switch (task.getApiCode()) { //考勤统计 case "OP0001": createScheduleTask(dataSourceEntity, task,new AttendanceSchedule(dataSourceEntity)); break; //---内部对接(总公司与经销商)模块 case "OP0013": createScheduleTask(dataSourceEntity, task,new Pull140902Schedule((dataSourceEntity))); break; case "OP0014": createScheduleTask(dataSourceEntity, task,new Pull120201Schedule(dataSourceEntity,task)); break; case "OP0015": createScheduleTask(dataSourceEntity, task,new Push110503Schedule((dataSourceEntity))); break; //-----end //打卡提醒 case "OP0004": create180212Schedule(dataSourceEntity, task.getApiCode()); break; //检查附件占用空间配额 case "OP0020": createScheduleTask(dataSourceEntity, task,new SpaceCheckSchedule(dataSourceEntity)); break; case "OP0027": createScheduleTask(dataSourceEntity, task,new MaintainceSchedule(dataSourceEntity)); break; case "OP0026": createScheduleTask(dataSourceEntity, task,new MaSyncLiveRoomSchedule(dataSourceEntity)); break; case "OP0025": createScheduleTask(dataSourceEntity, task,new ChcekPasswordHasExpiredSchedule(dataSourceEntity)); break; case "OP0024": createScheduleTask(dataSourceEntity, task,new MaSendTemplateMsgSchedule(dataSourceEntity)); break; case "OP0023": createScheduleTask(dataSourceEntity, task,new SendWeiXinTemplateMsgSchedule(dataSourceEntity)); break; case "OP0022": createScheduleTask(dataSourceEntity, task,new SendWeiXinMsgSchedule(dataSourceEntity)); break; case "OP0021": createScheduleTask(dataSourceEntity, task,new SendEmailsSchedule(dataSourceEntity)); break; case "OP0030": createScheduleTask(dataSourceEntity, task,new MatCodeDataSchedule(dataSourceEntity,task)); break; case "OP0031": createScheduleTask(dataSourceEntity, task,new StockDataSchedule(dataSourceEntity,task)); break; case "OP0032": createScheduleTask(dataSourceEntity, task, new OrderDataSchedule(dataSourceEntity, task)); break; case "OP0034"://维护费通知 createScheduleTask(dataSourceEntity, task, new MaintainPayMsgSchedule(dataSourceEntity)); break; case "OP0038"://更新直播商品库中的商品状态 createScheduleTask(dataSourceEntity, task, new RefreshGoodsInLiveThread(dataSourceEntity)); break; case "OP0039"://获取客户系统活跃情况 createScheduleTask(dataSourceEntity, task, new SystemActiveRecordSchedule(dataSourceEntity)); break; case "OP0044"://卤江南上传销售数据 createScheduleTask(dataSourceEntity, task, new KingDeeSchedule(dataSourceEntity,task)); break; case "OP0050"://右下角弹窗显示 createScheduleTask(dataSourceEntity, task, new MessagePopTipsSchedule(dataSourceEntity)); break; case "OP0063"://检查维护费到期自动停用系统(Onbus系统专用) createScheduleTask(dataSourceEntity, task, new AutoStopMaintainceSchedule(dataSourceEntity)); break; case "OP0066"://自动生成维护费(Onbus系统专用) createScheduleTask(dataSourceEntity, task, new CreateMaintainceOrderSchedule(dataSourceEntity)); break; default: break; } } }); } public void createScheduleTask(DataSourceEntity dataSourceEntity, Task task,Runnable scheduleObject) { ScheduledFuture future; String key = dataSourceEntity.getDbId() + "_" + task.getApiCode(); future = scheduleTasksMap.get(key); if (future != null) { //已存在 future.cancel(true); } if(StringUtils.isBlank(task.getCronExpression())){ throw new ApplicationException(task.getApiName()+"-cron表达式不能为空"); } future = threadPoolTaskScheduler.schedule(scheduleObject, new CronTrigger(task.getCronExpression())); scheduleTasksMap.put(key, future); } /** * 取打卡提醒的时间段,每一个时间段生成一个定时任务 * * @param dataSourceEntity */ private void create180212Schedule(DataSourceEntity dataSourceEntity, String apiCode) { try { SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId()); systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); final List t180212List = systemTaskIfc.getT180212List(); String startTime = "";//避免有重复情况不生成,因为有多个打卡规则 String endTime = "";//避免有重复情况不生成,,因为有多个打卡规则 for (T180212 t180212 : t180212List) { if (StringUtils.isNotBlank(t180212.getStartWorkingTime()) && !t180212.getStartWorkingTime().equals(startTime)) { startTime = t180212.getStartWorkingTime(); pushScheduleTask(dataSourceEntity, apiCode, startTime, 10); } if (StringUtils.isNotBlank(t180212.getEndWorkingTime()) && !t180212.getEndWorkingTime().equals(endTime)) { endTime = t180212.getEndWorkingTime(); pushScheduleTask(dataSourceEntity, apiCode, endTime, 0); } } } catch (Exception e) { e.printStackTrace(); } finally { SpObserver.setDBtoInstance(); } } private void pushScheduleTask(DataSourceEntity dataSourceEntity, String apiCode, String workerTime, int perMinute) { String key = dataSourceEntity.getDbId() + "_" + apiCode + "_" + workerTime; ScheduledFuture future = scheduleTasksMap.get(key); if (future != null) { //已存在 future.cancel(true); } future = threadPoolTaskScheduler.schedule(new AttendanceReMindSchedule(dataSourceEntity,workerTime,perMinute), new CronTrigger(getCronExpression(workerTime, perMinute))); scheduleTasksMap.put(key, future); } /** * 生成cron表达式结构,这里是传时分,生成指定时间的定时任务,如每天08:30分执行 * * @param perMinute 表示提前多少分钟执行 * @param cronStr * @return */ private String getCronExpression(String cronStr, int perMinute) { LocalTime parse = LocalTime.parse(cronStr, DateTimeFormatter.ofPattern("HH:mm")); parse = parse.minusMinutes(perMinute);//减去指定分钟,因为上班卡可以提前提醒 return String.format("0 %s %s * * ?", (parse.getMinute()), parse.getHour()); } /** * 增加推送,拉取的任务 * * @param list */ public void addPushTaks(List list, DataSourceEntity dataSourceEntity) { log.info(String.format("初始化%s推送服务.....", dataSourceEntity.getSystemID())); list.parallelStream().forEach(x -> { if (!"jobs".equalsIgnoreCase(x.actionType)&&!"multijob".equalsIgnoreCase(x.actionType)) { pushTasksMap.put(dataSourceEntity.getDbId() + "_" + x.getApiCode() + "_" + x.getAffectedFormId(), x); } }); } /** * 取得当前系统已加载的推送服务 * * @return */ public static Task getTask(GateEntity entity, String apiCode) { final Task task = pushTasksMap.get(entity.getDbid() + "_" + apiCode + "_" + entity.getFormid()); if (task != null && task.getStatus() == 1 // && // ( //// ((entity.getBntName().equals("起草")||entity.getBntName().equals("提交")|| //// entity.getBntName().equals("审核结束")|| //// entity.getBntName().equals("取消确认")|| //// entity.getBntName().equals("取消审核"))&& // // entity.getCheckPoint().equalsIgnoreCase(CheckPointType.EXECUTE_AFTER_POST)//普通情况都只有审核后才执行 // || // ((task.getApiCode().equals("OP0003")) // && entity.getCheckPoint().equalsIgnoreCase(CheckPointType.EXECUTE_BEFORE_POST))//特殊情况,需要审核前执行,像推送前需要查询对方系统才确定要不要推送 // ) ) { //是当前执行点才执行 return task; } else { return null; } } /** * 取得当前系统已加载的推送服务,针对取消确认,取消审核用 * * @return */ public static List getTaskByFormid(String dbid,int formid) { List task = new ArrayList<>(); for (Map.Entry entry : pushTasksMap.entrySet()) { if (entry.getKey().startsWith(dbid) && entry.getKey().endsWith(formid + "")) { if((entry.getValue().getAffectedDocStatus().contains("_取消确认")||entry.getValue().getAffectedDocStatus().contains("_取消审核"))&&entry.getValue().getStatus()==1) { task .add(entry.getValue()); }else { continue; } } } if (task != null && task.size()>0) { //启用才返回 return task; }else{ return null; } } /** * 取得当前系统已加载的在线交流推送服务 * * @return */ public static Task getTaskByOnlineMessage(String dbid,int formid) { Task task = null; for (Map.Entry entry : pushTasksMap.entrySet()) { if (entry.getKey().startsWith(dbid) && entry.getKey().endsWith(formid + "")) { if("pushOnlineMessages".equalsIgnoreCase(entry.getValue().getActionType())) { //取在线交流的task task = entry.getValue(); break; }else { continue; } } } if (task != null && task.getStatus() == 1) { //启用才返回 return task; }else{ return null; } } /** * 取得当前系统已加载的推送服务 * * @return */ public static Task getTaskByIsOpenURL(String dbid, String apiCode,int formid) { final Task task = pushTasksMap.get(dbid + "_" + apiCode+"_"+formid ); if (task != null && task.getStatus() == 1 ) { //是当前执行点才执行 return task; } else { return null; } } /** * 刷新指定数据库的作业缓存 * * @param dbid */ public void refreshTask(int dbid, String docCode) { systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); try { SpObserver.setDBtoInstance("_" + dbid); final Task task = systemTaskIfc.getOpenAPI(docCode); if (task != null) { if(task.getStatus()==0){//停用 //取消作业,则需要在缓存时去掉 if (!"jobs".equals(task.getActionType()) && !"multijob".equals(task.getActionType())) { //处理推送任务 pushTasksMap.remove(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId()); } else { for (Map.Entry> entry : scheduleTasksMap.entrySet()) { if (entry.getKey().startsWith(dbid + "_" + task.getApiCode())) { //以dbid+apiCode开头表示都需要停止任务 entry.getValue().cancel(true); break; } } scheduleTasksMap.remove(dbid + "_" + task.getApiCode()); } }else {//启用 if (!"jobs".equals(task.getActionType()) && !"multijob".equals(task.getActionType())) { //处理推送任务 pushTasksMap.put(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId(), task); } else { //处理定时任务 List list = new ArrayList<>(); list.add(task); final DataSourceEntity dataSourceMap = MultiDataSource.getDataSourceMap(dbid + ""); addScheduleTaks(list, dataSourceMap, "multijob"); String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行 if ("1".equals(isStartUpSchedule)) { addScheduleTaks(list, dataSourceMap, task.getActionType()); } } } } } catch (Exception ex) { log.error(ex.getMessage()); } finally { SpObserver.setDBtoInstance(); } } /** * 暂停任务 * * @param dbid */ public void pauseTask(int dbid, String docCode) { systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); try { SpObserver.setDBtoInstance("_" + dbid); final Task task = systemTaskIfc.getOpenAPI(docCode); if (task != null) { if (!"jobs".equals(task.getActionType())||!"multijob".equals(task.getActionType())) { //处理推送任务 Task stopTask = pushTasksMap.get(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId()); if (stopTask != null) stopTask.setStatus(0);//修改为不启用 } else { for (Map.Entry> entry : scheduleTasksMap.entrySet()) { if (entry.getKey().startsWith(dbid + "_" + task.getApiCode())) { //以dbid+apiCode开头表示都需要停止任务 entry.getValue().cancel(true); } } } } } catch (Exception ex) { log.error(ex.getMessage()); } finally { SpObserver.setDBtoInstance(); } } }