package com.yc.open.init; import com.yc.api.bean.attendance.T180212; import com.yc.api.schedule.ScheduleUtils; import com.yc.entity.AttachmentConfig; import com.yc.entity.DataSourceEntity; import com.yc.factory.FactoryBean; import com.yc.multiData.MultiDataSource; import com.yc.multiData.SpObserver; import com.yc.open.init.shcedule.AttendanceReMindSchedule; import com.yc.open.init.shcedule.AttendanceSchedule; import com.yc.open.mutual.schedule.GateEntity; import com.yc.open.mutual.schedule.Pull120201Schedule; import com.yc.open.mutual.schedule.Pull140902Schedule; import com.yc.open.mutual.schedule.Push110503Schedule; import com.yc.open.service.SystemTaskIfc; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Service; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; /** * 管理系统所有任务(定时,推送)的增,删,改,停止,启动 */ @Service public class InitSystemTaks implements ApplicationListener { protected Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired SystemTaskIfc systemTaskIfc; @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Autowired ThreadPoolTaskExecutor threadPoolExecutor; // 线程存储器,key=dbid+api编号+功能号 public static ConcurrentHashMap> scheduleTasksMap = new ConcurrentHashMap<>(); //key :dbid+"_"+formid public static ConcurrentHashMap pushTasksMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if (contextRefreshedEvent.getApplicationContext().getParent() == null) { return; } threadPoolTaskScheduler = (ThreadPoolTaskScheduler) FactoryBean.getBean("threadPoolTaskScheduler"); threadPoolExecutor = (ThreadPoolTaskExecutor) FactoryBean.getBean("threadPoolExecutor"); //加载各个系统的定时作业及推送任务 Map infoList = MultiDataSource.getDataSourceMaps(); log.info("初始化各系统作业(定时,推送)....."); systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); for (Map.Entry entry : infoList.entrySet()) { DataSourceEntity dataSourceEntity = entry.getValue(); if (ScheduleUtils.isOnbusPlatform(dataSourceEntity)) { continue; } //if (dataSourceEntity.getDbId() != 82) continue;//TODO 测试用 threadPoolExecutor.execute(new InitTaksThread(dataSourceEntity)); } } private class InitTaksThread implements Runnable { DataSourceEntity dataSourceEntity; public InitTaksThread(DataSourceEntity dataSourceEntity) { this.dataSourceEntity = dataSourceEntity; } @Override public void run() { try { String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行 SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId()); final List list = systemTaskIfc.getOpenAPIInfo(); if (list != null && list.size() > 0) { if ("1".equals(isStartUpSchedule)) { //设置这个是为了避免,因为集群导致重复开启定时作业,所以只在开启的系统加载 addScheduleTaks(list, dataSourceEntity); } else { //每个系统初始化时都需要加载OpenAPI的推送服务 addPushTaks(list, dataSourceEntity); } } } catch (Exception e) { System.out.println("dbid="+dataSourceEntity.getDbId()+":"+e.getMessage()); e.printStackTrace(); } finally { SpObserver.setDBtoInstance(); } } } /** * 增加定时作业任务 * * @param list */ public void addScheduleTaks(List list, DataSourceEntity dataSourceEntity) { log.info(String.format("初始化%s定时作业.....", dataSourceEntity.getSystemID())); list.parallelStream().forEach(task -> { if ("jobs".equalsIgnoreCase(task.actionType)) { //生成定时执行对象 switch (task.getApiCode()) { //考勤统计 case "OP0001": createScheduleTask(dataSourceEntity, task,new AttendanceSchedule(dataSourceEntity)); break; //---内部对接(总公司与经销商)模块 case "OP0013": createScheduleTask(dataSourceEntity, task,new Pull140902Schedule((dataSourceEntity))); break; case "OP0014": createScheduleTask(dataSourceEntity, task,new Pull120201Schedule((dataSourceEntity))); break; case "OP0015": createScheduleTask(dataSourceEntity, task,new Push110503Schedule((dataSourceEntity))); break; //-----end //打卡提醒 case "OP0004": create180212Schedule(dataSourceEntity, task.getApiCode()); break; default: break; } } }); } public void createScheduleTask(DataSourceEntity dataSourceEntity, Task task,Runnable scheduleObject) { ScheduledFuture future; String key = dataSourceEntity.getDbId() + "_" + task.getApiCode(); future = scheduleTasksMap.get(key); if (future != null) { //已存在 future.cancel(true); } future = threadPoolTaskScheduler.schedule(scheduleObject, new CronTrigger(task.getCronExpression())); scheduleTasksMap.put(key, future); } /** * 取打卡提醒的时间段,每一个时间段生成一个定时任务 * * @param dataSourceEntity */ private void create180212Schedule(DataSourceEntity dataSourceEntity, String apiCode) { try { SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId()); systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); final List t180212List = systemTaskIfc.getT180212List(); String startTime = "";//避免有重复情况不生成,因为有多个打卡规则 String endTime = "";//避免有重复情况不生成,,因为有多个打卡规则 for (T180212 t180212 : t180212List) { if (StringUtils.isNotBlank(t180212.getStartWorkingTime()) && !t180212.getStartWorkingTime().equals(startTime)) { startTime = t180212.getStartWorkingTime(); pushScheduleTask(dataSourceEntity, apiCode, startTime, 10); } if (StringUtils.isNotBlank(t180212.getEndWorkingTime()) && !t180212.getEndWorkingTime().equals(endTime)) { endTime = t180212.getEndWorkingTime(); pushScheduleTask(dataSourceEntity, apiCode, endTime, 0); } } } catch (Exception e) { e.printStackTrace(); } finally { SpObserver.setDBtoInstance(); } } private void pushScheduleTask(DataSourceEntity dataSourceEntity, String apiCode, String workerTime, int perMinute) { String key = dataSourceEntity.getDbId() + "_" + apiCode + "_" + workerTime; ScheduledFuture future = scheduleTasksMap.get(key); if (future != null) { //已存在 future.cancel(true); } future = threadPoolTaskScheduler.schedule(new AttendanceReMindSchedule(dataSourceEntity,workerTime,perMinute), new CronTrigger(getCronExpression(workerTime, perMinute))); scheduleTasksMap.put(key, future); } /** * 生成cron表达式结构,这里是传时分,生成指定时间的定时任务,如每天08:30分执行 * * @param perMinute 表示提前多少分钟执行 * @param cronStr * @return */ private String getCronExpression(String cronStr, int perMinute) { LocalTime parse = LocalTime.parse(cronStr, DateTimeFormatter.ofPattern("HH:mm")); parse = parse.minusMinutes(perMinute);//减去指定分钟,因为上班卡可以提前提醒 return String.format("0 %s %s * * ?", (parse.getMinute()), parse.getHour()); } /** * 增加推送,拉取的任务 * * @param list */ public void addPushTaks(List list, DataSourceEntity dataSourceEntity) { log.info(String.format("初始化%s推送服务.....", dataSourceEntity.getSystemID())); list.parallelStream().forEach(x -> { if (!"jobs".equalsIgnoreCase(x.actionType)) { pushTasksMap.put(dataSourceEntity.getDbId() + "_" + x.getApiCode() + "_" + x.getAffectedFormId(), x); } }); } /** * 取得当前系统已加载的推送服务 * * @return */ public static Task getTask(GateEntity entity, String apiCode) { final Task task = pushTasksMap.get(entity.getDbid() + "_" + apiCode + "_" + entity.getFormid()); if (task != null && task.getStatus() == 1 && entity.getCheckPoint().equalsIgnoreCase(task.getWhenToExecute())) { //是当前执行点才执行 return task; } else { return null; } } /** * 取得当前系统已加载的推送服务 * * @return */ public static Task getTaskByIsOpenURL(GateEntity entity, String apiCode) { final Task task = pushTasksMap.get(entity.getDbid() + "_" + apiCode ); if (task != null && task.getStatus() == 1 ) { //是当前执行点才执行 return task; } else { return null; } } /** * 刷新指定数据库的作业缓存 * * @param dbid */ public void refreshTask(int dbid, String docCode) { systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); try { SpObserver.setDBtoInstance("_" + dbid); final Task task = systemTaskIfc.getOpenAPI(docCode); if (task != null) { if (!"jobs".equals(task.getActionType())) { //处理推送任务 pushTasksMap.put(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId(), task); } else { //处理定时任务 String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行 if("1".equals(isStartUpSchedule)) { List list = new ArrayList<>(); list.add(task); final DataSourceEntity dataSourceMap = MultiDataSource.getDataSourceMap(dbid + ""); addScheduleTaks(list, dataSourceMap); } } } } catch (Exception ex) { log.error(ex.getMessage()); } finally { SpObserver.setDBtoInstance(); } } /** * 暂停任务 * * @param dbid */ public void pauseTask(int dbid, String docCode) { systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl"); try { SpObserver.setDBtoInstance("_" + dbid); final Task task = systemTaskIfc.getOpenAPI(docCode); if (task != null) { if (!"jobs".equals(task.getActionType())) { //处理推送任务 final Task stopTask = pushTasksMap.get(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId()); if (stopTask != null) stopTask.setStatus(0);//修改为不启用 } else { for (Map.Entry> entry : scheduleTasksMap.entrySet()) { if (entry.getKey().startsWith(dbid + "_" + task.getApiCode())) { //以dbid+apiCode开头表示都需要停止任务 entry.getValue().cancel(true); } } } } } catch (Exception ex) { log.error(ex.getMessage()); } finally { SpObserver.setDBtoInstance(); } } }