package com.yc.open.init;
|
|
import com.yc.api.bean.attendance.T180212;
|
import com.yc.api.schedule.ScheduleUtils;
|
import com.yc.entity.AttachmentConfig;
|
import com.yc.entity.DataSourceEntity;
|
import com.yc.factory.FactoryBean;
|
import com.yc.multiData.MultiDataSource;
|
import com.yc.multiData.SpObserver;
|
import com.yc.open.init.shcedule.AttendanceReMindSchedule;
|
import com.yc.open.init.shcedule.AttendanceSchedule;
|
import com.yc.open.mutual.schedule.GateEntity;
|
import com.yc.open.mutual.schedule.Pull120201Schedule;
|
import com.yc.open.mutual.schedule.Pull140902Schedule;
|
import com.yc.open.mutual.schedule.Push110503Schedule;
|
import com.yc.open.service.SystemTaskIfc;
|
import org.apache.commons.lang3.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.ApplicationListener;
|
import org.springframework.context.event.ContextRefreshedEvent;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
import org.springframework.scheduling.support.CronTrigger;
|
import org.springframework.stereotype.Service;
|
|
import java.time.LocalTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ScheduledFuture;
|
|
/**
 * Manages all system tasks (scheduled jobs and push services): adding, removing, updating, stopping and starting them.
 */
