DATAX:MongoDB增量数据写入到mysql中

2023-05-16

项目场景:

简述:

使用DATAX进行Mongo的数据抽取,然后写入到mysql中,其中会牵涉到全量数据的写入和增量数据的写入。
全量 数据的写入我们只需要正常写JSON模板即可,使用column将要获取的字段。但是考虑到有增量数据获取的时候,需要增加限定条件。

{

  "job": {

    "content": [

      {

        "reader": {

          "name": "mongodbreader",

          "parameter": {

            "address":["IP:端口号"],
			"toUri":true,"ssl":false,
			"trustStore":"",
			"trustStorePassword":"",
			"userName":"xxx",
			"userPassword":"",
			"dbName":"xxxx",
			"collectionName": "xxxx",
      "column":[
   {
    "name":"CRT_TM",
    "type":"string"
   }
      ]
          }
        },
                
		"writer": {
			
			"name": "mysqlwriter",             
			"parameter": {
            "column": [ 
"CRT_TM"
			],
                        
			"connection": [{
                             "jdbcUrl": "xxxx",
                             "table": ["xxxx"]
                            }],
            "preSql": [],
            "session": [ "set session sql_mode='ANSI'"],
            "username": "xx",
			"password": "",
            "writeMode": "insert"
                    }
                }
      } 
    ]
,
    "setting": {

      "errorLimit": {

        "percentage": 0,

        "record": 0

      },

      "speed": {

        "channel": 1

      }

    }
  }
}

问题描述

例如要获取近两天的mongo中的数据作为增量数据写入到mysql中。
mongodbreader的JSON中的parameter中没有像mysql、Oracle一样的querySQL,可以通过写SQL来限制日期时间来获取数据。

"reader": {

          "name": "oraclereader",

          "parameter": {

            "connection": [

              {

                "jdbcUrl": [

                  "xxx"

                ],

                "querySql": [
"
SELECT
CASE 
WHEN A.MZZY_FLAG='B'
THEN A.JZ_LSH ELSE NULL
END AS INHOS_NO ,--- 住院号
CASE 
WHEN A.MZZY_FLAG='M'
THEN A.JZ_LSH ELSE NULL
END AS OTPT_EMG_NO ,--- 门(急)诊号
A.PAT_ID AS PID ,--- 患者ID
'' AS MDCR_PAY_WAY_CD ,--- 医保支付方式代码
'' AS MDCR_PAY_WAY_NM ,--- 医保支付方式名称
A.TCJJ_ZF  AS MDCR_OVRL_PLNG_PAY_FEE ,--- 医保统筹基金支付金额
A.DBJZ_ZF  AS CRTCL_ILLNS_INSRNC_FEE ,--- 大病保险金额
'' AS MDC_ASSTS_FEE ,--- 医疗救助金额
A.GWYBZ_ZF AS CVL_SVNT_MDC_SBSDY_FEE ,--- 公务员医疗补助金额
'' AS BIG_SPLMT_FEE ,--- 大额补充金额
'' AS MNFCT_SPLMT_FEE ,--- 企业补充金额
(A.FUND_PAY_SUMAMT-A.TCJJ_ZF-A.DBJZ_ZF-A.GWYBZ_ZF) AS OTHR_PAY_FEE ,--- 其他基金支付金额
A.ZL_JE AS SLF_PAY ,--- 个人自付
A.ZF_JE AS SLF_FEE ,--- 个人自费
'' AS ACNT_PAY ,--- 个人账户支付
A.GRXJ_ZF AS CSH_PAY ,--- 个人现金支付
-- A.FY_ZE,--总额
-- A.BXJE,--报销金额
A.JS_RQ AS FEE_STLMT_DT ,--- 费用结算日期
    A.CJSJ AS CRT_TM, -- 创建时间
    A.CJR AS CRT_STFF_CD, -- 创建职工工号
    (SELECT XM FROM DICTMANAGE.DICT_EMPE WHERE KEYNO = A.CJR AND ROWNUM = 1) AS CRT_STFF_NM, -- 创建职工姓名
    A.ZHGXSJ AS UPD_DT, -- 更新日期
    A.ZHGXR AS UPD_STFF_CD, -- 更新职工工号
    (SELECT XM FROM DICTMANAGE.DICT_EMPE WHERE KEYNO = A.ZHGXR AND ROWNUM = 1) AS UPD_STFF_NM, -- 更新职工姓名
CASE WHEN A.MISTATUS IN ('MI_DEL')  THEN '1' ELSE '0' END AS INVLD_FLG --- 作废标识
-- A.MISTATUS,--(医保状态PAY_OK,付款成功,RECEDE_OK退款成功,CANCEL取消医保,MI_ERROR医保出错,RECEDE_ERROR退费出错,NEW新的记录,MI_COST医保收费,MI_OK医保确认提交,MI_DEL数据删除)
-- CASE WHEN A.MISTATUS ='PAY_OK'  THEN '付款成功' 
-- WHEN A.MISTATUS ='RECEDE_OK'  THEN '退款成功' 
-- WHEN A.MISTATUS ='CANCEL'  THEN '取消医保' 
-- WHEN A.MISTATUS ='MI_ERROR'  THEN '医保出错' 
-- WHEN A.MISTATUS ='RECEDE_ERROR'  THEN '退费出错' 
-- WHEN A.MISTATUS ='NEW'  THEN '新的记录' 
-- WHEN A.MISTATUS ='MI_COST'  THEN '医保收费' 
-- WHEN A.MISTATUS ='MI_OK'  THEN '医保确认提交' 
-- WHEN A.MISTATUS ='MI_DEL'  THEN '数据删除' 
-- END AS MISTATUS_NM
FROM HIS.MI_TRANS_INFO A
WHERE A.PAT_ID IS NOT NULL 
"
]

              }

            ],

            "username": "xxx",

            "password": "xxx",

            "where": "",

            "splitPk": "PRM_KEY",

            "fetchSize": 512,
          }

        }

