xxl-job源码之admin调度中心的线程们

2023-05-16

xxl-job中的线程们

启动后,从控制台看看admin调度中心

在这里插入图片描述

"xxl-job, admin JobFailMonitorHelper"@6,307 in group "main": SLEEPING
"xxl-job, admin JobLogReportHelper"@6,321 in group "main": SLEEPING
"xxl-job, admin JobLosedMonitorHelper"@6,316 in group "main": SLEEPING
"xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"@6,303 in group "main": SLEEPING
"xxl-job, admin JobScheduleHelper#ringThread"@6,331 in group "main": SLEEPING
"xxl-job, admin JobScheduleHelper#scheduleThread"@6,327 in group "main": SLEEPING
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-1366916281"@8,303 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-1831119422"@8,219 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-423903477"@8,273 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-673589758"@8,270 in group "main": WAIT
"xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-703679237"@8,271 in group "main": WAIT

这里其实可以看到一个比较好的设计点,是针对自己服务创建的线程都自定义了名称,这样对于阅读有帮助。同时线上出现问题的话,也能根据日志快速定位

  • JobFailMonitorHelper 任务失败监控
  • JobLogReportHelper 日志
  • JobLosedMonitorHelper 任务失去监控
  • JobRegistryMonitorHelper-registryMonitorThread 任务注册
  • JobRegistryMonitorHelper
  • JobScheduleHelper 任务调度?
  • JobTriggerPoolHelper-fastTriggerPool 任务

大致流程图

最新版可以查看,https://www.processon.com/view/link/604e0b860791291f25613424,密码 xiaowan
在这里插入图片描述

JobScheduleHelper

比较重要的几个变量

public class JobScheduleHelper {
    // 调度线程
    private Thread scheduleThread;
    // 时间轮调度线程
    private Thread ringThread;
    // 时间轮那个对象
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
}

scheduleThread

大体上的逻辑为:

  1. 加锁
  2. 执行调度
    1. 查询出 trigger_next_time 小于 往后5秒的时间的数据(当前时间30,那就查出这个字段小于 35的数据)
    2. 判断任务执行时间,3个判断,这里假设 getTriggerNextTime = 30,preRead = 5,当前时间为now = 37,now=32,now = 29
      1. 如果当前时间是37,这个任务超时5秒钟,当次调不调度根,据自己配置(调度过期策略=忽略/立即执行一次)
      2. 如果当前时间是32,超时在5秒以内,那就立即执行,然后判断下一次执行是否在5s钟以内,看是否放到时间轮里面
      3. 如果当前时间是29,还没到执行时间,直接放到时间轮里面
  3. 更新数据库中的任务信息
  4. commit
// schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!scheduleThreadToStop) {
                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;
                    // 上一次是否调度成功
                    boolean preReadSuc = true;
                    try {
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // 数据库锁,多服务并发
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 查数据库,获取当前往后五秒内的将要执行的任务
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            // 这个最终会将需要执行的任务,进行调用
                            for (XxlJobInfo jobInfo: scheduleList) {
//                               getTriggerNextTime = 30,pre = 5
//                               now = 37,now=32,now = 29

                                // time-ring jump
                                // 37>30+5? 这种应该是查询太久,可能admin挂了,跳过了这次执行? misfire
                                // 这里就是可以配置,当漏掉某个任务后,的执行策略
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                }
                                // 这个分支是32>30,延迟了几秒了
                                else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    // 看下下次时间是否在5s内,扔到 时间轮中
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                    }
                                }
                                // 这里分支是 29<30,所以还没到需要调度的时间
                                // 所以把这个任务放到时间轮中,等着ringThread调度
                                else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());
                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            }
                            // 3、update trigger info
                            // 更新调度信息
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }
                        } else {
                            preReadSuc = false;
                        }
                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {
                     }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

ringThread

  1. ringThread 判断 ringData 中的数据,进行调度
    1. 将所有需要调度的数据,从ringData取出,key:当前秒和下一秒
    2. 循环,调用tigger
// ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();

JobTriggerPoolHelper

  1. start 初始化2个线程池
  2. trigger–》addTrigger
    • addTrigger 选一个线程池进行调度
      • XxlJobTrigger#trigger 检查job/分片等
        • processTrigger 保存日志, 初始化trigger的参数,找一个地址,远程调用 ,保存结果,日志
          • runExecutor
            • executorBiz.run(triggerParam)

    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start(){
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
   	}

    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }	
public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        // 这里选择一个执行rpc请求的线程池
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    // do trigger
                    // 最重要的地方,进行调用
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                }

            }
        });
    }

JobRegistryHelper

registryMonitorThread

  1. 移除掉多次没有心跳的客户端,默认心跳时间30s,移除时间是3 * 30秒
  2. 刷新在线的执行器信息
  3. 刷新在地址group信息
// for monitor
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor)
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);
								group.setUpdateTime(new Date());

								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
		registryMonitorThread.setDaemon(true);
		registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
		registryMonitorThread.start();

registryOrRemoveThreadPool 添加/更新/删除注册信息时调用的线程池

  1. 这里注册和删除都是通过这个线程池处理
  2. 暴露registry和registryRemove方法
// for registry or remove
		registryOrRemoveThreadPool = new ThreadPoolExecutor(
				2,				10,				30L,				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(2000),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
					}
				},
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						r.run();
						logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
					}
				});

public ReturnT<String> registry(RegistryParam registryParam) {
		// async execute
		registryOrRemoveThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
				if (ret < 1) {					XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
					// fresh
					freshGroupRegistryInfo(registryParam);
				}
			}
		});

		return ReturnT.SUCCESS;
	}

	public ReturnT<String> registryRemove(RegistryParam registryParam) {
		// async execute
		registryOrRemoveThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
				if (ret > 0) {
					// fresh
					freshGroupRegistryInfo(registryParam);
				}
			}
		});

		return ReturnT.SUCCESS;
	}

JobLogReportHelper 一分钟执行一次

执行情况统计报告
数据库job的执行log清理

JobFailMonitorHelper

任务失败监控,

相关文章

分布式作业调度(定时任务)系统xxl-job快速上手及高级功能简述

xxl-job源码之admin调度中心的线程们

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

