Quartz 实现定时任务调度

Quartz 实现定时任务调度

1. 导包

<!-- No <version> is declared here; presumably it is managed by a parent POM/BOM (e.g. Spring Boot) - confirm. -->
<dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
</dependency>

2. 添加配置文件

在 resources 目录下添加 quartz.properties 配置文件:

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
# Every entry below except threadCount is commented out and therefore
# inactive in this file.
#org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#org.quartz.scheduler.rmi.export: false
#org.quartz.scheduler.rmi.proxy: false
#org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
# Number of threads in the scheduler's thread pool (the only active setting here).
org.quartz.threadPool.threadCount= 20
#org.quartz.threadPool.threadPriority: 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#org.quartz.jobStore.misfireThreshold: 60000
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

3. 添加定时器工厂

/**
 * Job factory that autowires every job instance after it has been created.
 *
 * <p>Job objects are instantiated by Quartz rather than by Spring, so their
 * Spring-managed dependencies have to be injected explicitly once the instance exists.
 */
@Component
public class JobFactory extends AdaptableJobFactory {

    /** Bean factory used to inject dependencies into instances Spring did not create. */
    @Autowired
    private AutowireCapableBeanFactory beanFactory;

    /**
     * Creates the job through the parent implementation and then autowires it.
     *
     * @param bundle the fired-trigger bundle handed over by Quartz
     * @return the job instance after autowiring
     * @throws Exception if the parent implementation fails to create the instance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job);
        return job;
    }
}

4. 添加定时器配置

/**
 * Spring configuration that exposes the Quartz {@link SchedulerFactoryBean}, the properties it
 * is configured with, and the resulting {@link Scheduler} as beans.
 */
@Configuration
public class JobConfigration {

    /**
     * Builds the scheduler factory, configured from {@code quartz.properties} and using the
     * autowiring {@link JobFactory}.
     *
     * @param jobFactory factory used to create (and autowire) job instances
     * @return the configured factory bean
     * @throws IllegalStateException if {@code quartz.properties} cannot be loaded; failing here
     *         avoids silently starting a scheduler without the intended configuration
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        schedulerFactoryBean.setJobFactory(jobFactory);
        try {
            schedulerFactoryBean.setQuartzProperties(quartzProperties());
        } catch (IOException e) {
            // Propagate with the cause instead of printing the stack trace and carrying on.
            throw new IllegalStateException("Failed to load /quartz.properties from the classpath", e);
        }
        return schedulerFactoryBean;
    }

    /**
     * Loads {@code /quartz.properties} from the classpath.
     *
     * @return the loaded properties
     * @throws IOException if the resource cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * Exposes the scheduler obtained from {@link #schedulerFactoryBean(JobFactory)}.
     *
     * @param jobFactory factory passed through to the scheduler factory bean
     * @return Scheduler
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler(JobFactory jobFactory) {
        return schedulerFactoryBean(jobFactory).getScheduler();
    }
}

5. 定时任务操作类

@Component
@Slf4j
public class JobManagerUtil {
    @Resource
    private Scheduler scheduler;

    /**
     * 添加任务2
     *
     * @param jobName        jobName
     * @param jobGroup       jobGroup
     * @param cronExpression cronExpression
     * @param beanClass      beanClass
     */
    public void addCommonJob(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
        try {
            // 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
            JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
            // 定义调度触发规则 使用 cornTrigger 规则
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                    .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, trigger);
            // 启动
            if (!scheduler.isStarted()) {
                scheduler.start();
            }
        } catch (Exception e) {
            log.error("add Job error", e);
        }
    }