|
@Service
|
public class InitSystemTaks implements ApplicationListener<ContextRefreshedEvent> {
|
protected Logger log = LoggerFactory.getLogger(this.getClass());
|
@Autowired
|
SystemTaskIfc systemTaskIfc;
|
@Autowired
|
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
|
@Autowired
|
ThreadPoolTaskExecutor threadPoolExecutor;
|
// 线程存储器,key=dbid+api编号+功能号
|
public static ConcurrentHashMap<String, ScheduledFuture<?>> scheduleTasksMap = new ConcurrentHashMap<>();
|
//key :dbid+"_"+formid
|
public static ConcurrentHashMap<String, Task> pushTasksMap = new ConcurrentHashMap<>();
|
|
@Override
|
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
|
if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
|
return;
|
}
|
threadPoolTaskScheduler = (ThreadPoolTaskScheduler) FactoryBean.getBean("threadPoolTaskScheduler");
|
threadPoolExecutor = (ThreadPoolTaskExecutor) FactoryBean.getBean("threadPoolExecutor");
|
//加载各个系统的定时作业及推送任务
|
Map<String, DataSourceEntity> infoList = MultiDataSource.getDataSourceMaps();
|
log.info("初始化各系统作业(定时,推送).....");
|
systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl");
|
for (Map.Entry<String, DataSourceEntity> entry : infoList.entrySet()) {
|
|
DataSourceEntity dataSourceEntity = entry.getValue();
|
if (ScheduleUtils.isOnbusPlatform(dataSourceEntity)) {
|
continue;
|
}
|
//if (dataSourceEntity.getDbId() != 82) continue;//TODO 测试用
|
threadPoolExecutor.execute(new InitTaksThread(dataSourceEntity));
|
}
|
}
|
|
private class InitTaksThread implements Runnable {
|
DataSourceEntity dataSourceEntity;
|
|
public InitTaksThread(DataSourceEntity dataSourceEntity) {
|
this.dataSourceEntity = dataSourceEntity;
|
}
|
|
@Override
|
public void run() {
|
try {
|
String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行
|
SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId());
|
final List<Task> list = systemTaskIfc.getOpenAPIInfo();
|
if (list != null && list.size() > 0) {
|
if ("1".equals(isStartUpSchedule)) {
|
//设置这个是为了避免,因为集群导致重复开启定时作业,所以只在开启的系统加载
|
addScheduleTaks(list, dataSourceEntity);
|
} else {
|
//每个系统初始化时都需要加载OpenAPI的推送服务
|
addPushTaks(list, dataSourceEntity);
|
}
|
}
|
} catch (Exception e) {
|
System.out.println("dbid="+dataSourceEntity.getDbId()+":"+e.getMessage());
|
e.printStackTrace();
|
} finally {
|
SpObserver.setDBtoInstance();
|
}
|
}
|
}
|
|
/**
|
* 增加定时作业任务
|
*
|
* @param list
|
*/
|
public void addScheduleTaks(List<Task> list, DataSourceEntity dataSourceEntity) {
|
log.info(String.format("初始化%s定时作业.....", dataSourceEntity.getSystemID()));
|
list.parallelStream().forEach(task -> {
|
if ("jobs".equalsIgnoreCase(task.actionType)) {
|
//生成定时执行对象
|
switch (task.getApiCode()) {
|
//考勤统计
|
case "OP0001":
|
createScheduleTask(dataSourceEntity, task,new AttendanceSchedule(dataSourceEntity));
|
break;
|
//---内部对接(总公司与经销商)模块
|
case "OP0013":
|
createScheduleTask(dataSourceEntity, task,new Pull140902Schedule((dataSourceEntity)));
|
break;
|
case "OP0014":
|
createScheduleTask(dataSourceEntity, task,new Pull120201Schedule((dataSourceEntity)));
|
break;
|
case "OP0015":
|
createScheduleTask(dataSourceEntity, task,new Push110503Schedule((dataSourceEntity)));
|
break;
|
//-----end
|
//打卡提醒
|
case "OP0004":
|
create180212Schedule(dataSourceEntity, task.getApiCode());
|
break;
|
default:
|
break;
|
}
|
}
|
|
});
|
|
}
|
|
public void createScheduleTask(DataSourceEntity dataSourceEntity, Task task,Runnable scheduleObject) {
|
ScheduledFuture<?> future;
|
String key = dataSourceEntity.getDbId() + "_" + task.getApiCode();
|
future = scheduleTasksMap.get(key);
|
if (future != null) {
|
//已存在
|
future.cancel(true);
|
}
|
future = threadPoolTaskScheduler.schedule(scheduleObject, new CronTrigger(task.getCronExpression()));
|
scheduleTasksMap.put(key, future);
|
}
|
|
/**
|
* 取打卡提醒的时间段,每一个时间段生成一个定时任务
|
*
|
* @param dataSourceEntity
|
*/
|
private void create180212Schedule(DataSourceEntity dataSourceEntity, String apiCode) {
|
try {
|
SpObserver.setDBtoInstance("_" + dataSourceEntity.getDbId());
|
systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl");
|
final List<T180212> t180212List = systemTaskIfc.getT180212List();
|
String startTime = "";//避免有重复情况不生成,因为有多个打卡规则
|
String endTime = "";//避免有重复情况不生成,,因为有多个打卡规则
|
for (T180212 t180212 : t180212List) {
|
if (StringUtils.isNotBlank(t180212.getStartWorkingTime()) && !t180212.getStartWorkingTime().equals(startTime)) {
|
startTime = t180212.getStartWorkingTime();
|
pushScheduleTask(dataSourceEntity, apiCode, startTime, 10);
|
}
|
if (StringUtils.isNotBlank(t180212.getEndWorkingTime()) && !t180212.getEndWorkingTime().equals(endTime)) {
|
endTime = t180212.getEndWorkingTime();
|
pushScheduleTask(dataSourceEntity, apiCode, endTime, 0);
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
} finally {
|
SpObserver.setDBtoInstance();
|
}
|
}
|
|
private void pushScheduleTask(DataSourceEntity dataSourceEntity, String apiCode, String workerTime, int perMinute) {
|
String key = dataSourceEntity.getDbId() + "_" + apiCode + "_" + workerTime;
|
ScheduledFuture<?> future = scheduleTasksMap.get(key);
|
if (future != null) {
|
//已存在
|
future.cancel(true);
|
}
|
future = threadPoolTaskScheduler.schedule(new AttendanceReMindSchedule(dataSourceEntity,workerTime,perMinute), new CronTrigger(getCronExpression(workerTime, perMinute)));
|
scheduleTasksMap.put(key, future);
|
}
|
|
/**
|
* 生成cron表达式结构,这里是传时分,生成指定时间的定时任务,如每天08:30分执行
|
*
|
* @param perMinute 表示提前多少分钟执行
|
* @param cronStr
|
* @return
|
*/
|
private String getCronExpression(String cronStr, int perMinute) {
|
LocalTime parse = LocalTime.parse(cronStr, DateTimeFormatter.ofPattern("HH:mm"));
|
parse = parse.minusMinutes(perMinute);//减去指定分钟,因为上班卡可以提前提醒
|
return String.format("0 %s %s * * ?", (parse.getMinute()), parse.getHour());
|
}
|
|
/**
|
* 增加推送,拉取的任务
|
*
|
* @param list
|
*/
|
public void addPushTaks(List<Task> list, DataSourceEntity dataSourceEntity) {
|
log.info(String.format("初始化%s推送服务.....", dataSourceEntity.getSystemID()));
|
list.parallelStream().forEach(x -> {
|
if (!"jobs".equalsIgnoreCase(x.actionType)) {
|
pushTasksMap.put(dataSourceEntity.getDbId() + "_" + x.getApiCode() + "_" + x.getAffectedFormId(), x);
|
}
|
|
});
|
}
|
|
/**
|
* 取得当前系统已加载的推送服务
|
*
|
* @return
|
*/
|
public static Task getTask(GateEntity entity, String apiCode) {
|
final Task task = pushTasksMap.get(entity.getDbid() + "_" + apiCode + "_" + entity.getFormid());
|
if (task != null && task.getStatus() == 1 && entity.getCheckPoint().equalsIgnoreCase(task.getWhenToExecute())) {
|
//是当前执行点才执行
|
return task;
|
} else {
|
return null;
|
}
|
}
|
/**
|
* 取得当前系统已加载的推送服务
|
*
|
* @return
|
*/
|
public static Task getTaskByIsOpenURL(GateEntity entity, String apiCode) {
|
final Task task = pushTasksMap.get(entity.getDbid() + "_" + apiCode );
|
if (task != null && task.getStatus() == 1 ) {
|
//是当前执行点才执行
|
return task;
|
} else {
|
return null;
|
}
|
}
|
/**
|
* 刷新指定数据库的作业缓存
|
*
|
* @param dbid
|
*/
|
public void refreshTask(int dbid, String docCode) {
|
systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl");
|
try {
|
SpObserver.setDBtoInstance("_" + dbid);
|
final Task task = systemTaskIfc.getOpenAPI(docCode);
|
if (task != null) {
|
if (!"jobs".equals(task.getActionType())) {
|
//处理推送任务
|
pushTasksMap.put(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId(), task);
|
} else {
|
//处理定时任务
|
String isStartUpSchedule = AttachmentConfig.get("isStartUpSchedule");//只在设置了定时任务的服务器上运行
|
if("1".equals(isStartUpSchedule)) {
|
List<Task> list = new ArrayList<>();
|
list.add(task);
|
final DataSourceEntity dataSourceMap = MultiDataSource.getDataSourceMap(dbid + "");
|
addScheduleTaks(list, dataSourceMap);
|
}
|
}
|
}
|
} catch (Exception ex) {
|
log.error(ex.getMessage());
|
} finally {
|
SpObserver.setDBtoInstance();
|
}
|
|
}
|
|
/**
|
* 暂停任务
|
*
|
* @param dbid
|
*/
|
public void pauseTask(int dbid, String docCode) {
|
systemTaskIfc = (SystemTaskIfc) FactoryBean.getBean("SystemTaskImpl");
|
try {
|
SpObserver.setDBtoInstance("_" + dbid);
|
final Task task = systemTaskIfc.getOpenAPI(docCode);
|
if (task != null) {
|
if (!"jobs".equals(task.getActionType())) {
|
//处理推送任务
|
final Task stopTask = pushTasksMap.get(dbid + "_" + task.getApiCode() + "_" + task.getAffectedFormId());
|
if (stopTask != null)
|
stopTask.setStatus(0);//修改为不启用
|
} else {
|
for (Map.Entry<String, ScheduledFuture<?>> entry : scheduleTasksMap.entrySet()) {
|
if (entry.getKey().startsWith(dbid + "_" + task.getApiCode())) {
|
//以dbid+apiCode开头表示都需要停止任务
|
entry.getValue().cancel(true);
|
}
|
}
|
}
|
}
|
} catch (Exception ex) {
|
log.error(ex.getMessage());
|
} finally {
|
SpObserver.setDBtoInstance();
|
}
|
|
}
|
}
|