xxl-job源码之admin调度中心的线程们 的相关文章

  • Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

    flink conf yaml中修改下边的参数 taskmanager network memory min taskmanager network memory max
  • vue-element-admin的二次开发

    最近也是完成了公司招聘管理系统后台的前端开发 xff0c 项目已经开始测试了近期估计就会交付使用 一直是一个人在做 xff0c 配合两个后端 xff0c 说实话这种从很多不会到一个个独立debug解决问题到最后终于完成项目的感觉真的太有成就
  • vue-element-admin执行npm install时的一些报错。

    文章目录 1 首先在gitee上拉取的中文版2 执行npm install的一些报错3 参考文章 1 首先在gitee上拉取的中文版 git clone b i18n https gitee com panjiachen vue eleme
  • enncy-admin ant design vue 后台管理系统脚手架

    github 项目地址 https github com enncy enncy admin vue3 版本的请看我的另一个项目 https github com enncy funny blog admin 在 template 分支你可
  • k8s job机制初探

    博客作为学习笔记记录 若有理解或表述错误 欢迎指出 k8s的job机制 k8s官网参考 k8s的job是用来执行一次性任务的一类资源 相关的还有cronjob 用于执行以下周期性任务 部署job之后 k8s会起对应pod 当pod的状态为f
  • Tomcat 默认管理员密码是什么

    安装新的 Tomcat 服务器后 默认情况下不会创建用于访问 Administrator 和 Manager Web 界面的用户 完成安装后 设置 Tomcat Admin 和 Manager 用户帐户并设置其密码 您还可以访问我们以下有关
  • 面经-Bosch博世无锡&UL美华

    工作总算有所眉目了 太多的总结暂时还没有太多心情来理清楚 先来两个面经 给可能现在或以后需要的人们1 Bosch 博世无锡柴油系统博世公司 业内的人都知道 汽车部件的巨无霸 最近几年才来到中国 虽然比起德尔福有些稍晚 但发展前景值得期待 无
  • 如何从以管理员用户身份运行的 C# 代码重新启动 IIS?

    通常 在 Windows 7 中 安装程序会请求修改系统的权限 作为管理员 我可以在不提供密码的情况下授予授权 我试图弄清楚如何从以 AN 管理员用户身份运行的 C 代码中执行管理员操作 重新启动 IIS 但不是 管理员 帐户 要运行提升的
  • 拖放在 C# 中不起作用

    我在 C 中创建了一个拖放控件 以允许人们将文件拖放到我的表单上 这是我遇到的问题 调试时工作正常 但是 当以管理员模式运行我的程序时 它不起作用 这有什么原因吗 这是我的代码 private void panel1 DragEnter o
  • Rails 4 设计强大的参数管理模型

    我已经使用设计创建了用户和管理模型 我在中使用了强参数应用程序 控制器 application controller rb file class ApplicationController lt ActionController Base
  • 如何在管理面板中创建自定义文本表单并将其显示在我的页面上(Wordpress)

    介绍 目前我正在创建我的第一个自定义 WordPress 主题 现在我成功创建了一个 HTML CSS 模板并将其转换为适合 WordPress 包括 header php index php footer php functions ph
  • 更改 Django 管理中日期字段的默认小部件

    如何更改 Django ADMIN 中 DateField 实例的默认小部件 我知道如何为 ModelForm 执行此操作 如何更改 ModelForm 中所有 Django 日期字段的默认小部件 https stackoverflow c
  • Geodjango 管理,显示点域而不是地图

    这可能是一个愚蠢的问题 但我找不到任何明确的答案 如何更改 Django 管理中的显示 以便 Pointfield 不会像 OpenLayer 地图那样显示 而是作为常规输入字段显示 我需要查看长纬度以进行调试 我必须更改字段类型吗 小部件
  • 管理员登录停止运行 Django

    我在我的项目上工作了一段时间 最近注意到当我尝试访问 localhost admin 时 它给出了一个错误 DoesNotExist at admin Site matching query does not exist Request M
  • 使用管理员权限执行 shell 命令时,Applescript 应用程序在 10.9 上挂起

    我正在 Applescript 应用程序中执行以下行 set POSIX path to Applications iPhoto app do shell script sudo rm rfv quoted form of POSIX pa
  • 无法登录 Magento 管理员

    我已将 magento 安装在子目录中 www domain com subdir magento 该网站一度运行得非常完美 我什么也没做 直到我的客户说他无法登录到 magento admin 我从我的电脑上登录得很好 但在他的电脑上 它
  • 在 Woocommerce 的管理订单页面上添加城市下拉列表

    我想将城市下拉列表添加到 woocommerce 中的新订单页面 我知道如何将此功能添加到结帐页面 但在这里我想添加此功能管理新订单页面在 Woocommerce 中 See example image for reference 使用以下
  • Joomla 2.5创建组件并保存数据

    我一直在使用这个文档 我在网上可以找到的唯一文档 来构建一个组件 http docs joomla org Developing a Model View Controller Component 2 5 Introduction http
  • 在 django admin 过滤器 list_filter 中选择多个选项?

    目前我通过 django 管理界面中的某些选项进行过滤 例如 假设我按 按状态 过滤 是否可以选择多个状态来过滤结果 这是过滤器的屏幕截图 我可以从此列表中选择多个项目吗 不在管理 UI 中 但如果修改 URL 则可以使过滤条件更加复杂 例
  • Django:如何从管理界面调用管理自定义命令执行?

    参考 从代码执行管理命令 https stackoverflow com questions 907506 how can i call a custom django manage py command directly from a t

随机推荐