Spring Boot and Quartz cluster integration best practices, with support for dynamically modifying jobs


This article covers the clustered setup. For a single node that does not need persistence, see https://www.jianshu.com/p/fe257adc331d

1. Dependencies

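My company uses a Spring Cloud microservice architecture. Checking the official site, I found that spring-boot-starter-quartz is not yet available in Maven Central; it is only published in Spring's own repository.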


Path: http://repo.spring.io/milestone/org/springframework/boot/spring-boot-starter-quartz/

To use it, you have to specify the version explicitly:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
        <version>2.0.0.M2</version>
    </dependency>

A closer look at this jar shows that it does nothing more than declare a few dependencies in its pom file.


That being the case, it is simpler to reference those jars directly, which also lets you pick versions that suit your own project:

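    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz-jobs</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
    </dependency>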

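Mine is a Spring Cloud project, so I did not pin versions for the Spring-related artifacts; adjust this to your situation. For a plain Spring Boot project, just specify the version numbers manually.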

2. Database configuration

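Distributed Quartz has to persist jobs and triggers to a database, so Quartz needs a data source. Most projects also have business code that talks to the database and therefore already configure a data source for Spring, which means two data source configurations end up in the same runtime. Unless there is a special requirement, one configuration per environment is enough.

Many of the integration examples I found online configure two data sources. My instinct was that one should be enough, and after some experimenting I got it working. I will only describe my own approach here and skip the alternatives, which you can look up in the official documentation. The example below uses MySQL; other databases are similar.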


quartz.properties

org.quartz.scheduler.instanceName=test-schedule
org.quartz.scheduler.instanceId=AUTO
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.clusterCheckinInterval=20000
org.quartz.scheduler.skipUpdateCheck=true
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=10
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
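
A few of these keys are what actually make the scheduler clustered: a JDBC job store, isClustered=true, an instanceName that every node shares, and an instanceId that differs per node (AUTO lets Quartz generate one). The following is a minimal sketch of a sanity check over those keys using only java.util.Properties. The class QuartzClusterPropsCheck and its methods are hypothetical helpers written for illustration; they are not part of Quartz or Spring.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not a Quartz API): flags settings that commonly break clustering. */
class QuartzClusterPropsCheck {

    /** Returns human-readable problems; an empty list means the basic cluster keys look consistent. */
    static List<String> problems(Properties p) {
        List<String> out = new ArrayList<>();
        if (!Boolean.parseBoolean(p.getProperty("org.quartz.jobStore.isClustered", "false"))) {
            out.add("org.quartz.jobStore.isClustered is not true");
        }
        // Without an explicit job store class Quartz falls back to the in-memory RAMJobStore.
        String store = p.getProperty("org.quartz.jobStore.class", "");
        if (store.isEmpty() || store.endsWith("RAMJobStore")) {
            out.add("clustering needs a JDBC job store such as JobStoreTX");
        }
        if (!"AUTO".equals(p.getProperty("org.quartz.scheduler.instanceId"))) {
            out.add("instanceId is not AUTO; make sure every node gets a unique id");
        }
        if (p.getProperty("org.quartz.scheduler.instanceName", "").trim().isEmpty()) {
            out.add("instanceName is missing; all nodes of one cluster must share it");
        }
        return out;
    }

    /** Parses properties-file text; a real application would load quartz.properties instead. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties p = parse(String.join("\n",
                "org.quartz.scheduler.instanceName=test-schedule",
                "org.quartz.scheduler.instanceId=AUTO",
                "org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX",
                "org.quartz.jobStore.isClustered=true"));
        System.out.println(problems(p));
    }
}
```

A check like this could run once at startup, before the scheduler is created, so that a node with a misconfigured file fails fast instead of silently running as a standalone scheduler.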

Relevant part of application-dev.properties

server.port=8084
spring.application.name=test-schedule
server.tomcat.max-http-header-size=8192

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test-schedule?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

QuartzJobFactory

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;

/**
 * Job factory. Creates each job instance and lets Spring autowire its dependencies.
 *
 * @author zl
 * @version 1.0
 * @date Created in 2017/12/16 15:48
 * @copyright: Copyright (c) founders
 */
@Component
public class QuartzJobFactory extends AdaptableJobFactory {

    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        // Quartz instantiates the job itself, so inject Spring beans into it here.
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

QuartzConfig

import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;

/**
 * Quartz configuration.
 *
 * @author zl
 * @version 1.0
 * @date Created in 2017/12/16 15:33
 * @copyright: Copyright (c) founders
 */
@Configuration
public class QuartzConfig {

    @Autowired
    DataSource dataSource;

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzJobFactory quartzJobFactory) throws Exception {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setJobFactory(quartzJobFactory);
        factoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
        // Reuse the application's data source instead of configuring a second one for Quartz.
        factoryBean.setDataSource(dataSource);
        // No explicit afterPropertiesSet() call: Spring invokes it when it initializes this bean,
        // and calling it here as well would build the scheduler twice.
        return factoryBean;
    }

    @Bean
    public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) throws Exception {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        scheduler.start();
        return scheduler;
    }
}

That completes the configuration. Once the database tables have been created, you can get on with writing the actual jobs.

The org.quartz.impl.jdbcjobstore package inside quartz-2.3.0.jar contains the initialization SQL scripts for the various supported databases.
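
Because the scripts ship inside the jar, they can also be read from the classpath rather than unpacked by hand. Here is a minimal sketch under the assumption that the scripts follow the naming pattern tables_<dialect>.sql (for example tables_mysql_innodb.sql); check the jar contents for the exact file names. QuartzSchemaScripts is a hypothetical helper, not a Quartz class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helper: locates a Quartz schema script on the classpath. */
class QuartzSchemaScripts {

    /** Builds the classpath location; the tables_<dialect>.sql pattern is an assumption. */
    static String scriptPath(String dialect) {
        return "/org/quartz/impl/jdbcjobstore/tables_" + dialect + ".sql";
    }

    /** Returns the script text, or empty if no such resource is on the classpath. */
    static Optional<String> load(String dialect) {
        try (InputStream in = QuartzSchemaScripts.class.getResourceAsStream(scriptPath(dialect))) {
            if (in == null) {
                return Optional.empty();
            }
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return Optional.of(new String(buf.toByteArray(), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints whether the script was found; requires the quartz jar on the classpath to succeed.
        System.out.println(load("mysql_innodb").isPresent());
    }
}
```

The returned text can then be reviewed and run against the target schema with whatever SQL client or migration tool the project already uses.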


3. Job

BaseJob

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Job interface.
 *
 * @author zl
 * @version 1.0
 * @date Created in 2017/12/16 15:57
 * @copyright: Copyright (c) founders
 */
public interface BaseJob extends Job {

    @Override
    void execute(JobExecutionContext context) throws JobExecutionException;
}

TestJob

import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.stereotype.Component;

/**
 * Test job.
 *
 * @author zl
 * @version 1.0
 * @date Created in 2017/12/16 16:14
 * @copyright: Copyright (c) founders
 */
@Slf4j
@Component
@DisallowConcurrentExecution
public class TestJob implements BaseJob {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        log.info("test job----PreviousFireTime={},NextFireTime={},FireTime={}",
                context.getPreviousFireTime(), context.getNextFireTime(), context.getFireTime());
    }
}

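@DisallowConcurrentExecution forbids concurrent execution of the same job. Without it, when a run takes longer (say 30 s) than the interval between fire times (say 10 s), Quartz starts another thread at the next fire time so that the job keeps to its schedule, and the runs overlap. With the annotation, the next run does not start until the current one has finished.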

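Now start the project and add the corresponding jobs and triggers to the three tables qrtz_cron_triggers, qrtz_job_details and qrtz_triggers. If the rows are correct, the job will run. Next I will describe job management.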

4. Job management

TaskInfoVo

import lombok.Data;
import java.util.Date;

/**
 * Task information.
 *
 * @author zl
 * @version 1.0
 * @date Created in 2017/12/16 22:00
 * @copyright: Copyright (c) founders
 */
@Data
public class TaskInfoVo {
    private String jobName;
    private String jobGroup;
    private String jobDescription;
    private String jobStatus;
    private String cronExpression;
    private String createTime;
    private Date previousFireTime;
    private Date nextFireTime;
}

JobService: this class can be used as is; note that it relies on Lombok.

import com.sunlands.zlcx.schedule.exception.BusinessException;
import com.sunlands.zlcx.schedule.vo.PageResultVO;
import com.sunlands.zlcx.schedule.vo.TaskInfoVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Job management.
 *
 * @author zl
 * @version 1.0
 * @date Created in 2017/12/16 21:58
 * @copyright: Copyright (c) founders
 */
@Service
@Slf4j
public class JobService {

    @Autowired
    private Scheduler scheduler;

    /**
     * Paged query. All triggers are collected first and then paged in memory.
     */
    public PageResultVO list(int page, int size) {
        PageResultVO resultVO = new PageResultVO();
        try {
            List<TaskInfoVo> list = new ArrayList<>();
            for (String groupJob : scheduler.getJobGroupNames()) {
                for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.groupEquals(groupJob))) {
                    List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                    for (Trigger trigger : triggers) {
                        Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                        JobDetail jobDetail = scheduler.getJobDetail(jobKey);

                        String cronExpression = "", createTime = "";
                        if (trigger instanceof CronTrigger) {
                            CronTrigger cronTrigger = (CronTrigger) trigger;
                            cronExpression = cronTrigger.getCronExpression();
                            // The creation time is stored in the trigger description (see addJob).
                            createTime = cronTrigger.getDescription();
                        }
                        TaskInfoVo info = new TaskInfoVo();
                        info.setJobName(jobKey.getName());
                        info.setJobGroup(jobKey.getGroup());
                        info.setJobDescription(jobDetail.getDescription());
                        info.setJobStatus(triggerState.name());
                        info.setCronExpression(cronExpression);
                        info.setCreateTime(createTime);
                        info.setPreviousFireTime(trigger.getPreviousFireTime());
                        info.setNextFireTime(trigger.getNextFireTime());
                        list.add(info);
                    }
                }
            }
            resultVO.setTotal(list.size());
            resultVO.setRows(list.stream().skip((page - 1) * size).limit(size).collect(Collectors.toList()));
        } catch (SchedulerException e) {
            // The trailing throwable is logged with its stack trace; it needs no placeholder.
            log.error("Failed to list scheduled jobs, page={}, size={}", page, size, e);
        }
        return resultVO;
    }

    /**
     * Add a job.
     *
     * @param jobName        fully qualified class name of the Job implementation
     * @param jobGroup       job group
     * @param cronExpression cron expression
     * @param jobDescription job description
     */
    public void addJob(String jobName, String jobGroup, String cronExpression, String jobDescription) {
        if (StringUtils.isAnyBlank(jobName, jobGroup, cronExpression, jobDescription)) {
            // String.format uses %s, not the {} placeholders of SLF4J.
            throw new BusinessException(String.format(
                    "Invalid arguments, jobName=%s, jobGroup=%s, cronExpression=%s, jobDescription=%s",
                    jobName, jobGroup, cronExpression, jobDescription));
        }
        String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        try {
            log.info("Adding jobName={}, jobGroup={}, cronExpression={}, jobDescription={}",
                    jobName, jobGroup, cronExpression, jobDescription);

            if (checkExists(jobName, jobGroup)) {
                log.error("Job already exists, jobName={}, jobGroup={}", jobName, jobGroup);
                throw new BusinessException(
                        String.format("Job already exists, jobName=%s, jobGroup=%s", jobName, jobGroup));
            }

            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            JobKey jobKey = JobKey.jobKey(jobName, jobGroup);

            CronScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule(cronExpression)
                    .withMisfireHandlingInstructionDoNothing();
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
                    .withDescription(createTime).withSchedule(schedBuilder).build();

            // Fails with ClassCastException if the class does not implement Job.
            Class<? extends Job> clazz = Class.forName(jobName).asSubclass(Job.class);
            JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(jobKey)
                    .withDescription(jobDescription).build();
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException | ClassNotFoundException | ClassCastException e) {
            log.error("Failed to add job, jobName={}, jobGroup={}", jobName, jobGroup, e);
            throw new BusinessException("Class name does not exist or cron expression is invalid");
        }
    }

    /**
     * Modify a job.
     *
     * @param jobName        job name
     * @param jobGroup       job group
     * @param cronExpression new cron expression
     * @param jobDescription new job description
     */
    public void edit(String jobName, String jobGroup, String cronExpression, String jobDescription) {
        if (StringUtils.isAnyBlank(jobName, jobGroup, cronExpression, jobDescription)) {
            throw new BusinessException(String.format(
                    "Invalid arguments, jobName=%s, jobGroup=%s, cronExpression=%s, jobDescription=%s",
                    jobName, jobGroup, cronExpression, jobDescription));
        }
        String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        try {
            log.info("Editing jobName={}, jobGroup={}, cronExpression={}, jobDescription={}",
                    jobName, jobGroup, cronExpression, jobDescription);

            if (!checkExists(jobName, jobGroup)) {
                log.error("Job does not exist, jobName={}, jobGroup={}", jobName, jobGroup);
                throw new BusinessException(
                        String.format("Job does not exist, jobName=%s, jobGroup=%s", jobName, jobGroup));
            }

            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            JobKey jobKey = new JobKey(jobName, jobGroup);

            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression)
                    .withMisfireHandlingInstructionDoNothing();
            CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
                    .withDescription(createTime).withSchedule(cronScheduleBuilder).build();

            // JobDetail is immutable: rebuild it, otherwise the new description is discarded.
            JobDetail jobDetail = scheduler.getJobDetail(jobKey)
                    .getJobBuilder().withDescription(jobDescription).build();

            HashSet<Trigger> triggerSet = new HashSet<>();
            triggerSet.add(cronTrigger);

            // replace=true overwrites the stored job and its triggers.
            scheduler.scheduleJob(jobDetail, triggerSet, true);
        } catch (SchedulerException e) {
            log.error("Failed to edit job, jobName={}, jobGroup={}", jobName, jobGroup, e);
            throw new BusinessException("Class name does not exist or cron expression is invalid");
        }
    }

    /**
     * Delete a job.
     *
     * @param jobName  job name
     * @param jobGroup job group
     */
    public void delete(String jobName, String jobGroup) {
        try {
            log.info("Deleting jobName={}, jobGroup={}", jobName, jobGroup);
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            if (checkExists(jobName, jobGroup)) {
                scheduler.pauseTrigger(triggerKey);
                scheduler.unscheduleJob(triggerKey);
            }
        } catch (SchedulerException e) {
            log.error("Failed to delete job, jobName={}, jobGroup={}", jobName, jobGroup, e);
            throw new BusinessException(e.getMessage());
        }
    }

    /**
     * Pause a job.
     *
     * @param jobName  job name
     * @param jobGroup job group
     */
    public void pause(String jobName, String jobGroup) {
        try {
            log.info("Pausing jobName={}, jobGroup={}", jobName, jobGroup);
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            if (!checkExists(jobName, jobGroup)) {
                log.error("Job does not exist, jobName={}, jobGroup={}", jobName, jobGroup);
                throw new BusinessException(
                        String.format("Job does not exist, jobName=%s, jobGroup=%s", jobName, jobGroup));
            }
            scheduler.pauseTrigger(triggerKey);
        } catch (SchedulerException e) {
            log.error("Failed to pause job, jobName={}, jobGroup={}", jobName, jobGroup, e);
            throw new BusinessException(e.getMessage());
        }
    }

    /**
     * Resume a paused job.
     *
     * @param jobName  job name
     * @param jobGroup job group
     */
    public void resume(String jobName, String jobGroup) {
        try {
            log.info("Resuming jobName={}, jobGroup={}", jobName, jobGroup);
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            if (!checkExists(jobName, jobGroup)) {
                log.error("Job does not exist, jobName={}, jobGroup={}", jobName, jobGroup);
                throw new BusinessException(
                        String.format("Job does not exist, jobName=%s, jobGroup=%s", jobName, jobGroup));
            }
            scheduler.resumeTrigger(triggerKey);
        } catch (SchedulerException e) {
            log.error("Failed to resume job, jobName={}, jobGroup={}", jobName, jobGroup, e);
            throw new BusinessException(e.getMessage());
        }
    }

    /**
     * Run a job immediately.
     *
     * @param jobName  job name
     * @param jobGroup job group
     */
    public void trigger(String jobName, String jobGroup) {
        try {
            log.info("Triggering jobName={}, jobGroup={}", jobName, jobGroup);
            if (!checkExists(jobName, jobGroup)) {
                log.error("Job does not exist, jobName={}, jobGroup={}", jobName, jobGroup);
                throw new BusinessException(
                        String.format("Job does not exist, jobName=%s, jobGroup=%s", jobName, jobGroup));
            }
            JobKey jobKey = new JobKey(jobName, jobGroup);
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            log.error("Failed to trigger job, jobName={}, jobGroup={}", jobName, jobGroup, e);
            throw new BusinessException(e.getMessage());
        }
    }

    /**
     * Check whether a trigger with this name and group exists.
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @throws SchedulerException if the scheduler cannot be queried
     */
    private boolean checkExists(String jobName, String jobGroup) throws SchedulerException {
        TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
        return scheduler.checkExists(triggerKey);
    }
}
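
The list method pages in memory: it collects one entry per trigger and then applies skip((page - 1) * size).limit(size), so page is 1-based. The sketch below isolates that step with plain Java collections. PageSketch is a hypothetical class used only for illustration, and the argument check is an addition of this sketch, since a page of 0 would otherwise pass a negative value to skip.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of the in-memory paging used by a list-style query. */
class PageSketch {

    /** Returns the 1-based page of the given size; pages past the end are empty. */
    static <T> List<T> page(List<T> all, int page, int size) {
        if (page < 1 || size < 1) {
            // Stream.skip rejects negative arguments, so fail early with a clearer message.
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        return all.stream()
                .skip((long) (page - 1) * size)
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(page(jobs, 1, 2));
        System.out.println(page(jobs, 3, 2));
    }
}
```

Paging this way is simple and fine for a modest number of jobs, but every call still loads all jobs and triggers from the scheduler.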

I have left out the controller and the pages; write them yourself, or write a test class to exercise the service.
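
One detail worth knowing when calling addJob from a controller or test: the job class is resolved with Class.forName(jobName), so jobName has to be the fully qualified class name of a Job implementation. The following sketch shows that lookup with plain Java. Task and HelloTask are hypothetical stand-ins for org.quartz.Job and a concrete job, so the example runs without Quartz on the classpath.

```java
/** Hypothetical illustration of resolving a job class from its name. */
class JobClassLookup {

    /** Stand-in for org.quartz.Job (assumption made so the sketch has no dependencies). */
    interface Task {
        void run();
    }

    /** Stand-in for a concrete job such as TestJob. */
    static class HelloTask implements Task {
        @Override
        public void run() {
            System.out.println("hello");
        }
    }

    /** Loads the class and verifies that it implements Task before it is handed to a scheduler. */
    static Class<? extends Task> resolve(String className) {
        try {
            return Class.forName(className).asSubclass(Task.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No such class: " + className, e);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(className + " does not implement Task", e);
        }
    }

    /** Convenience check, for example to validate user input before scheduling. */
    static boolean isUsable(String className) {
        try {
            resolve(className);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isUsable(HelloTask.class.getName()));
        System.out.println(isUsable("java.lang.String"));
    }
}
```

Checking the type at lookup time means a wrong class name is rejected when the job is added rather than when the trigger first fires.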
