elasticjob 源码分析

2023-10-26

简介

elasticjob是基于quartz构建支持分片的分布式弹性可伸缩的job执行组件

zookeeper节点数据设计
job
   leader
        election
            latch
            instance  //主节点的实例ID  临时节点  在节点选举成功后添加
        sharding
            necessary
            processing //临时节点标记 分片是否正在进行
   servers
        10.2.123.152
        123.254.26.23
   instances
        456  //临时节点  运行实例
        235
   sharding
        0
           instance = 10.7.1.2@-@456
           running  //标记该分片的状态正在运行 
        1
           instance = 10.7.1.2@-@456

1 在线的实例节点设计为临时节点

    public void persistOnline() {
        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
    }

2 标记分片正在进行中的标识

jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

分片事务结束时候需要删除节点

curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();

3 实例正在运行的状态的标记

public void registerJobBegin(final ShardingContexts shardingContexts) {
        JobRegistry.getInstance().setJobRunning(jobName, true);
        if (!configService.load(true).isMonitorExecution()) {
            return;
        }
        for (int each : shardingContexts.getShardingItemParameters().keySet()) {
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
        }
    }

任务开始执行时候即会注册相应的实例分片运行状态

 private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        }
        try {
            process(shardingContexts, executionSource);
        } finally {
            // TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
                }
            } else {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                }
            }
        }
    }
原理分析之初始化

使用时我们定义执行ElasticJob,但是ElasticJob底层执行的必然是quartz的Job,在源码中是 LiteJob

public final class LiteJob implements Job {
   
   @Setter
   private ElasticJob elasticJob;
   
   @Setter
   private JobFacade jobFacade;
   
   @Override
   public void execute(final JobExecutionContext context) throws JobExecutionException {
       JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
   }
}

那LiteJob 是如何被初始化创建并start的,实际上 elasticjob中具体的任务将会被封

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

