Quartz 实现定时任务调度
1. 导包
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
2. 添加配置文件
在resources下添加quartz.properties 配置文件
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#org.quartz.scheduler.rmi.export: false
#org.quartz.scheduler.rmi.proxy: false
#org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 20
#org.quartz.threadPool.threadPriority: 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#org.quartz.jobStore.misfireThreshold: 60000
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
3. 添加定时器工厂
@Component
public class JobFactory extends AdaptableJobFactory {

    /**
     * Bean factory used to wire Spring dependencies into job objects.
     * Job instances are created by Quartz rather than by Spring, so they
     * have to be autowired explicitly after creation.
     */
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Creates the job instance through the parent factory and then autowires it.
     *
     * @param bundle the fired-trigger bundle handed over by Quartz
     * @return the job instance after {@code autowireBean} has been applied to it
     * @throws Exception if the parent factory fails to create the instance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        // Inject Spring-managed dependencies into the freshly created job.
        capableBeanFactory.autowireBean(job);
        return job;
    }
}
4. 添加定时器配置
@Configuration
public class JobConfigration {

    /**
     * Builds the {@link SchedulerFactoryBean} configured from {@code quartz.properties}
     * and wired with the Spring-aware {@link JobFactory}.
     *
     * @param jobFactory job factory that autowires job instances
     * @return the configured scheduler factory bean
     * @throws IllegalStateException if {@code quartz.properties} cannot be loaded
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        schedulerFactoryBean.setJobFactory(jobFactory);
        try {
            schedulerFactoryBean.setQuartzProperties(quartzProperties());
        } catch (IOException e) {
            // Fail fast: silently continuing would return a factory bean
            // without the properties from quartz.properties applied.
            throw new IllegalStateException("Failed to load quartz.properties from the classpath", e);
        }
        return schedulerFactoryBean;
    }

    /**
     * Loads {@code quartz.properties} from the classpath.
     *
     * @return the loaded Quartz properties
     * @throws IOException if the properties file cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * Exposes the scheduler created by the factory bean as the bean named {@code scheduler}.
     *
     * @param jobFactory job factory passed through to {@link #schedulerFactoryBean(JobFactory)}
     * @return the scheduler obtained from the factory bean
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler(JobFactory jobFactory) {
        return schedulerFactoryBean(jobFactory).getScheduler();
    }
}
5. 定时任务操作类
@Component
@Slf4j
public class JobManagerUtil {
@Resource
private Scheduler scheduler;
/**
* 添加任务2
*
* @param jobName jobName
* @param jobGroup jobGroup
* @param cronExpression cronExpression
* @param beanClass beanClass
*/
public void addCommonJob(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
try {
// 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
// 定义调度触发规则 使用 cornTrigger 规则
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
.startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger);
// 启动
if (!scheduler.isStarted()) {
scheduler.start();
}
} catch (Exception e) {
log.error("add Job error", e);
}
}
/**
* 添加任务3 一次性任务
*
* @param jobName jobName
* @param jobGroup jobGroup
* @param startTime startTime
* @param beanClass beanClass
*/
public void addSingleCommonJob(String jobName, String jobGroup, Date startTime, Class<? extends Job> beanClass) {
try {
JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
SimpleTrigger singleTrigger = TriggerBuilder.newTrigger()
.withIdentity(jobName, jobGroup).startAt(startTime).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1))
.build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, singleTrigger);
// 启动
if (!scheduler.isStarted()) {
scheduler.start();
}
} catch (Exception e) {
log.error("add Job error", e);
}
}
/**
* 添加一个停止任务
* Params:
* jobName – jobName
* jobGroup – jobGroup
* startTime – startTime
* beanClass – beanClass
*/
public void addCommonJobShutdown(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
try {
// 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
// 定义调度触发规则 使用 cornTrigger 规则
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
.startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger);
// 停止
if (scheduler.isStarted()) {
scheduler.shutdown();
}
} catch (Exception e) {
log.error("add Job error", e);
}
}
/**
* 暂停任务
*/
public void pauseJob(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
log.error("pauseJob error", e);
}
}
/**
* 恢复任务
*/
public void resumeJob(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
log.error("resumeJob error", e);
}
}
/**
* 删除任务
*/
public void deleteJob(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
log.debug("定时任务:{}.{}-已删除!", jobName, jobGroup);
}
} catch (SchedulerException e) {
log.error("delete job error", e);
}
}
/**
* 启动任务(将现有任务再执行一次<仅执行一次并非新建了一个任务>
*/
public void runJobNow(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
log.error("run Job now error", e);
}
}
/**
* 查看任务是否存在
*/
public Boolean checkExists(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
return scheduler.checkExists(jobKey);
} catch (SchedulerException e) {
log.error("pauseJob error", e);
return false;
}
}
}
6. 定时任务实体
/**
 * JPA entity mapped to table {@code db_job_manager}; one row describes one scheduled job.
 */