    /**
     * 添加任务3 一次性任务
     *
     * @param jobName   jobName
     * @param jobGroup  jobGroup
     * @param startTime startTime
     * @param beanClass beanClass
     */
    public void addSingleCommonJob(String jobName, String jobGroup, Date startTime, Class<? extends Job> beanClass) {
        try {
            JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
            SimpleTrigger singleTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(jobName, jobGroup).startAt(startTime).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1))
                    .build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, singleTrigger);
            // 启动
            if (!scheduler.isStarted()) {
                scheduler.start();
            }
        } catch (Exception e) {
            log.error("add Job error", e);
        }
    }

    /**
     * 添加一个停止任务
     * Params:
     * jobName – jobName
     * jobGroup – jobGroup
     * startTime – startTime
     * beanClass – beanClass
     */
    public void addCommonJobShutdown(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
        try {
            // 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
            JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
            // 定义调度触发规则 使用 cornTrigger 规则
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                    .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, trigger);
            // 停止
            if (scheduler.isStarted()) {
                scheduler.shutdown();
            }
        } catch (Exception e) {
            log.error("add Job error", e);
        }
    }

    /**
     * 暂停任务
     */
    public void pauseJob(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            log.error("pauseJob error", e);
        }
    }

    /**
     * 恢复任务
     */
    public void resumeJob(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {
            log.error("resumeJob error", e);
        }
    }

    /**
     * 删除任务
     */
    public void deleteJob(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            if (scheduler.checkExists(jobKey)) {
                scheduler.deleteJob(jobKey);
                log.debug("定时任务:{}.{}-已删除!", jobName, jobGroup);
            }
        } catch (SchedulerException e) {
            log.error("delete job error", e);
        }
    }

    /**
     * 启动任务(将现有任务再执行一次<仅执行一次并非新建了一个任务>
     */
    public void runJobNow(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            log.error("run Job now error", e);
        }
    }

    /**
     * 查看任务是否存在
     */
    public Boolean checkExists(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            return scheduler.checkExists(jobKey);
        } catch (SchedulerException e) {
            log.error("pauseJob error", e);
            return false;
        }
    }

}

6. 定时任务实体

/* Scheduled-job entity stored in table db_job_manager. */
/**
 * JPA entity mapped to table {@code db_job_manager}; one row describes one scheduled job
 * (identity, schedule, implementing class, status and audit columns).
 *
 * <p>All {@code @DateTimeFormat} patterns use {@code HH} (24-hour clock) so they agree with the
 * accompanying {@code @JsonFormat} patterns; the previous {@code hh} is the 12-hour field and
 * the pattern carries no AM/PM marker.
 */