原因分析:

参考DATAX的阅读文档中,文章有提到这样的一个参数。

query作为额外的查询条件

解决方案:

使用query参数设定时间范围:
“dateNum”:-2,
“dateType”:“day”,
“query”:“{ “CRT_TM”:{“KaTeX parse error: Expected group as argument to '\"' at end of input: gte\":{\"date”:”%{nextDate}“}}}”,

具体实现如下:

"content": [

      {

        "reader": {

          "name": "mongodbreader",

          "parameter": {

      "column":[
   {
    "name":"CRT_TM",
    "type":"string"
   }
      ],
						"dateNum":-2,
                        "dateType":"day",
                        "query":"{ \"CRT_TM\":{\"$gte\":{\"$date\":\"%{nextDate}\"}}}",

          }
        },
                
		"writer": {
			
			"name": "mysqlwriter",             
			"parameter": {        
            "writeMode": "replace"
                    }
                }
      } 
    ]
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

DATAX:MongoDB增量数据写入到mysql中 的相关文章

  • 事务是什么?

    事务 xff1a 简单来说 xff0c 事务就是几个操作要作为一个处理单元来完成 xff0c 要么全部完成 xff0c 要么全部不完成 事务可以是一条SQL语句 xff0c 也可以是多条SQL语句或者整个程序 事务日志 xff1a 重做日志
  • 各种加解密算法比较

    一 加密 算法介绍 对称加密算法 对称加密算法用来对敏感数据等信息进行加密 xff0c 常用的算法包括 xff1a DES xff08 Data Encryption Standard xff09 xff1a 数据加密标准 xff0c 速度
  • 系统提示缺少libltdl.so.3

    今天安装heartbeat pils 2 1 4 11 el5 i386 rpm时 xff0c 显示 因为重新安装的linux xff0c 所以以前的一些操作都丢失了 xff0c 安装了一大堆的开发工具 34 Development lib
  • 5 essential skills every Web Developer should have?

    The idea here is that most of us should already know most of what is on this list But there just might be one or two ite
  • 安装的虚拟机没有了VMnet1

    虚拟的东西终归时有其缺陷的 xff0c 大家安装好虚拟机之后 xff0c 网络适配器中是有VMnat1和VMnat8俩块网卡的 xff0c VMnat1负责主机域虚拟机的host only通信 xff0c 而VMnat8则负责和虚拟机的na
  • 未来已来

    虚拟现实 xff0c 又称VR xff08 virtualreality xff09 xff0c 是一种综合利用计算机图形系统和现实中各种接口设备 xff0c 在计算机上生成可交互的沉浸式环境的技术 xff0c 可以将虚拟世界和现实世界实现
  • mount:No medium found

    使用vmware时 xff0c 科技将iso作为系统的镜像 但是 xff0c 在配置yum源的时候 xff0c 可能会遇到这样的问题 究其原因 xff0c 是由于镜像文件未启动 解决方法 xff1a 右击 xff0c 点击连接 xff0c
  • 什么叫跨平台语言

    什么叫跨平台语言呢 xff1f 今天就个人理解简单谈一下 xff0c 还望指正 简单的说 xff0c 就像插座和插头 xff0c 这世界上有没有完全通用的插座呢 xff1f 没有 但是比如某家公司 xff0c 制作了插座和插头 xff0c
  • rpm包管理功能全解

    通常在linux系统中 xff0c 服务是要通过程序来提供的 xff0c 通过调用各种接口编译好之后的源码包文件 xff0c 需要使用rpm xff08 redhat package manager xff09 命令来安装并提供相应的服务
  • 加密

    lt div id 61 34 article content 34 class 61 34 article content clearfix csdn tracking statistics 34 data pid 61 34 blog
  • 国内代码托管平台Gitee(码云)的入门使用

    网址在这 gt gt gt 码云官网地址 中文代码托管平台 xff0c 英文不好的话 xff0c 使用github一定的障碍 xff0c 所有gitee是很好的选择 文章目录 一 新建仓库二 AndroidStudio使用码云 xff08
  • Docker

    1 环境准备 官方网址 xff1a https docs docker com engine install centos CentOS 7 虚拟机 环境查看 root 64 localhost cat etc centos release
  • Idea kafka 远程 debug

    1 kafka kafka run class sh 修改 xff0c 总共两处需要修改 xff1a mhbtest 64 localhost kafka 2 11 1 0 1 vim bin kafka run class sh if l
  • 高质量嵌入式Linux C编程 .pdf

    http 链接 xff1a https pan baidu com s 10MjISMt0nNeVWo3L 8VCaQ 提取码 xff1a mxhh
  • 来到CSDN

    刚到CSDN 的时候写过一篇关于来到CSDN 的博客 感觉表达不够清楚在此修改一下 写博客是进入提高班后老师要求的 xff0c 至于老师为什么让我们写博客大家可以看这里 博客是需要用心经营的 开始的时候一直在网易上写 xff0c 后来发现网
  • 【毕设调试一】WiFi模块esp8266的调试

    硬件说明 xff1a span style color fe2c24 strong 提示 xff1a strong span span style color 0d0016 主控芯片STM32F103C8T6 xff0c 与WiFi通信串口
  • ubuntu系统下用kazam软件录制的视频不能在windows系统下播放的解决方案

    ubuntu系统下用kazam软件录制的视频不能在windows系统下播放的解决方案 参考文章 xff1a xff08 1 xff09 ubuntu系统下用kazam软件录制的视频不能在windows系统下播放的解决方案 xff08 2 x
  • Huawei LiteOS与freeRTOS、Ucos主流嵌入式操作内核的区别

    LiteOS与freeRTOS Ucos主流嵌入式操作内核的区别 云社区 华为云
  • ROS串口读取16进制的数据

    include lt ros ros h gt 必备 include lt serial serial h gt ROS已经内置了的串口包 include lt iostream gt 输入输出库 int main int argc cha
  • Linux C语言 串口与网络的数据透传/串口服务器

    Linux下C语言 串口与网络的数据透传 已经有开源的软件ser2net 实现了类似的功能 而且更加强大 支持telnet配置参数 而且越来越大 下载源码看了 实现比较复杂 网络上也有许多类似的 但最终还是决定自己写一个 网口采用TCP通信

随机推荐