@Entity
@Table(name = "db_job_manager")
@EntityListeners(AuditingEntityListener.class)
@DataObjectDescriptor(name = "定时任务管理", desc = "定时任务管理")
@EqualsAndHashCode
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobManager {

    /** Primary key, generated by the database identity column. */
    @Id
    @DataFieldDescriptor("id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /** Job name. */
    @Column(name = "job_name")
    @DataFieldDescriptor("任务名称")
    private String jobName;

    /** Job group. */
    @Column(name = "job_group")
    @DataFieldDescriptor("任务分组")
    private String jobGroup;

    /** Execution time of a one-off job. */
    @Column(name = "start_time")
    // HH (24-hour clock) so that request binding agrees with the JSON format below.
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    @DataFieldDescriptor("一次行执行时间")
    private Date startTime;

    /** Cron expression of a recurring job. */
    @Column(name = "cron_expression")
    @DataFieldDescriptor("cron表达式")
    private String cronExpression;

    /** Name of the job implementation class. */
    @Column(name = "bean_class")
    @DataFieldDescriptor("执行方法")
    private String beanClass;

    /** Job status code. */
    @Column(name = "job_status")
    @DataFieldDescriptor("任务状态")
    private Integer jobStatus;

    /** Job type code. */
    @Column(name = "job_type")
    @DataFieldDescriptor("任务类型")
    private Integer jobType;

    /** Soft-delete flag. */
    @Column(name = "deleted")
    @DataFieldDescriptor("是否删除")
    private Integer deleted;

    /** Id of the creator, filled in by auditing. */
    @Column(name = "creator_id", nullable = true)
    @DataFieldDescriptor("创建人")
    @CreatedBy
    private Integer creatorId;

    /** Creation time, filled in by auditing. */
    @CreatedDate
    @Column(name = "create_time")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    @DataFieldDescriptor("创建时间")
    private Date createTime;

    /** Id of the last modifier, filled in by auditing. */
    @Column(name = "modifier_id", nullable = true)
    @DataFieldDescriptor("修改人id")
    @LastModifiedBy
    private Integer modifierId;

    /** Last modification time, filled in by auditing. */
    @LastModifiedDate
    @Column(name = "modify_time", nullable = false)
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    @DataFieldDescriptor("修改时间")
    private Date modifyTime;
}
任务操作类型枚举
/**
 * Operations that can be applied to a scheduled job.
 */
@Getter
@AllArgsConstructor
public enum JobTaskEnum {
    RUN(0, "运行"),
    PAUSE(1, "暂停"),
    RECOVER(2, "恢复");

    /**
     * Numeric code of the operation.
     */
    private final Integer code;

    /**
     * Human-readable description (Chinese).
     */
    private final String desc;

    /** Index of all constants by their code. */
    private static final Map<Integer, JobTaskEnum> LOOKUP = Maps.uniqueIndex(
            Arrays.asList(JobTaskEnum.values()), JobTaskEnum::getCode
    );

    /**
     * Resolves the constant for a numeric code.
     *
     * @param code operation code
     * @return the matching constant, or {@code null} if no constant has this code
     */
    @Nullable
    public static JobTaskEnum reversal(int code) {
        return LOOKUP.get(code);
    }
}
7. 定时任务操作接口
/**
 * Service for managing scheduled jobs.
 */
public interface JobManagerSrevice {
/**
 * Saves a job.
 *
 * @param job the job to save
 * @return the saved job
 */
JobManager saveJobManager(JobManager job);
/**
 * Applies an operation to a job.
 *
 * @param operation operation code (see {@code JobTaskEnum})
 * @param id        id of the job
 * @return the operation result flag
 */
Boolean taskOperation(Integer operation, Integer id);
/**
 * Deletes jobs by id.
 *
 * @param ids ids of the jobs to delete
 * @return the operation result flag
 */
Boolean deleteById(List<Integer> ids);
/**
 * Initializes the jobs.
 *
 * @return the operation result flag
 */
Boolean ininJob();
}
@Slf4j
@Service
public class JobManagerServiceImpl implemen JobManagerSrevice {
@Autowired
private JobManagerRepository jobManagerRepository;
@Autowired
private JobManagerUtil jobManagerUtil;
private Map<JobTaskEnum, BiConsumer<String, String>> taskOperationMap = new HashMap<>();
@PostConstruct
void init() {
taskOperationMap.put(JobTaskEnum.RUN, (jobName, jobGroup) -> jobManagerUtil.runJobNow(jobName, jobGroup));
taskOperationMap.put(JobTaskEnum.PAUSE, (jobName, jobGroup) -> jobManagerUtil.pauseJob(jobName, jobGroup));
taskOperationMap.put(JobTaskEnum.RECOVER, (jobName, jobGroup) -> jobManagerUtil.resumeJob(jobName, jobGroup));
}
@SneakyThrows
@Override
public JobManager saveJobManager(JobManager jobManager) {
long count = jobManagerRepository.countByJobNameAndDeleted(jobManager.getJobName(), Deleted.UN_DELETED.getCode());
if (count > 0) {
throw new ServiceException(ResultCode.JOB_NAME_IS_EXIST);
}
jobManager.setDeleted(Deleted.UN_DELETED.getCode());
jobManager.setJobStatus(JobStatusEnum.NORMAL.getCode());
JobManager save = jobManagerRepository.save(jobManager);
if (jobManager.getJobType().equals(JobTypeEnum.ONE_OFF_TASK.getCode())) {
jobManagerUtil.addSingleCommonJob(jobManager.getJobName(), jobManager.getJobGroup(), jobManager.getStartTime(), (Class<? extends Job>) Class.forName(jobManager.getBeanClass()));
} else {
jobManagerUtil.addCommonJob(jobManager.getJobName(), jobManager.getJobGroup(), jobManager.getCronExpression(), (Class<? extends Job>) Class.forName(jobManager.getBeanClass()));
}
return save;
}
@Override
public Boolean taskOperation(Integer operation, Integer id) {
Optional<JobManager> byId = jobManagerRepository.findById(id);
taskOperationVerify(operation, byId.get());
JobTaskEnum reversal = JobTaskEnum.reversal(operation);
BiConsumer<String, String> stringStringBiConsumer = taskOperationMap.get(reversal);
stringStringBiConsumer.accept(byId.get().getJobName(), byId.get().getJobGroup());
byId.get().setJobStatus(operationToStatus(reversal));
jobManagerRepository.save(byId.get());
return true;
}
@Override
public Boolean deleteById(List<Integer> ids) {
Assert.notEmpty(ids, "failed to delete the product by id ids is empty");
ids.parallelStream().forEach(i -> {
Optional<JobManager> byId = jobManagerRepository.findById(i);
byId.get().setDeleted(Deleted.DELETED.getCode());
jobManagerRepository.save(byId.get());
jobManagerUtil.deleteJob(byId.get().getJobName(), byId.get().getJobGroup());
});
return true;
}
@Override
public Boolean ininJob() {
List<JobManager> byDeletedAndStartTimeAfter = jobManagerRepository.findByDeletedOrStartTimeAfter(DateUtil.now());
byDeletedAndStartTimeAfter.parallelStream().forEach(i -> {
try {
if (i.getJobType().equals(JobTypeEnum.ONE_OFF_TASK.getCode())) {
jobManagerUtil.addSingleCommonJob(i.getJobName(), i.getJobGroup(), i.getStartTime(), (Class<? extends Job>) Class.forName(i.getBeanClass()));
} else {
if (i.getJobStatus().equals(JobStatusEnum.NORMAL.getCode())) {
jobManagerUtil.addCommonJob(i.getJobName(), i.getJobGroup(), i.getCronExpression(), (Class<? extends Job>) Class.forName(i.getBeanClass()));
}
}
} catch (ClassNotFoundException e) {
log.error("获取BeanClass错误:", e);
}
});
return true;
}
private Integer operationToStatus(JobTaskEnum jobTaskEnum) {
if (jobTaskEnum == JobTaskEnum.PAUSE) {
return 1;
} else {
return 0;
}
}
private void taskOperationVerify(Integer operation, JobManager jobManager) {
if (jobManager.getJobStatus().equals(JobStatusEnum.NORMAL.getCode())) {
if (operation.equals(2)) {
throw new ServiceException(ResultCode.MISSION_CANNOT_BE_RESTORED_IN_NORMAL_STATE);
}
} else {
if (operation.equals(1)) {
throw new ServiceException(ResultCode.MISSION_SUSPENDED);
}
}
}
}
8. 启动初始化加载任务
/**
 * Loads the stored jobs into the scheduler once the application has started.
 *
 * @author yz
 */
@Component
public class JobApplicationRunner implements ApplicationRunner {

    @Autowired
    private JobManagerSrevice jobManagerSrevice;

    /**
     * Delegates to {@code ininJob()} of the job service.
     *
     * @param var1 application arguments (unused)
     * @throws Exception declared by the {@link ApplicationRunner} contract
     */
    @Override
    public void run(ApplicationArguments var1) throws Exception {
        jobManagerSrevice.ininJob();
    }
}