文章目录
- xxl-job介绍
- 系统组成
- 为什么不使用quartz
- 过期处理策略
- 避免任务重复执行
- 源码分析
xxl-job介绍
XXL-JOB是一个轻量级分布式任务调度平台,它的核心设计目标是开发迅速、学习简单、轻量级、易扩展。
1.简单易用:XXL-JOB提供了友好的Web界面,使得用户可以通过简单的操作完成任务的创建、编辑、删除和查询。
2.动态管理:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效。
3.调度中心高可用(HA):调度采用中心式设计,支持集群部署,可保证调度中心的高可用性。
4.执行器高可用(HA):任务分布式执行,执行器支持集群部署,可保证任务执行的高可用性。
5.注册中心:执行器会周期性自动注册任务,调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址。
6.弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务。
7.路由策略:执行器集群部署时提供丰富的路由策略,包括第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等。
8.故障转移:任务路由策略选择"故障转移"情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。
9.执行失败查看日志:执行失败的任务可以查看日志。
10.支持邮件报警:XXL-JOB支持邮件报警,可以在任务执行失败时发送邮件通知。
11.跨平台任务调度:支持通过通用HTTP提供跨平台任务调度。
12.任务级联编排:支持父任务执行结束后触发子任务执行。
13.设置任务优先级:支持设置指定任务执行节点路由策略,包括多种调度策略。
系统组成
调度模块(调度中心):
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
执行模块(执行器):
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
接收“调度中心”的执行请求、终止请求和日志请求等。
为什么不使用quartz
Quartz作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:
问题一:调用API的的方式操作任务,不人性化;
问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况下,此时调度系统的性能将大大受限于业务;
问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。
XXL-JOB弥补了quartz的上述不足之处。
过期处理策略
任务调度错过触发时间时的处理策略:
可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;
处理策略:
过期超5s:本次忽略,当前时间开始计算下次触发时间
过期5s内:立即触发一次,当前时间开始计算下次触发时间
避免任务重复执行
调度密集或者耗时任务可能会导致任务阻塞,集群情况下调度组件小概率情况下会重复触发;
针对上述情况,可以通过结合 “单机路由策略(如:第一台、一致性哈希)” + “阻塞策略(如:单机串行、丢弃后续调度)” 来规避,最终避免任务重复执行。
源码分析
定时任务的核心入口
java">package com.xxl.job.admin.core.conf;
import com.xxl.job.admin.core.alarm.JobAlarmer;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.dao.*;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Arrays;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig() {
return adminConfig;
}
// ---------------------- XxlJobScheduler ----------------------
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
//核心 项目启动时,执行init中的方法
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
@Override
public void destroy() throws Exception {
xxlJobScheduler.destroy();
}
// ---------------------- XxlJobScheduler ----------------------
// conf
@Value("${xxl.job.i18n}")
private String i18n;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${spring.mail.username}")
private String emailUserName;
@Value("${xxl.job.triggerpool.fast.max}")
private int triggerPoolFastMax;
@Value("${xxl.job.triggerpool.slow.max}")
private int triggerPoolSlowMax;
@Value("${xxl.job.logretentiondays}")
private int logretentiondays;
// dao, service
@Resource
private XxlJobLogDao xxlJobLogDao;
@Resource
private XxlJobInfoDao xxlJobInfoDao;
@Resource
private XxlJobRegistryDao xxlJobRegistryDao;
@Resource
private XxlJobGroupDao xxlJobGroupDao;
@Resource
private XxlJobLogReportDao xxlJobLogReportDao;
@Resource
private JavaMailSender mailSender;
@Resource
private DataSource dataSource;
@Resource
private JobAlarmer jobAlarmer;
public String getI18n() {
if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) {
return "zh_CN";
}
return i18n;
}
public String getAccessToken() {
return accessToken;
}
public String getEmailUserName() {
return emailUserName;
}
public int getTriggerPoolFastMax() {
if (triggerPoolFastMax < 200) {
return 200;
}
return triggerPoolFastMax;
}
public int getTriggerPoolSlowMax() {
if (triggerPoolSlowMax < 100) {
return 100;
}
return triggerPoolSlowMax;
}
public int getLogretentiondays() {
if (logretentiondays < 7) {
return -1; // Limit greater than or equal to 7, otherwise close
}
return logretentiondays;
}
public XxlJobLogDao getXxlJobLogDao() {
return xxlJobLogDao;
}
public XxlJobInfoDao getXxlJobInfoDao() {
return xxlJobInfoDao;
}
public XxlJobRegistryDao getXxlJobRegistryDao() {
return xxlJobRegistryDao;
}
public XxlJobGroupDao getXxlJobGroupDao() {
return xxlJobGroupDao;
}
public XxlJobLogReportDao getXxlJobLogReportDao() {
return xxlJobLogReportDao;
}
public JavaMailSender getMailSender() {
return mailSender;
}
public DataSource getDataSource() {
return dataSource;
}
public JobAlarmer getJobAlarmer() {
return jobAlarmer;
}
}
init方法
java">public class XxlJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
public void init() throws Exception {
// init i18n 国际化
initI18n();
// admin registry monitor run
//注册监控线程
JobRegistryMonitorHelper.getInstance().start();
// admin fail-monitor run
//注册失败监控线程
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run
//用于监控和处理“丢失”的任务。这些所谓的“丢失”任务可能是指那些由于执行器(Executor)宕机、网络分区或其他异常情况导致未能成功回调调度中心(Admin)的任务。
JobLosedMonitorHelper.getInstance().start();
// admin trigger pool start
//启动任务触发的线程池
JobTriggerPoolHelper.toStart();
// admin log report start
//启动日志报表任务
JobLogReportHelper.getInstance().start();
// start-schedule
//开启任务调度线程
JobScheduleHelper.getInstance().start();
//调度器执行成功
logger.info(">>>>>>>>> init xxl-job admin success.");
}
}