Threads in xxl-job
After startup, the console shows the following threads for the admin scheduling center:
"xxl-job, admin JobFailMonitorHelper"@6,307 in group "main": SLEEPING
"xxl-job, admin JobLogReportHelper"@6,321 in group "main": SLEEPING
"xxl-job, admin JobLosedMonitorHelper"@6,316 in group "main": SLEEPING
"xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"@6,303 in group "main": SLEEPING
"xxl-job, admin JobScheduleHelper#ringThread"@6,331 in group "main": SLEEPING
"xxl-job, admin JobScheduleHelper#scheduleThread"@6,327 in group "main": SLEEPING
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-1366916281"@8,303 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-1831119422"@8,219 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-423903477"@8,273 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-673589758"@8,270 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-703679237"@8,271 in group "main": WAIT
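One nice design point is visible here: every thread the service creates for itself gets a custom name, which makes a thread dump much easier to read. When something goes wrong in production, the thread name in the logs also helps locate the problem quickly.
- JobFailMonitorHelper: job failure monitoring
- JobLogReportHelper: log report
- JobLosedMonitorHelper: lost-job monitoring
- JobRegistryMonitorHelper-registryMonitorThread: job registration
- JobRegistryMonitorHelper
- JobScheduleHelper: job scheduling
- JobTriggerPoolHelper-fastTriggerPool: job triggering
Rough flow chart
The latest version can be viewed at https://www.processon.com/view/link/604e0b860791291f25613424 (password: xiaowan)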
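To illustrate the naming idea, here is a minimal sketch in plain Java. The class `NamedThreads`, the prefix string, and the counter-based suffix are all hypothetical and are not taken from xxl-job; it only shows one way to make sure every thread carries a readable name.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of xxl-job: gives every thread a readable, prefixed name.
class NamedThreads {

    // Returns a factory whose threads are named "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory factory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // daemon, like the helper threads in the excerpts
            return t;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = factory("demo-service, worker");
        Thread t = factory.newThread(
                () -> System.out.println("running in: " + Thread.currentThread().getName()));
        t.start();
        t.join();
    }
}
```

A factory like this can be passed to a `ThreadPoolExecutor`, so pool threads show up under a recognizable name in thread dumps and log lines.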
JobScheduleHelper
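A few of the more important fields:
public class JobScheduleHelper {
    private Thread scheduleThread;
    private Thread ringThread;
    // time ring: second of the minute -> ids of the jobs to trigger in that second
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
}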
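The ringData map works as a 60-slot time ring. As a rough sketch of how a job id could be filed under its slot: the class `TimeRingSketch` and the methods `slotOf` and `push` are hypothetical names, and the body of the real pushTimeRing is not shown in this article, so treat this as an assumption about its behavior. Only the slot arithmetic is taken from the scheduleThread code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; the class and method names are hypothetical.
class TimeRingSketch {

    // slot (second of the minute, 0..59) -> ids of the jobs to fire in that second
    static final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    // Same arithmetic the scheduleThread code uses: epoch millis -> second within the minute.
    static int slotOf(long triggerTimeMillis) {
        return (int) ((triggerTimeMillis / 1000) % 60);
    }

    // Append a job id to the slot for its trigger time.
    // The inner ArrayList is not thread-safe; this sketch assumes a single writer.
    static void push(long triggerTimeMillis, int jobId) {
        ring.computeIfAbsent(slotOf(triggerTimeMillis), k -> new ArrayList<>()).add(jobId);
    }

    public static void main(String[] args) {
        push(125_000L, 7);   // 125 s after the epoch -> slot 5
        push(125_900L, 8);   // same second, same slot
        push(59_000L, 9);    // slot 59
        System.out.println(ring);
    }
}
```

Because the ring only has 60 slots, it can only hold triggers that are less than a minute away, which fits the 5-second pre-read window described in the scheduleThread section.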
scheduleThread
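The overall logic is:
- Acquire the lock
- Run the scheduling pass
- Query the jobs whose trigger_next_time is earlier than now + 5 seconds (if now is 30, that means rows where this field is less than 35)
- Check each job's trigger time. There are 3 cases; assume getTriggerNextTime = 30 and preRead = 5, with now = 37, now = 32, or now = 29
  - now = 37: the job is overdue by more than 5 seconds, so whether it is triggered this round depends on its configured misfire strategy (ignore, or fire once immediately)
  - now = 32: the job is overdue by less than 5 seconds, so it is triggered immediately; then check whether its next trigger time also falls within the next 5 seconds to decide whether to put it on the time ring
  - now = 29: the job is not due yet, so it goes straight onto the time ring
- Update the job info in the database
- commit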
scheduleThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!scheduleThreadToStop) {
            long start = System.currentTimeMillis();
            Connection conn = null;
            Boolean connAutoCommit = null;
            PreparedStatement preparedStatement = null;
            boolean preReadSuc = true;
            try {
                conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                connAutoCommit = conn.getAutoCommit();
                conn.setAutoCommit(false);
                // 1. acquire the lock: a row lock on xxl_job_lock, held until the transaction ends
                preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                preparedStatement.execute();
                // 2. pre-read the jobs due within the next PRE_READ_MS
                long nowTime = System.currentTimeMillis();
                List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                if (scheduleList!=null && scheduleList.size()>0) {
                    for (XxlJobInfo jobInfo: scheduleList) {
                        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                            // case 1: overdue by more than PRE_READ_MS, apply the misfire strategy
                            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
                            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                            }
                            refreshNextValidTime(jobInfo, new Date());
                        }
                        else if (nowTime > jobInfo.getTriggerNextTime()) {
                            // case 2: overdue by less than PRE_READ_MS, trigger right away
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                            refreshNextValidTime(jobInfo, new Date());
                            // if the next trigger also falls inside the pre-read window, put it on the time ring
                            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                pushTimeRing(ringSecond, jobInfo.getId());
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                            }
                        }
                        else {
                            // case 3: not due yet, put it on the time ring
                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                            pushTimeRing(ringSecond, jobInfo.getId());
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                        }
                    }
                    // 3. write the refreshed trigger times back to the database
                    for (XxlJobInfo jobInfo: scheduleList) {
                        XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                    }
                } else {
                    preReadSuc = false;
                }
            } catch (Exception e) {
                if (!scheduleThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                }
            } finally {
                // abridged in this excerpt: the transaction is committed here (the "commit" step in the list above)
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
    }
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
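The three-way check is easier to follow when it is isolated from the JDBC and logging code. The sketch below reproduces only the comparison logic, using the numbers from the example in the list (trigger time 30, pre-read window 5). The class `ScheduleDecisionSketch`, the enum `Action`, and the method `decide` are hypothetical names made up for illustration.

```java
// Illustrative sketch only; the class, enum, and method names are hypothetical.
class ScheduleDecisionSketch {

    enum Action { MISFIRE, FIRE_NOW, PUSH_TO_RING }

    // Mirrors the if / else-if / else chain in scheduleThread.
    static Action decide(long now, long triggerNextTime, long preRead) {
        if (now > triggerNextTime + preRead) {
            return Action.MISFIRE;       // too late: the misfire strategy decides
        } else if (now > triggerNextTime) {
            return Action.FIRE_NOW;      // slightly late: trigger immediately
        } else {
            return Action.PUSH_TO_RING;  // not due yet: hand over to the time ring
        }
    }

    public static void main(String[] args) {
        long triggerNextTime = 30, preRead = 5;
        for (long now : new long[] {37, 32, 29}) {
            System.out.println("now=" + now + " -> " + decide(now, triggerNextTime, preRead));
        }
    }
}
```

With these inputs the three cases come out as MISFIRE, FIRE_NOW, and PUSH_TO_RING respectively. Note that the boundaries are strict: a job exactly 5 units late still counts as FIRE_NOW, and a job whose trigger time equals now goes to the ring.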
ringThread
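- ringThread checks the data in ringData and triggers whatever is due
- It removes all due job ids from ringData, using two keys: the current second and the previous second (so a slot is not lost if a tick was skipped)
- It loops over them and calls trigger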
ringThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!ringThreadToStop) {
            try {
                // sleep until the start of the next whole second
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!ringThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                List<Integer> ringItemData = new ArrayList<>();
                int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                // i = 0: current second, i = 1: previous second
                for (int i = 0; i < 2; i++) {
                    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                    if (tmpData != null) {
                        ringItemData.addAll(tmpData);
                    }
                }
                logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                if (ringItemData.size() > 0) {
                    for (int jobId: ringItemData) {
                        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                    }
                    ringItemData.clear();
                }
            } catch (Exception e) {
                if (!ringThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
    }
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
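Two small pieces of arithmetic in ringThread are worth checking by hand: the sleep that aligns the loop to whole seconds, and the `(nowSecond+60-i)%60` expression that picks the slots to drain. The sketch below pulls both out into standalone methods; `RingDrainSketch`, `millisToNextSecond`, and `drain` are hypothetical names, and the logic is only meant to mirror the excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; the class and method names are hypothetical.
class RingDrainSketch {

    // How long to sleep so that the thread wakes at the start of the next whole second.
    static long millisToNextSecond(long nowMillis) {
        return 1000 - nowMillis % 1000;
    }

    // Remove and return the job ids stored for nowSecond and for the second before it.
    // Adding 60 before the modulo keeps the index non-negative when nowSecond is 0.
    static List<Integer> drain(Map<Integer, List<Integer>> ring, int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ring.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();
        ring.put(59, Arrays.asList(1)); // previous second, wraps around the minute
        ring.put(0, Arrays.asList(2));  // current second
        ring.put(1, Arrays.asList(3));  // next second, must stay on the ring
        System.out.println("due: " + drain(ring, 0));
        System.out.println("left: " + ring);
    }
}
```

At nowSecond = 0 the drained slots are 0 and 59, so the job filed under slot 1 stays on the ring until the next tick.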
JobTriggerPoolHelper
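- start: initializes 2 thread pools
- trigger → addTrigger
- addTrigger: picks one of the thread pools to run the trigger
- XxlJobTrigger#trigger: checks the job, sharding, and so on
- processTrigger: saves the log, builds the trigger parameters, picks an executor address, makes the remote call, then saves the result and the log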
- runExecutor
- executorBiz.run(triggerParam)
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

public void start(){
    // fast pool: the default pool for triggers
    fastTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                }
            });
    // slow pool: used for jobs that have timed out repeatedly (see addTrigger)
    slowTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                }
            });
}

public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // choose the pool: fast by default, slow once the job's timeout count exceeds 10
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {
        triggerPool_ = slowTriggerPool;
    }
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // abridged in this excerpt
            }
        }
    });
}
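The pool selection in addTrigger comes down to one threshold check on a per-job counter. Here is a minimal sketch of that check on its own. The class `PoolChoiceSketch` and the method `recordTimeout` are hypothetical: the excerpt does not show how jobTimeoutCountMap gets updated, so the increment here is an assumption made only to have something to count. The threshold of 10 and the strict `>` comparison are taken from the excerpt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; the class and method names are hypothetical.
class PoolChoiceSketch {

    static final int SLOW_THRESHOLD = 10; // same constant as in addTrigger

    // jobId -> number of slow triggers observed (assumed bookkeeping, not shown in the excerpt)
    static final Map<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();

    // Assumption: something calls this whenever a trigger of the job took too long.
    static void recordTimeout(int jobId) {
        timeoutCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
    }

    // Mirrors the check in addTrigger: more than 10 recorded timeouts -> slow pool.
    static String choosePool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_THRESHOLD) ? "slow" : "fast";
    }

    public static void main(String[] args) {
        for (int i = 0; i < 11; i++) {
            recordTimeout(42);
        }
        System.out.println("job 42 -> " + choosePool(42));
        System.out.println("job 7  -> " + choosePool(7));
    }
}
```

The point of the split is isolation: a job that keeps running slowly is moved to its own pool, so it cannot occupy the threads that well-behaved jobs rely on.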
JobRegistryHelper
registryMonitorThread
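- Removes clients that have missed several heartbeats; the default heartbeat interval is 30s and a client is removed after 3 * 30 seconds
- Refreshes the information about online executors
- Refreshes the address list of each group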
registryMonitorThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!toStop) {
            try {
                // groups with address type 0
                List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                if (groupList!=null && !groupList.isEmpty()) {
                    // remove registry entries whose heartbeat is older than DEAD_TIMEOUT
                    List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                    if (ids!=null && ids.size()>0) {
                        XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                    }
                    // collect the live addresses per appname
                    HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                    List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                    if (list != null) {
                        for (XxlJobRegistry item: list) {
                            if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                String appname = item.getRegistryKey();
                                List<String> registryList = appAddressMap.get(appname);
                                if (registryList == null) {
                                    registryList = new ArrayList<String>();
                                }
                                if (!registryList.contains(item.getRegistryValue())) {
                                    registryList.add(item.getRegistryValue());
                                }
                                appAddressMap.put(appname, registryList);
                            }
                        }
                    }
                    // write the sorted, comma-separated address list back to each group
                    for (XxlJobGroup group: groupList) {
                        List<String> registryList = appAddressMap.get(group.getAppname());
                        String addressListStr = null;
                        if (registryList!=null && !registryList.isEmpty()) {
                            Collections.sort(registryList);
                            StringBuilder addressListSB = new StringBuilder();
                            for (String item:registryList) {
                                addressListSB.append(item).append(",");
                            }
                            addressListStr = addressListSB.toString();
                            addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                        }
                        group.setAddressList(addressListStr);
                        group.setUpdateTime(new Date());
                        XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                }
            }
            try {
                // wait one heartbeat interval before the next pass
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
    }
});
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
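Stripped of the DAO calls, the monitor does two things: it drops addresses whose last heartbeat is too old, and it joins the survivors into a sorted, comma-separated string. The following is a minimal in-memory sketch of that idea. The class `RegistrySketch` and its methods are hypothetical, the map of last-heartbeat times stands in for the registry table, and treating an entry as dead only when it is strictly older than 90 seconds is an assumption about the boundary.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only; the class and method names are hypothetical.
class RegistrySketch {

    static final long BEAT_SECONDS = 30;               // heartbeat interval from the text
    static final long DEAD_SECONDS = 3 * BEAT_SECONDS; // removed after 3 missed beats = 90 s

    // Assumption: an entry is dead once its last heartbeat is strictly older than 90 s.
    static boolean isDead(long lastBeatSec, long nowSec) {
        return nowSec - lastBeatSec > DEAD_SECONDS;
    }

    // Sorted, comma-separated list of live addresses, or null when none are alive
    // (the excerpt also leaves addressListStr as null in that case).
    static String aliveAddressList(Map<String, Long> lastBeatByAddress, long nowSec) {
        List<String> alive = lastBeatByAddress.entrySet().stream()
                .filter(e -> !isDead(e.getValue(), nowSec))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
        return alive.isEmpty() ? null : String.join(",", alive);
    }

    public static void main(String[] args) {
        long now = 1_000;
        Map<String, Long> beats = new HashMap<>();
        beats.put("http://10.0.0.2:9999", now - 10);  // fresh
        beats.put("http://10.0.0.1:9999", now - 90);  // exactly at the limit, still alive here
        beats.put("http://10.0.0.3:9999", now - 91);  // too old, dropped
        System.out.println(aliveAddressList(beats, now));
    }
}
```

Sorting before joining keeps the stored address list stable from one pass to the next, so the group row does not change just because the registry rows came back in a different order.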
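registryOrRemoveThreadPool: the thread pool used when registry entries are added, updated, or removed
- Both registration and removal are handled through this pool
- It is exposed through the registry and registryRemove methods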
registryOrRemoveThreadPool = new ThreadPoolExecutor(
        2, 10, 30L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
            }
        },
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // pool and queue are full: run the task on the calling thread instead of dropping it
                r.run();
                logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
            }
        });

public ReturnT<String> registry(RegistryParam registryParam) {
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // try an update first; insert when no existing row was updated
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
                freshGroupRegistryInfo(registryParam);
            }
        }
    });
    return ReturnT.SUCCESS;
}

public ReturnT<String> registryRemove(RegistryParam registryParam) {
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
            if (ret > 0) {
                freshGroupRegistryInfo(registryParam);
            }
        }
    });
    return ReturnT.SUCCESS;
}
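The rejection handler above runs a rejected task directly on the thread that submitted it. To see that behavior in isolation, here is a small self-contained sketch with a deliberately tiny pool (one thread, queue of one) so the third task is rejected. The class `RunNowRejectionSketch`, the thread name `demo-pool-worker`, and the task labels are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; the class name, thread name, and task labels are hypothetical.
class RunNowRejectionSketch {

    // Runs three tasks and returns "<task>@<thread name>" entries in the order the tasks ran.
    static List<String> demo() {
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 30L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                r -> new Thread(r, "demo-pool-worker"),
                (r, executor) -> r.run()); // rejected: run it on the submitting thread

        pool.execute(() -> {               // task1 occupies the only worker until released
            awaitQuietly(release);
            ranOn.add("task1@" + Thread.currentThread().getName());
        });
        pool.execute(() -> ranOn.add("task2@" + Thread.currentThread().getName())); // queued
        pool.execute(() -> ranOn.add("task3@" + Thread.currentThread().getName())); // rejected
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The rejected task runs first, and on the caller's thread, which slows the caller down but does not lose the registration. The JDK's built-in `ThreadPoolExecutor.CallerRunsPolicy` has a similar effect, except that it discards the task when the executor has been shut down.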
JobLogReportHelper (runs once a minute)
Builds the execution statistics report
Cleans up job execution logs in the database
JobFailMonitorHelper
Job failure monitoring.