datax启动流程

2023-11-18

组件

datax采集流程
  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Transformer:在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作
  • Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。
  • Task: Task是把Job拆分得到的最小执行单元。
  • JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。
  • TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元。
  • TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup

参数

datax.py脚本接收参数
-j:jvm参数
--jobid: 在local与distribute模式下运行的作业唯一id
-m: 运行datax时的-Dmode参数,可选standalone, local, distribute 
-p: 运行datax时的额外的附加的运行参数
-r: 查看reader模板,与-w一起使用,${datax.home}/plugin/reader/${该读插件名称}/plugin_job_template.json
-w: 查看writer模板,与-r一起使用,${datax.home}/plugin/reader/${该写插件名称}/plugin_job_template.json

C:/dev/Python27/python.exe datax.py -p"-Dlast=123 -Dend=456" --jobid=123456 C:/Users/Lenovo/Desktop/datax/jobConf/mysql2mysql.json

#分配了启动 限制堆大小为1g,不可扩展,发生了 内存溢出错误dump路径为C:\Users\Lenovo\PycharmProjects/log 
java -server 
-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=C:\Users\Lenovo\PycharmProjects/log 
-Dloglevel=info -Dfile.encoding=UTF-8 
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener 
-Djava.security.egd=file:///dev/urandom 
-Ddatax.home=C:\Users\Lenovo\PycharmProjects 
-Dlogback.configurationFile=C:\Users\Lenovo\PycharmProjects/conf/logback.xml 
-classpath C:\Users\Lenovo\PycharmProjects/lib/*  
-Dlog.file.name=onf\mysql2mysql_json 
-Dlast=123 -Dend=456 com.alibaba.datax.core.Engine 
-mode standalone -jobid 123456 -job C:\Users\Lenovo\Desktop\datax\jobConf\mysql2mysql.json
#默认以standalone模式启动
datax.home=当前运行目录
logback.configurationFile=${data.home}/conf/logback.xml 
classpath=${datax.home}/lib/*

CoreConstant中会提取datax.home这个环境变量供全局使用,拼接成core.json,plugin.json的地址

  • 一些环境变量
  • mode:standalone, local, distribute 选择作业运行模式
  • jobid:在local与distribute模式下运行的作业唯一id
  • job:作业配置文件路径
  • classpath
  • Standalone: 单进程运行,没有外部依赖。
  • Local: 单进程运行,统计信息、错误信息汇报到集中存储。
  • Distrubuted: 分布式多进程运行,依赖DataX Service服务。

运行流程

引擎启动后jobContainer启动流程

作业配置加载

  • 通过作业配置文件路径(-job参数)来加载作业配置文件。

  • CoreConstant通过环境变量获取core配置文件路径(datax.home拼接),加载core配置。

  • 通过job.content[0].reader/writer.name读取该作业的插件名,通过job.preHandler.pluginName/job.postHandler.pluginName读取该作业的前置或后置处理插件名。通过CoreConstant获取以上所有读取到的插件名的绝对路径。

  • 通过路径来加载插件配置文件内容。插件的配置文件按如下约束。

    {
        "name": "mysqlwriter",
        "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
        "description": "",
        "developer": ""
    }
  • Configuration.from(String json)读取任意配置文件时都会将${xxx}$xxx占位符替换成xxx对应的环境变量。即-Dlast=123使配置文件中 ${last}替换成123, 该逻辑存在StrUtil.replaceVariable(json)

  • 将core,job,plugin配置合并,生成全局使用的配置Configuration

    {
        "entry":{……},
        "common":{……},
        "core":{
            "container":{
                "job":{
                    "id": ${jobId},
                    ………………其他配置
              } 
          }
        },
        "job":{……},
      "plugin":{
            "reader":{
                "${pluginName}":{
                    "name": "",
                    "class": "",
                    "description": "",
                    "developer": ""
                }
            },
            "writer":{
                "${pluginName}":{……}
            }
      }
    }
  • 最后做过滤输出和检查配置

引擎启动

  • 从common取出需要的转换格式yyyy-MM-dd或编码UTF-8,用于String与Date或Bytes的互相转换
  • 将配置Configuration传入LoadUtil Jar加载器,后面会使用LoadUtil进行插件Jar的动态加载。包括对每个插件的加载隔离机制和加载器缓冲的实现。
  • 根据core.container.model判断使用TaskGroupContainer还是使用JobContainer,默认使用JobContainer
  • PerfTrace初始化,默认不使用PerfTrace,获取job.JobInfo 默认无该配置项,
  • 容器启动

JobContainer

其中加载操作中的加载插件时:

为避免jar冲突,比如hbase可能有多个版本的读写依赖jar包,JobContainer和TaskGroupContainer
就需要脱离当前classLoader去加载这些jar包,执行完成后,又退回到原来classLoader上继续执行接下来的代码

每个Jar的执行加载都有单一的类加载器进行隔离加载,JarLoader会缓冲到jarLoaderCenter中

LoadUtil.getJarLoader 就会根据插件类型和名字去jarLoaderCenter中获取加载器,获取不到之后才会重新构造一个加载器
类加载器隔离
  • preHandle前置处理器:根据job.preHandler.pluginName加载已存在的插件,并执行插件的preHandler方法

  • init初始化:

    1. 根据job.content[0].reader/writer.name插件名来加载reader和writer插件,并保存reader/wirter PluginName

    2. 赋值Configuration,赋值Job插件本身和对端插件的配置job.content[0].reader/writer.parameter与对端的插件名子。并且执行他们的init方法。

  • prepare准备:执行reader/writer的prepare方法

  • split切分任务:

    1. 根据job.setting.speed.bytecore.transport.channel.speed.bytejob.setting.speed.recordcore.transport.channel.speed.record的值计算出并发task数needChannelNumber,具体算法

      作业byte限速除于单个channel的byte限速 得到 byte限速下的所需channel
      作业record限速除于单个channel的record限速 得到record限速下的所需channel
      对比两个channel数取最小的作为needChannelNumber
      
      若job.setting.speed.byte与job.setting.speed.record设置为空
      则直接使用job.setting.speed.channel作为needChannelNumber
      若都为空,则抛出异常
  1. 执行reader和writer的split方法,获取经过split每个Task的reader和writer的配置。

    执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,
    达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起。
    
    计算出的needChannelNumber/tableNUm * 分裂因子 = 最终需要的Task数量
切分方式
在split方法中需要根据tables数量,splitPk进行分隔任务,每个任务下的connection都会根据切分结果与column,where来生成一个querySql。
  1. 获取作业的transformer配置,每个Task的reader和writer配置再加上该transformer的配置合并。将原本的job.content替换。即原本只有单个content,经过split后产生多个content,并为其设置递增的taskId

    {
        "job": {
            "content": [
                {
                    "taskId": 1,
                    "reader": {
                        "parameter": {
                            "querySql": ""
                        }
                    },
                    "transformer":[],
                    "writer": {}
                },
                 {
                    "taskId": 2,
                    "reader": {},
                    "transformer":[],
                    "writer": {}
                },
                {
                    "taskId": 3,
                    "reader": {},
                    "transformer":[],
                    "writer": {}
                }
            ],
            "setting": {
                "speed": {
                    "channel": ""
                }
         }
        }
    }
  • schedule调度:

    1. parseAndGetResourceMarkAndTaskIdMap:以reader.parameter.loadBalanceResourceMark资源名做分组。得出一个 资源名称 --> taskId(List) 的 map 映射关系。在split阶段,会对插件的loadBalanceResourceMark进行设置,通常是使用jdbc连接的host

    2. doAssign:根据parseAndGetResourceMarkAndTaskIdMap的结果,将需要运行Task按一个特定的规则分配到taskGroup中。每个TaskGroup都将获得一份Configuration克隆,设置每个taskConfiguration的content中的core.container.taskGroup.id。并且修正job.content,使他的配置文件回到单content状态

      a 库上有表:0, 1, 2
      b 库上有表:3, 4
      c 库上有表:5, 6, 7
      
      如果有 4个 taskGroup
      打竖遍历添加到taskGroup 
      资源: 0 3 5|1 4 6| 2 7
      taskGroup: 0 3 5 1|4 6 2 7
      
      则 doAssign 后的结果为:
      taskGroup-0: 0,  4,
      taskGroup-1: 3,  6,
      taskGroup-2: 5,  2,
      taskGroup-3: 1,  7
  1. adjustChannelNumPerTaskGroup:修正因为无法平均分配的少一个task的taskGroup的core.container.taskGroup.channel的更改

    3个task分配到2个taskGroup中时,会造成一个taskGroup的channel为2,一个taskGroup的channel为1
    所以要将少了一个task的taskGroup的channel进行修正优化。
  2. 为每个taskGroup修正core.container.job.mode为standalone

  3. StandAloneScheduler#registerCommunication:为每个taskGroup注册Communication(状态及统计信息交互)

  4. StandAloneScheduler#startAllTaskGroup:为每个taskGroup创建TaskGroupContainer并代理到TaskGroupContainerRunner启动TaskGroupContainer。其中动态加载transfomer,数据采集就在这个步骤之内。

  • post:执行reader和writer的post方法

  • postHandle:根据job.postHandler.pluginName加载已存在的插件,并执行插件的postHandler方法

  • invokeHooks:根据/hook目录调用外部hook

TaskGroupContainer

类图

reader与writer的数据传输

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

datax启动流程 的相关文章

随机推荐

  • Flink checkPoint和SavePoint

    savepoint和checkpoint都是flink为容错提供的强大功能特性 能够自动或手动保存job的运行状态 两者区别 checkpoint 应用定时触发 用户保存状态 会过期 内部应用失败重启的时候启用 但是手动cancel时 会删
  • nginx配置转发日志

    http include mime types default type application octet stream log format main remote user time local http x Forwarded fo
  • Vue3 icons 图标无效

    问题描述 需要在账号 密码处加上icon图标 但是引用完element plus icons之后 还是不行 不显示icon图标 后面发现 当前版本的emement plus的icon图标不能直接使用了 前置条件 npm install el
  • Go 服务自动收集线上问题现场

    前言 对于 pprof 相信熟悉 Go 语言的程序员基本都不陌生 一般线上的问题都是靠它可以快速定位 但是实际项目中 很多时候我们为了性能都不会开启它 但是出了问题又要靠它来分析 好在 go zero 已经帮我们很好的集成进来了 我们只需要
  • 深度模型压缩论文(03)- Be Your Own Teacher: Improve the Performance of Convolutional Neural Networks via Self

    文章目录 1 摘要和背景 1 1 摘要 1 2 背景 2 方法和贡献 2 1 方法 2 1 1 训练过程 2 1 2 loss介绍 2 2 贡献 3 实验和结果 3 1 实验 3 2 结果 4 总结和展望 4 1 总结 4 2 展望 主要贡
  • 【git】git push 本地项目报错 ssh_dispatch_run_fatal

    1 概述 我的一个项目原本是使用如下命令git下来的 git clone git xxx git 昨天还好还好的 今天发现突然无法push项目了 开始自己发现网络比较慢 后面稍微恢复了一下还是不可以 然后git push的时候报错 ssh
  • mybatis-plus整合alibaba.druid实现多数据源配置

    须知 依托于springboot项目实现 一 添加maven依赖
  • javascript学习笔记-面向对象

    javascript学习笔记 面向对象 JavaScript中 现阶段我们可以采用三种方式创建对象 利用字面量创建对象 利用New Object创建对象 利用构造函数创建对象 一 利用字面量创建对象 var obj uname 张三 age
  • 云安全技术——kvm虚拟化技术

    目录 10 1 kvm简介 10 2 在CentOS 7 图形化界面下安装KVM 使用IDEA开发读写MySQL数据库程序 实验目的 了解 CentOS7图形化界面的部署方法 了解 KVM的组成和作用 了解 KVM的技术架构 了解KVM的安
  • python—scrapy框架爬虫—链家二手房数据

    本文讲解的是scrapy框架爬虫的实例 文章目录 前言 scrapy简介 1 scrapy框架的流程 2 流程简介 操作 1 创建scrapy项目 2 运行 3 代码部分 前言 本文爬取的是链家重庆主城九区的二手房数据 同时将爬取的数据存入
  • linux查看进程绑定cpu核是否成功

    运行top命令 可以看到进程以及进程cpu占有率 然后查看是否有P属性 这个属性用来查看进程绑定的cpu核 这里没有看到cpu占用核心的P属性 运行top后 按 f 键进入top配置界面 然后按上下键选择P选项 此时可以看到P选项前面没有
  • 简单Hexo更换主题教程

    Hexo自带的默认主题不是很好看 我们可以按自己需求更换对应的主题 主题由很多 大家可以使用搜索引擎查找 这里我们演示butterfly主题的安装 前提 需要安装git 需要安装nodejs 步骤 在博客的项目文件夹下打开git bash执
  • LeetCode——1302. 层数最深叶子节点的和

    题目描述 给你一棵二叉树的根节点 root 请你返回层数最深的叶子节点的和 示例 1 输入 root 1 2 3 4 5 null 6 7 null null null null 8 输出 15 示例 2 输入 root 6 7 8 2 7
  • 以太坊公链节点连接节点超时问题排查

    2020年4月1日晚上8点 zabbix报警 以太坊公链三分钟内没有检测到区块数据同步 立即登录到服务器 查看以太坊公链节点数据同步情况 docker logs f public eth tail 10 INFO 04 01 20 17 3
  • 用IDEA创建JavaWeb项目

    文章目录 一 创建web项目 1 打开idea软件 点击界面上的Create New Project 2 进入如下界面 选中 java Enterprise 配置jdk tomcat 勾选Web Application案例 注意勾选生成we
  • 华为交换机的基本配置,看完秒懂

    一 交换机的基本配置 交换机连接方式 本地 计算机COM口 USB口 gt Console线 gt 交换机Console口 远程 Putty SecureCRT Xshell远程管理工具 华为VRP网络操作系统 1 华为的视图模式
  • HTML设计一个水平导航栏,完成水平导航栏下拉列表交互效果的实现。

    HTML设计一个水平导航栏 简单的完成水平导航栏下拉列表交互效果的实现 一 水平导航栏 设计要求 CSS样式分析 hello 大家好 学习之路一小步 如果有不严谨的问题请指出 我会积极学习的 一 水平导航栏 设计要求 1 使用无序列表ul及
  • GBase 8s 整合hibernate和pagehelpe

    1 pom 配置文件
  • Ubuntu20.04 Mathtype-appimage

    LD PRELOAD usr lib x86 64 linux gnu libxcb dri3 so 0 Math AppImage
  • datax启动流程

    组件 datax采集流程 Reader Reader为数据采集模块 负责采集数据源的数据 将数据发送给Framework Writer Writer为数据写入模块 负责不断向Framework取数据 并将数据写入到目的端 Transform