elasticjob 源码分析 的相关文章

  • openstack zun源码分析

    容器服务启动过程 项目包括三个服务 xff0c 分别是zun api xff0c zun wsproxy xff0c zun compute xff0c 均使用systemctl来管理启动停止 xff0c 相关的服务文件如 zun api
  • 从APM源码分析GPS、气压计惯导融合

    最近事多 xff0c 忙着开源自研飞控 xff0c 现主要工作基本已经完成 xff0c 代码最迟下月中旬开放 xff0c 博客来不及更新 xff0c 还请各位见谅 xff0c 后面会抽空多更的咯 xff01 xff01 xff01 自研飞控
  • ViewModel源码分析

    首先 xff0c 还是先看一个例子 xff1a public class MyViewModel extends ViewModel private MutableLiveData lt List lt User gt gt users p
  • Android Volley源码分析(1)

    1 Volley newRequestQueue 我们从Volley中RequestQueue的初始化入手 xff0c 开始进行分析 应用利用Volley java的静态方法 xff0c 获取RequestQueue xff0c 开启使用V
  • RxJava 2.x 源码分析 之 FlatMap

    FlatMap 官方定义 xff1a 把被观察者发射出去的事件转化成新的子被观察者 xff0c 然后把这些发射量展开平铺后统一放到一个被观察者中 官方文档 简单来讲就是把被观察者每次发射的事件转化成一个子被观察者 xff0c 然后通过合并
  • Android Handler深入学习(源码分析)

    目录 xff1a 1 背景 在分析源码之前 xff0c 先来了解一下Message MessageQueue Looper这几个对象 1 1 Message 消息 定义 xff1a 是线程间通讯的数据单元 xff0c 包含着描述信息及任意数
  • 【containerd 源码分析】containerd image pull 源码分析

    本文分析 containerd pull 镜像的分析过程 xff0c 包括 ctr image 命令行以及 containerd daemon 执行 过程 xff0c 也包含镜像 metadata xff0c content 等内容 1 执
  • GCC源码分析(十三) — 机器描述文件

    版权声明 xff1a 本文为CSDN博主 ashimida 64 的原创文章 xff0c 遵循CC 4 0 BY SA版权协议 xff0c 转载请附上原文出处链接及本声明 原文链接 xff1a https blog csdn net lid
  • lemon源码分析

    基本概念见上篇 lemon源码基本概念整理 1 follow集 对于如下4条产生式 program 61 expr TK SEM expr 61 expr TK IMPL expr expr 61 TK LPAREN expr TK RPA
  • px4flow源码分析

    Flow c 计算光流用的是 SAD块匹配算法 第一部分是生成直方图 xff0c 第二部分是根据直方图来进行位移向量的计算 外部的 j i的for循环是采样点的循环 xff0c 内部的jj ii的循环是对于一个小邻域的采样 采样点是从 im
  • PX4源码分析7_添加mavlink自定义消息

    一 自定义mavlink消息 xff1a 根据uorb消息 xff08 msg xff09 自定义mavlink消息 方法为利用mavlink generator工具在xml文件生成mavlink所需相应的头文件 二 发送自定义mavlin
  • 【死磕 Java 集合】— ConcurrentSkipListMap源码分析

    转自 xff1a http cmsblogs com p 61 4773 隐藏目录 前情提要简介存储结构源码分析 主要内部类构造方法添加元素添加元素举例删除元素删除元素举例查找元素查找元素举例彩蛋 作者 xff1a 彤哥 出处 xff1a
  • 【ROS】源码分析-消息订阅与发布

    说明 本文通过NodeHandle subscribe和Publication publish 源码作为入口 xff0c 来分析PubNode SubNode之间是网络连接是如何建立的 xff0c 消息是如何发布的 xff0c topic队
  • 源码分享-go语言实现的snow3g加密算法

    源码路径 free5gc nas security snow3g snow3g go package snow3g var sr byte 0x63 0x7c 0x77 0x7b 0xf2 0x6b 0x6f 0xc5 0x30 0x01
  • 记一次修改DiyBox的经历(openwrt固件解包与打包)

    吐槽几句 做技术的有无私造福人类的 也有耍流氓坑人的 说的不是DiyBox 而是 信利 信利就是一家犯贱 祸害大学生 助纣为虐的流氓公司 其所谓的 防私接 技术让电信和移动牢牢的垄断着学校的宽带资源 还让学生花费了大量的冤枉钱 当然 有狗熊
  • Mybatis执行过程源码解析

    使用Mybatis执行查询sql代码示例 SqlSessionFactory sqlSessionFactory new SqlSessionFactoryBuilder build Resources getResourceAsReade
  • kaldi中SHELL调用C++程序过程源码分析

    引入 kaldi真正的核心源码 都是C 写成的 这个结论可以从如下两点得以确认 1 在kaldi的源码kaldi src目录下 能看到很多扩展名为 cc的源程序 这是linux下C 源码 2 在源码中 比如kaldi src featbin
  • Mybatis整合Spring源码分析

    一 整合配置 POM
  • AndroidO audio系统之AudioPolicyService分析(三)

    1 AudioPolicyService基础 AudioPolicy在Android系统中主要负责Audio 策略 相关的问题 它和AudioFlinger一起组成了Android Audio系统的两个服务 一个负责管理audio的 路由
  • GTest源码剖析(四)——TEST_P宏

    GTest源码剖析 TEST P宏 GTest源码剖析TEST P宏 TEST P宏用法 TestWithParam 类 1 TestWithParam 类定义 2 WithParamInterface 模版类定义 INSTANTIATE