@Entity
@Table(name = "db_job_manager")
@EntityListeners(AuditingEntityListener.class)
@DataObjectDescriptor(name = "定时任务管理", desc = "定时任务管理")
@EqualsAndHashCode
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobManager {

    /** Primary key, generated by the database (identity strategy). */
    @Id
    @DataFieldDescriptor("id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /** Job name. */
    @Column(name = "job_name")
    @DataFieldDescriptor("任务名称")
    private String jobName;

    /** Job group. */
    @Column(name = "job_group")
    @DataFieldDescriptor("任务分组")
    private String jobGroup;

    /** Execution time of a one-off job. */
    @Column(name = "start_time")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    @DataFieldDescriptor("一次行执行时间")
    private Date startTime;

    /** Cron expression of a recurring job. */
    @Column(name = "cron_expression")
    @DataFieldDescriptor("cron表达式")
    private String cronExpression;

    /** Name of the class that implements the job. */
    @Column(name = "bean_class")
    @DataFieldDescriptor("执行方法")
    private String beanClass;

    /** Job status code. */
    @Column(name = "job_status")
    @DataFieldDescriptor("任务状态")
    private Integer jobStatus;

    /** Job type code. */
    @Column(name = "job_type")
    @DataFieldDescriptor("任务类型")
    private Integer jobType;


    /** Soft-delete flag. */
    @Column(name = "deleted")
    @DataFieldDescriptor("是否删除")
    private Integer deleted;


    /** Id of the creating user, filled in by auditing. */
    @Column(name = "creator_id", nullable = true)
    @DataFieldDescriptor("创建人")
    @CreatedBy
    private Integer creatorId;

    /** Creation timestamp, filled in by auditing. */
    @CreatedDate
    @Column(name = "create_time")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    @DataFieldDescriptor("创建时间")
    private Date createTime;

    /** Id of the last modifying user, filled in by auditing. */
    @Column(name = "modifier_id", nullable = true)
    @DataFieldDescriptor("修改人id")
    @LastModifiedBy
    private Integer modifierId;

    /** Last-modification timestamp, filled in by auditing. */
    @LastModifiedDate
    @Column(name = "modify_time", nullable = false)
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    @DataFieldDescriptor("修改时间")
    private Date modifyTime;
}

任务操作类型枚举

/**
 * Operations that can be requested on an existing scheduled job, each identified by an integer
 * code.
 */
@Getter
@AllArgsConstructor
public enum JobTaskEnum {
    RUN(0, "运行"),
    PAUSE(1, "暂停"),
    RECOVER(2, "恢复");

    /**
     * Integer code of the operation.
     */
    private final Integer code;
    /**
     * Human-readable description (Chinese).
     */
    private final String desc;

    /** Index of all constants by their code, built once when the enum is initialised. */
    private static final Map<Integer, JobTaskEnum> LOOKUP = Maps.uniqueIndex(
            Arrays.asList(JobTaskEnum.values()), JobTaskEnum::getCode
    );

    /**
     * Looks up the constant for a code.
     *
     * @param code operation code
     * @return the matching constant, or {@code null} if no constant has that code
     */
    @Nullable
    public static JobTaskEnum reversal(int code) {
        return LOOKUP.get(code);
    }
}

7. 定时任务操作接口

/**
 * Operations for managing scheduled-job definitions.
 */
public interface JobManagerSrevice {
    /**
     * Saves a job definition.
     *
     * @param job the job definition to save
     * @return the saved job definition
     */
    JobManager saveJobManager(JobManager job);
    /**
     * Performs an operation on an existing job.
     *
     * @param operation operation code (presumably a {@code JobTaskEnum} code - confirm against the implementation)
     * @param id        id of the job to operate on
     * @return result flag of the operation
     */
    Boolean taskOperation(Integer operation, Integer id);
    /**
     * Deletes the jobs with the given ids.
     *
     * @param ids ids of the jobs to delete
     * @return result flag of the deletion
     */
    Boolean deleteById(List<Integer> ids);
    /**
     * Initializes jobs.
     *
     * @return result flag of the initialization
     */
    Boolean ininJob();
}
@Slf4j
@Service
public class JobManagerServiceImpl implemen JobManagerSrevice {
    @Autowired
    private JobManagerRepository jobManagerRepository;
    @Autowired
    private JobManagerUtil jobManagerUtil;

    private Map<JobTaskEnum, BiConsumer<String, String>> taskOperationMap = new HashMap<>();

    @PostConstruct
    void init() {
        taskOperationMap.put(JobTaskEnum.RUN, (jobName, jobGroup) -> jobManagerUtil.runJobNow(jobName, jobGroup));
        taskOperationMap.put(JobTaskEnum.PAUSE, (jobName, jobGroup) -> jobManagerUtil.pauseJob(jobName, jobGroup));
        taskOperationMap.put(JobTaskEnum.RECOVER, (jobName, jobGroup) -> jobManagerUtil.resumeJob(jobName, jobGroup));
    }

    @SneakyThrows
    @Override
    public JobManager saveJobManager(JobManager jobManager) {
        long count = jobManagerRepository.countByJobNameAndDeleted(jobManager.getJobName(), Deleted.UN_DELETED.getCode());
        if (count > 0) {
            throw new ServiceException(ResultCode.JOB_NAME_IS_EXIST);
        }
        jobManager.setDeleted(Deleted.UN_DELETED.getCode());
        jobManager.setJobStatus(JobStatusEnum.NORMAL.getCode());
        JobManager save = jobManagerRepository.save(jobManager);
        if (jobManager.getJobType().equals(JobTypeEnum.ONE_OFF_TASK.getCode())) {
            jobManagerUtil.addSingleCommonJob(jobManager.getJobName(), jobManager.getJobGroup(), jobManager.getStartTime(), (Class<? extends Job>) Class.forName(jobManager.getBeanClass()));
        } else {
            jobManagerUtil.addCommonJob(jobManager.getJobName(), jobManager.getJobGroup(), jobManager.getCronExpression(), (Class<? extends Job>) Class.forName(jobManager.getBeanClass()));
        }
        return save;
    }

    @Override
    public Boolean taskOperation(Integer operation, Integer id) {
        Optional<JobManager> byId = jobManagerRepository.findById(id);
        taskOperationVerify(operation, byId.get());
        JobTaskEnum reversal = JobTaskEnum.reversal(operation);
        BiConsumer<String, String> stringStringBiConsumer = taskOperationMap.get(reversal);
        stringStringBiConsumer.accept(byId.get().getJobName(), byId.get().getJobGroup());
        byId.get().setJobStatus(operationToStatus(reversal));
        jobManagerRepository.save(byId.get());
        return true;
    }

    @Override
    public Boolean deleteById(List<Integer> ids) {
        Assert.notEmpty(ids, "failed to delete the product by id ids is empty");
        ids.parallelStream().forEach(i -> {
            Optional<JobManager> byId = jobManagerRepository.findById(i);
            byId.get().setDeleted(Deleted.DELETED.getCode());
            jobManagerRepository.save(byId.get());
            jobManagerUtil.deleteJob(byId.get().getJobName(), byId.get().getJobGroup());
        });
        return true;
    }

    @Override
    public Boolean ininJob() {
        List<JobManager> byDeletedAndStartTimeAfter = jobManagerRepository.findByDeletedOrStartTimeAfter(DateUtil.now());
        byDeletedAndStartTimeAfter.parallelStream().forEach(i -> {
            try {
                if (i.getJobType().equals(JobTypeEnum.ONE_OFF_TASK.getCode())) {
                    jobManagerUtil.addSingleCommonJob(i.getJobName(), i.getJobGroup(), i.getStartTime(), (Class<? extends Job>) Class.forName(i.getBeanClass()));
                } else {
                    if (i.getJobStatus().equals(JobStatusEnum.NORMAL.getCode())) {
                        jobManagerUtil.addCommonJob(i.getJobName(), i.getJobGroup(), i.getCronExpression(), (Class<? extends Job>) Class.forName(i.getBeanClass()));
                    }
                }
            } catch (ClassNotFoundException e) {
                log.error("获取BeanClass错误:", e);
            }
        });
        return true;
    }

    private Integer operationToStatus(JobTaskEnum jobTaskEnum) {
        if (jobTaskEnum == JobTaskEnum.PAUSE) {
            return 1;
        } else {
            return 0;
        }
    }

    private void taskOperationVerify(Integer operation, JobManager jobManager) {
        if (jobManager.getJobStatus().equals(JobStatusEnum.NORMAL.getCode())) {
            if (operation.equals(2)) {
                throw new ServiceException(ResultCode.MISSION_CANNOT_BE_RESTORED_IN_NORMAL_STATE);
            }
        } else {
            if (operation.equals(1)) {
                throw new ServiceException(ResultCode.MISSION_SUSPENDED);
            }
        }
    }
}

8. 启动初始化加载任务

/**
 * Application runner that calls {@link JobManagerSrevice#ininJob()} when the application runs.
 *
 * @author yz
 */
@Component
public class JobApplicationRunner implements ApplicationRunner {

    // Type corrected from the non-existent "JobManagerSreviceb" to the declared interface.
    @Autowired
    private JobManagerSrevice jobManagerSrevice;

    /**
     * Delegates to the job service to initialize the stored jobs.
     *
     * @param var1 application arguments (unused)
     * @throws Exception if job initialization fails
     */
    @Override
    public void run(ApplicationArguments var1) throws Exception {
        jobManagerSrevice.ininJob();
    }
}