DataX全量和增量mysqltomysql(二)

2023-11-08

全量mysqltomysql

进入目录编写json

cd /usr/local/datax/job
vi zabbixmysql2mysql.json

写入的表结构要和reader的表结构一样,先建立好
编写json文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "test",
                        "password": "123",
                        "column": [
                            "itemid",
                            "clock",
                            "timestamp",
                            "source",
                            "severity",
                            "value",
                            "logeventid",
                            "ns" 
                        ],
                        "splitPk": "itemid",
                        "connection": [
                            {
                                "table": [
                                    "history_log"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.3.89:3306/zabbix"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "test",
                        "password": "123",
                        "column": [
                            "itemid",
                            "clock",
                            "timestamp",
                            "source",
                            "severity",
                            "value",
                            "logeventid",
                            "ns"    
                        ],
                        "preSql": [
                            "truncate history_log_copy1"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.3.89:3306/chenzhenhua2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "history_log_copy1"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

注意:“writeMode”: “insert”,也可以为update,update更加稳妥一点
“preSql”: [ “truncate history_log_copy1” ], 在写入前提前清空表清空表

如果写入的数据库为mysql8以上版本,必须修改mysql-connector-java的插件

cd /usr/local/datax/plugin/writer/mysqlwriter/libs
mv mysql-connector-java-5.1.34.jar mysql-connector-java-5.1.34.jar-bak

我这边上传的为mysql-connector-java-8.0.16.jar,下载地址https://static.runoob.com/download/mysql-connector-java-8.0.16.jar

增量同步

Datax需要解决的另一个难题在于增量更新。

首先需要说明, Datax本身在大部分reader插件中提供了where配置项,用于做增量更新。例如mysqlerader md文件说明如下:

* **where**

	* 描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。<br />

          where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

	* 必选:否 <br />

	* 默认值:无 <br />

* **querySql**

	* 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id <br />

	 `当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置`,querySql优先级大于table、column、where选项。

	* 必选:否 <br />

	* 默认值:无 <br />

示例:
新建json

vi  new.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123",
                        "where": "created_at > FROM_UNIXTIME(${create_time}) and created_at  < FROM_UNIXTIME(${end_time})",
                        "column": [
                            "id",
                            "rpt_date",
                            "rpt_hour",
                            "unit_id",
                            "build_id",
                            "num",
                            "run_state",
                            "created_at"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "rpt_warning_hour"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.5.11:3306/smart_fire"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "test",
                        "password": "123",
                        "column": [
                            "id",
                            "rpt_date",
                            "rpt_hour",
                            "unit_id",
                            "build_id",
                            "num",
                            "run_state",
                            "created_at"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.3.89:3306/chenzhenhua2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "rpt_warning_hour"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

上面需要注意的事情为FROM_UNIXTIME将表里面的时间格式转换为时间戳格式,如果表里默认为时间戳不需要转换。
${…}就是将变量传入,上次更新{create_time}上次更新时间,{end_time}为现在本地时间。

然后再编写一个python脚本可以将参数传入json即可,vi dataxScheduler.py

import time,os,sys

print "going to execute"

configFilePath = sys.argv[1]
logFilePath = sys.argv[2]
lastTimeExecuteRecord = sys.argv[3]
lastExecuteTime=""

try:
    fo = open(lastTimeExecuteRecord, "r")
    lastExecuteTime = fo.read()
    print lastExecuteTime
except IOError:
    lastExecuteTime = int(1)
lastExecuteTime = int(lastExecuteTime)

print("last time execute time:  " + str(lastExecuteTime))

currentTime = int(time.time())
print("currentTime is        :"+ str(currentTime))


#os.system("python /usr/local/datax/bin/datax.py " + configFilePath + " --lastTime" +  lastExecuteTime + " --currentTime" + currentTime + " >> " + logFilePath)

script2execute  = "python /usr/local/datax/bin/datax.py %s -p \"-Dcreate_time=%s -Dend_time=%s\" >> %s"%(configFilePath,lastExecuteTime,currentTime,logFilePath)
print("to be excute script:"+script2execute)
os.system(script2execute)

print("script execute ending")

# update timestamp to file
fo = open(lastTimeExecuteRecord, "w+")
fo.write(str(currentTime))
fo.close()

print("ending---",lastTimeExecuteRecord)

运行

python /usr/local/datax/job/dataxScheduler.py  '/usr/local/datax/job/new.json'  '/usr/local/datax/job/test_job.log'   '/usr/local/datax/job/test_job.record'

测试,增加数据后再次运行,数据对应增加了,加入到定时任务执行即可完成增量同步。
但这个写脚本的方式还是非常笨拙的,下一篇介绍的datax-web会更好的去解决增量同步的问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

DataX全量和增量mysqltomysql(二) 的相关文章

  • 通过 Python 循环浏览网络上的目录并显示其内容(文件和其他目录)

    同样的道理在Python中处理从源目录到目标目录的一组文件 https stackoverflow com questions 2593399 process a set of files from a source directory t
  • 如何用正则表达式替换多个匹配/组?

    通常我们会编写以下内容来替换一场比赛 namesRegex re compile r is life re I replaced namesRegex sub r butter There is no life in the void pr
  • Python-验证我的文档 xls 中是否存在工作表

    我正在尝试在空闲时间设计一个小程序 加载 xls 文件 然后在要扫描的文档中选择一张纸 步骤1 用户导入 xls文件 导入程序后检查文件是否存在 我能做到的 第 2 步 我要求用户提供要分析的文档表 xls 的名称 这就是它停止的地方 该程
  • 尝试使用 Ruby Java Bridge (RJB) gem 时出现错误“无法创建 Java VM”

    我正在尝试实现 Ruby Java Bridge RJB gem 来与 JVM 通信 以便我可以运行 Open NLP gem 我在 Windows 8 上安装并运行了 Java 所有迹象 至少我所知道的 都表明 Java 已安装并可运行
  • Python:在字典中查找具有唯一值的键?

    我收到一个字典作为输入 并且想要返回一个键列表 其中字典值在该字典的范围内是唯一的 我将用一个例子来澄清 假设我的输入是字典 a 构造如下 a dict a cat 1 a fish 1 a dog 2 lt unique a bat 3
  • 将 Long 转换为 DateTime 从 C# 日期到 Java 日期

    我一直尝试用Java读取二进制文件 而二进制文件是用C 编写的 其中一些数据包含日期时间数据 当 DateTime 数据写入文件 以二进制形式 时 它使用DateTime ToBinary on C 为了读取 DateTime 数据 它将首
  • org.jdesktop.application 包不存在

    几天以来我一直在构建一个 Java 桌面应用程序 一切都很顺利 但是今天 当我打开Netbeans并编译文件时 出现以下编译错误 Compiling 9 source files to C Documents and Settings Ad
  • 无法使用 python rasterio、gdal 打开 jp2 (来自哨兵)

    我试图在 python 中将 jp2 栅格产品作为栅格打开 但当我们使用 raterio 和 gdal 包时没有成功 我收到此错误 RasterioIOError b4 jp2 not recognized as a supported f
  • Seaborn 中没有线性拟合的散点图

    我想知道是否有办法关闭seaborn中的线性拟合lmplot或者是否有一个等效函数可以生成散点图 当然 我也可以使用 matplotlib 但是 我发现 seaborn 中的语法和美学非常吸引人 例如 我想绘制以下情节 import sea
  • 是否可以在Python中将日+月(不是年)与当前日+月进行比较?

    我正在获取 5 月 10 日 格式的数据 我试图弄清楚它是今年还是明年 该日期仅一年 因此 5 月 10 日表示 2015 年 5 月 10 日 而 5 月 20 日表示 2014 年 5 月 20 日 为此 我想将字符串转换为日期格式并进
  • 查看Jasper报告执行的SQL

    运行 Jasper 报表 其中 SQL 嵌入到报表文件 jrxml 中 时 是否可以看到执行的 SQL 理想情况下 我还想查看替换每个 P 占位符的值 Cheers Don JasperReports 使用 Jakarta Commons
  • 将2-3-4树转换为红黑树

    我正在尝试将 2 3 4 树转换为 java 中的红黑树 但我无法弄清楚它 我将这两个基本类编写如下 以使问题简单明了 但不知道从这里到哪里去 public class TwoThreeFour
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User
  • Python 读取未格式化的直接访问 Fortran 90 给出不正确的输出

    这是数据的写入方式 它是一个二维浮点矩阵 我不确定大小 open unit 51 file rmsd nn output form unformatted access direct status replace recl Npoints
  • Python 相当于 Scala 案例类

    Python 中是否有与 Scala 的 Case Class 等效的东西 就像自动生成分配给字段而无需编写样板的构造函数一样 当前执行此操作的现代方法 从 Python 3 7 开始 是使用数据类 https www python org
  • 两种 ODE 求解器之间的差异

    我想知道 两者之间有什么区别ODEINT and solve ivp用于求解微分方程 它们之间有什么优点和缺点 f1 solve ivp f 0 1 y0 y0 is the initial point f2 odeint f y0 0 1
  • KeyPressed 和 KeyTyped 混淆[重复]

    这个问题在这里已经有答案了 我搜索过之间的区别KeyPressedand KeyTyped事件 但我仍然不清楚 我发现的一件事是 Keypressed 比 KeyTyped 首先被触发 请澄清一下这些事件何时被准确触发 哪个适合用于哪个目的
  • 如何使用 Python/Django 在 Facebook 中获取(和使用)扩展权限

    我正在尝试编写一个简单的应用程序 让用户授予我的代码写入其页面的 Facebook 流的权限 据我了解 它应该很简单 让用户单击一个按钮 启动一个弹出窗口 其中包含我的 Facebook 应用程序中的页面 在该页面中 他们单击授予的内容流发
  • 基于值的 matplotlib 条形图颜色

    有没有一种方法可以根据条形图的值对条形图的条形进行着色 例如 values below 0 5 red values between 0 5 to 0 green values between 0 to 08 blue etc 我找到了一些
  • javax.persistence.Table.indexes()[Ljavax/persistence/Index 中的 NoSuchMethodError

    我有一个 Play Framework 应用程序 并且我was使用 Hibernate 4 2 5 Final 通过 Maven 依赖项管理器检索 我决定升级到 Hibernate 4 3 0 Final 成功重新编译我的应用程序并运行它

随机推荐

  • Java语言中的重写(override)和重载(overload)

    Java语言中的重写 override 和重载 overload 重写 override 和重载 overload 是编程语言中的两个常见概念 用于描述函数或方法的特定行为 重写指的是在子类中重新定义 覆盖 父类中已经存在的同名方法 重写可
  • 习题2软件工程

    3 4 1 不是 通常所说的结构化程序 是按照狭义的结构程序的定义衡量 符合定义规定的程序 图示的程序的循环控劇结构有两个出口 显然不符合狭义的结构程序的定义 因此是非结构化的程序 2
  • aspose文档格式转换

    文章目录 Word转Pdf html转pdf pdf转word Word转Pdf public static void main String args throws Exception Document doc new Document
  • pikachu靶场CSRF之TOKEN绕过

    简介 Pikachu靶场中的CSRF漏洞环节里面有一关CSRF TOKEN 这个关卡和其余关卡稍微有点不一样 因为表单里面存在一个刷新就会变化的token 那么这个token是否能绕过呢 接下来我们来仔细分析分析 实战过程 简单尝试 先利用
  • 11月10日 生命值,减少生命值,创建生命值UI UE4斯坦福 学习笔记

    制作角色属性Comp 添加一个Actorcomp 在 h内添加生命值与减少血量的函数 protected 只在蓝图内可以编辑 在编辑器界面不能编辑 UPROPERTY EditDefaultsOnly BlueprintReadOnly C
  • Qt应用开发(基础篇)——颜色选择器 QColorDialog

    一 前言 QColorDialog类继承于QDialog 是一个设计用来选择颜色的对话框部件 对话框窗口 QDialog QColorDialog颜色选择器一般用来让用户选择颜色 比如画图工具中选择画笔的颜色 刷子的颜色等 你可以使用静态函
  • 彻底卸载MySQL8.0

    环境需求 win10 MySQL8 0 彻底卸载 1 停止MySQL服务 启动任务管理器 gt 选择服务 gt 找到MySQL gt 右键停止 如果有多个MySQL服务 也全部都要停掉 2 卸载MySQL相关所有组件 打开看控制面板 gt
  • 使用树莓派进行远程视频转播(内网穿透)

    一 准备材料 实体 树莓派摄像头 树莓派 虚拟 云服务器 二 先测试树莓派进行局域网转播 这里是需要安装的软件 sudo apt get install subversion libjpeg8 dev imagemagick libv4l
  • 线性代数系列讲解第七篇 正交向量及正交空间

    正交向量 orthogonal vector 毕达哥拉斯定理 勾股定理 Pythagoras 我们很容易得出 x 2 y 2 x y 2 x 2 y 2 x y 2 x 2 y 2 x y 2 这就是勾股定理 我们可以将一个向量的模的平方写
  • 服务器改配项目,网络服务器搭建(项目五)[xxxx1214修改].ppt

    网络服务器搭建 项目五 xxxx1214修改 4 查看启动信息 service named restart 如果named服务无法正常启动 可以查看提示信息 根据提示信息更改配置文件 5 查看端口 如果服务正常工作 则会开启TCP和UDP的
  • 自动化测试:python测试结果和报告自动发送邮件

    一 带有附件发送邮件 1 导入模块 MIMEMultipart from email mime multipart import MIMEMultipart 复制 2 先读取要发送文件的内容 file new 是测试报告路径的参数名 3 下
  • Linux 动态库 soname 实践

    xredis 因为项目中使用到了 xredis C 开发的redis客户端 是对hiredis的C 封装 在 makefile 中发现使用到了 Wl soname 这个语法 之前没怎么了解过 特此记录 makefile 节选如下 XREDI
  • LeetCode—200.岛屿数量(Number of Islands)——分析及代码(C++)

    LeetCode 200 岛屿数量 Number of Islands 分析及代码 C 一 题目 二 分析及代码 1 深度优先搜索 1 思路 2 代码 3 结果 三 其他 一 题目 给定一个由 1 陆地 和 0 水 组成的的二维网格 计算岛
  • elementui确认消息区分取消和关闭按钮

    默认情况下 elementui的确认消息 取消按钮和右上角弹窗 走的是同一个方法 也就是catch方法的回调 如果功能上需要做区分 就没法区分了 所以 要解决这个问题 就需要在取消的回调方法里做一个判断 来区分是点击右上角的取消 还是点击的
  • 如何隐藏unity窗口中的变量、如何设置变量范围、在编辑器中如何显示私有变量与Awake、start之间的区别

    什么是脚本 cs的文本文件 类文件 附加到游戏物体中 定义游戏对象行为指令的代码 c 类包括 字段 属性 构造方法 方法 脚本文件包括 字段 方法 不能在脚本文件里写构造方法 一些unity脚本小属性 序列化字段 作用 在编辑器中显示私有变
  • 基于Matlab的BiLSTM实现

    问题背景 目前深度学习多使用python实现 不过想要配置好一个python的深度学习环境有时却并不轻松 常常因为各个第三方库版本兼容性问题而失败 相比之下 matlab仅需一次安装简化了不少工作 这几年matlab的深度学习工具箱也是发展
  • 高斯过程回归

    文章目录 效果一览 文章概述 研究内容 程序设计 参考资料 效果一览 文章概述 高斯过程回归 Matlab实现高斯过程回归多输入单输出预测 Gaussian Process Regression 研究内容 高斯过程回归 Gaussian P
  • OpenFeign 入门教程 - 基础篇

    目录 Spring Cloud OpenFeign 介绍 Feign 概述 Spring Cloud OpenFeign 概述 Spring Cloud OpenFeign 的特性 Feign 与 Spring Cloud OpenFeig
  • Centos7 linux 安装 redis 遇到的几个问题

    环境 centos7 redis 5 0 解决方案仅供参考 如不能解决问题 请查找请他方案 1 不能编译没有GCC 编译工具 make报错 make 1 persist settings Error 2 ignored CC adlist
  • DataX全量和增量mysqltomysql(二)

    全量mysqltomysql 进入目录编写json cd usr local datax job vi zabbixmysql2mysql json 写入的表结构要和reader的表结构一样 先建立好 编写json文件 job conten