随机推荐

  • uni-app插件使用注意事项

    1 将插件设置为全局组件后需要将项目重新运行 2 有些插件的功能会互斥 不要贪多全部装上 按需安装即可
  • QT中QMap使用实例详解

    QMap QMultiMap属于关联式容器 其底层结构是通过二叉树实现 故其查找value的效率很快 QMap中的数据都是成对出现的 第一个称为key 键 第二个称value 键值 目录 实例化QMap对象 插入数据 移除数据 遍历数据 由
  • siege压力测试工具安装和介绍

    前言 最近公司有个项目需要做一个短轮询类推送服务器 推送服务器分为三种 短轮询 长轮询 长连接 用户量不大 但是为了保险起见还是做一下压力测试 用的工具是siege 目录 前言 目录 siege介绍 siege安装 siege使用 1 si
  • python爬虫---用数据解析bs4爬取整部三国演义(不用诗词名句网)

    python爬虫 用数据解析bs4爬取整部三国演义 不用诗词名句网 需求 使用bs4实现将三国演义小说的每一章的内容爬取到本地磁盘进行存储 诗词名句网无法进去 所以我自己找了个网站爬取 思路差不多 首先 对首页的页面数据进行爬取 url h
  • 矩阵的逆矩阵 和 转置矩阵

    这几天用到了逆矩阵 就在这里总结一下逆矩阵和转置矩阵 逆矩阵 逆矩阵就是一个矩阵的逆向 比如一个点乘以一个矩阵后得到了一个新的点的位置 如果想通过这个点再获得矩阵转换前的位置 那我们就需要乘以这个矩阵的逆矩阵 在Three js里面 我们可
  • 国产数据库

    作者 JiekeXu 来源 JiekeXu DBA之路 ID JiekeXu IT 大家好 我是 JiekeXu 很高兴又和大家见面了 今天和大家一起来体验一下 TiDB 5 0 欢迎点击上方蓝字关注我 标星或置顶 更多干货第一时间到达 T
  • springboot对bean的生命周期管理

    声明 代码是JavaEE开发的颠覆者 Spring Boot实战代码中的 我只是拿去学习 传统方式 public class BeanWayService public void init System out println Bean i
  • windows10上通过python3远程连接hive

    注意 impyla 既可以连接impala 也可以连接hive 环境 windows10 python版本 3 6 hive版本 1 1 impyla安装过程 安装依赖 pip3 install bit array pip3 install
  • stm32通过ESP8266连接互联网服务器,手机通过网页实现远程控制灯亮灭

    一 实验结果 最终实验结果如上图所示 由于csdn限制gif图像大小 所以模糊了点 但是还是可以看清的 图中是手机在网页中进行操作 然后发送请求到php服务器 php服务器建立tcp链接 该链接通过一个JAVA写的TCP请求转发器 把tcp
  • EasyExcel实现Excel文件导入导出

    1 EasyExcel简介 EasyExcel是一个基于Java的简单 省内存的读写Excel的开源项目 在尽可能节约内存的情况下支持读写百M的Excel github地址 https github com alibaba easyexce
  • 软考高级-信息系统项目管理工程师-备考建议

    本人参加了2023年11月的软考高项 这里分享一下关于高项的备考建议 高项一共有24章节 其中 重点是7 17这10大管理知识域 需要重点理解性学习 1 3 4 18这4章 几乎全是概念 可以阅读性的速看 把相关概念标注出来 考前在速记一下
  • 【python基础知识】18.实操-使用python自动群发邮件

    文章目录 前言 项目实操 明确项目目标 分析过程 拆解项目 逐步执行 代码实现 版本1 0 学习模块 发一封简单邮件 版本2 0 给自己发一封完整邮件 版本3 0 群发完整邮件 前言 之前 我们学习了模块相关的知识 让我们来回顾一下 回顾结
  • Vue列表渲染(v-for的使用)

    列表渲染 列表渲染的东西比较多 我们通过案例一步一步学习列表渲染的相关知识 基本列表 首先写一个基本的列表 想要把persons列表里面的对象展示在li里面 我们可以使用一个指令 v for v for vue提供给我们做循环的指令 语法类
  • Column 'goods_type' in where clause is ambiguous

    今天开发超市管理系统的时候发现了一个问题 百度了一下这个单词ambiguous是暧昧的意思 然后百度了 网上的人说是因为数据库查询的时候的多表查询中 有列名相同导致数据库不知道是那个表的列名 无法识别所以报出这个错误 错误发生在mybati
  • 记使用RabbitMQ的坑

    主要碰到以下几个问题 1 无法正常的启动rabbit服务 见图1 2 工厂启动后无法正常连接消息队列 见图2 3 1 2之后还是无法连接到消息队列 将port端口设置成5672 而不是15672 解决方法 1 针对问题1 在windows服
  • antd-vue表格实现单击或者双击

    在table表格中设置customRow属性 methods中实现 doubleClick record index return on 这里是双击 单击改成click即可 dblclick gt console log record in
  • Verilog 位拼接运算符{}语法要点总结

    Verilog 位拼接运算符语法要点总结 Verilog位拼接运算符 语法回顾 要点总结 Verilog位拼接运算符 语法回顾 verilog中 运算符用于 拼接 多个变量或者常量 基本用法如下 1 变量的拼接 wire a 3 0 b 4
  • android关于屏幕适配的几点建议

    1 使用wrap content match parent weight 2 使用相对布局 尽量不使用绝对布局 3 使用限定符 如 layout large xxx xml 这样大屏设备就会自动使用该布局 4 使用最小宽度限定符 如 lay
  • 基于Redisson的分布式锁

    接口实现类 import java util concurrent TimeUnit import org redisson api RLock import org redisson api RedissonClient 基于Rediss
  • elasticjob 源码分析

    简介 elasticjob是基于quartz构建支持分片的分布式弹性可伸缩的job执行组件 zookeeper节点数据设计 job leader election latch instance 主节点的实例ID 临时节点 在节点选举成功后添