DataX迁移MongoDB

2023-12-19

DataX迁移MongoDB

  • 项目地址: GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。
  • 迁移MongoDB,读取组件为mongodbreader,写入组件为mongodbwriter

源码修改

  • 目前版本中,在迁移MongoDB时,若列的类型为二进制,mongodbreader未做处理,源码 src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java

    if (tempCol == null) {
        //continue; 这个不能直接continue会导致record到目的端错位
        record.addColumn(new StringColumn(null));
    }else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    }else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    } else {
        if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if(Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList)tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
    
  • 修改为:

    if (tempCol == null) {
        //continue; 这个不能直接continue会导致record到目的端错位
        record.addColumn(new StringColumn(null));
    }else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    }else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    }  else if (tempCol instanceof Binary) {
        // 处理 MongoDB 的 Binary 类型数据
        Binary binaryData = (Binary) tempCol;
        byte[] binaryBytes = binaryData.getData();
        // 将字节数组添加到 DataX 中的二进制列
        record.addColumn(new BytesColumn(binaryBytes));
    } else {
        if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if(Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList)tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
    
    • 修改源码后,要重新打包,由于只更改了mongodbreader,故在打包时,可以考虑将根

迁移脚本

  • 编写job脚本: 1.json

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader",
                        "parameter": {
                            "address": ["ip1:27017"],
                            "collectionName": "data",
                            "column": [
    							{
                                    "name": "_id",
                                    "type": "long"
                                },
                                {
                                    "name": "fileContent",
                                    "type": "bytes"
                                }
    						],
                            "dbName": "monitor",
                            "userName": "root",
                            "userPassword": "123456",
    						"query": {
    						  "_id": {
    							"$lt": 21
    						  }
    						}
                        }
                    },
                    "writer": {
                        "name": "mongodbwriter",
                        "parameter": {
                            "address": ["ip2:27017"],
                            "collectionName": "data",
                            "column": [
    							{
                                    "name": "_id",
                                    "type": "long"
                                },
                                {
                                    "name": "fileContent",
                                    "type": "bytes"
                                }
    						],
    						"writeMode": {
    						  "isReplace": "true",
    						  "replaceKey": "_id"
    						}
                            "dbName": "test",
    						"userName": "root",
                            "userPassword": "123456",
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }
    
    • reader 中的 query 节点为查询条件,上述demo中是查询 _id 小于21的记录。
  • 执行命令:

    python datax.py G:\Code\1.json
    
    • datax.py 在打包后的target目录下,相对路径: target\datax\datax\bin
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

DataX迁移MongoDB 的相关文章

随机推荐

  • CRM系统在IT软件行业的重要性和价值 | 发展趋势和选择理由

    如今 IT软件行业面临诸多问题 如推广成本的增加和交易周期的延长 很多销售人员埋怨产品好 有需求 但最终没有做到买卖的流程 为了改善这种业务问题 CRM管理系统是一个不错的选择 那样 IT软件行业CRM系统的好处是什么呢 IT软件行业面临哪
  • 金融CRM有用吗?金融行业CRM有哪些功能

    市场形式波诡云谲 金融行业也面临着资源体系分散 竞争力后继不足 未知风险无法规避等问题 金融企业该如何解决这些问题 或许可以了解一下CRM管理系统 和其提供的 金融行业CRM解决方案 金融行业是银行业 保险业 信托业 证券业和租赁业的总称
  • 普通USB摄像头转为网络摄像头

    普通USB摄像头转为网络摄像头 2022 02 25 Raspberry Pi Zero W吃灰有一段时间了 想着能否废物利用 使用普通USB摄像头改成一个RTSP协议网络摄像头 1 查看摄像头是否可用 查看是否已识别USB摄像头 lsus
  • 井盖位移传感器作用一览,井盖出现位移如何预警

    在城市的公共马路 小区住宅或工厂区路面上随处可见各种各样的井盖 不仅材质不一 而且每一个井盖背后的存在意义也是不同的 但是这些井盖有一个共同的特点 便是地上地下城市生命线的关键连接点 无论哪一种类型的井盖出现问题 即便是轻微的位移或者翻转
  • 解决Electron应用中的白屏问题的实用方法

    在使用Electron构建应用程序时 一些开发者可能会面临窗口加载过程中出现的白屏问题 这种问题主要分为两个方面 Electron未加载完毕HTML 这时Electron自身产生的白色背景可能导致用户在启动应用时看到一片空白 HTML加载渲
  • Java——关于实现多线程的测试小题,帮助我们更好的理解多线程的使用方法

    前面讲解了关于多线程的使用方法 这篇文章则是进行实战 做几道测试题 感兴趣的情况下可以看一下 Java多线程 多线程练习1 卖电影票 一共有1000张电影票 可以在两个窗口领取 假设每次领取的时间为3000毫秒要求 请用多线程模拟卖票过程并
  • ubuntu 20.04 prometheus-alertmanager

    prometheus alertmanager prometheus alertmanager focal updates focal security 0 15 3 ds 3ubuntu1 1 amd64 prometheus xmpp
  • Ceph入门到精通-smartctl 查看硬盘参数

    smartctl 参数含义 Model Family Toshiba s Enterprise Capacity HDD Device Model TOSHIBA MG08ACss Serial Number sssssss LU WWN
  • 华为mpls vpn 跨域方案B

    跨域方案B原理 缺点是两个as如果有多个ce的话 asbr pe压力大 1 pe和P都和单域一样配置 只是asbr pe配置不同 2 2个asbr pe配置上面不需要建立ip vpn instance 实例 3 2个asbr pe互联接口上
  • 坦克大战(二)

    欢迎来到程序小院 坦克大战 二 玩法 键盘 A W S D 键来控制方向 空格键发射子弹 N 下一关 P 上一关 Enter 开始 赶紧去闯关吧 开始游戏 https www ormcc com play gameStart 221 htm
  • u盘突然乱码然后文件都不见了怎么办

    在我们日常使用电脑时 U盘作为常用的移动存储设备 扮演了重要的角色 然而 有时我们可能会遇到U盘突然出现乱码并无法访问文件的问题 这不仅让人感到困惑 还可能丢失重要的数据 本文旨在分享几种解决U盘乱码文件不可见问题的方法 帮助您尽快恢复U盘
  • 桥梁结构健康监测系统的效果和作用

    随着城市化进程的加速 基础设施的重要性日益凸显 其中桥梁作为连接城市各个区域的交通枢纽 其结构安全对于城市的正常运行至关重要 为了全方位的保障有关于桥梁结构健康的安全性 万宾科技采用全新的科技理念 打造全套桥梁监测系统 WITBEE 万宾
  • 一款批量Linux应急响应检查工具

    fireman 简介 fireman用于在维护多台服务器并且需要定时检查服务器状态的场景下 使用自带命令可一键获取相关资源信息 排查服务器是否存在可疑用户 非法外连 文件更改等高危事件 使用 res模块 用于管理资源信息 添加资源 编辑资源
  • 机器学习 项目结构

    需求 我的项目文件夹下有许多文件 我想把我的项目单独放到一个文件夹 我的封装的模块放到一个一个文件夹方便管理 我该怎么做 这样做之后 主程序调用子模块需要在接口函数中调整路径吧 解决 将项目单独放到一个文件夹并将封装的模块放到另一个文件夹是
  • 谷歌Chrome浏览器无法安装插件的解决方法

    Google Chrome是一款由Google公司开发的网页浏览器 该浏览器基于其他开源软件撰写 包括WebKit 目标是提升稳定性 速度和安全性 并创造出简单且有效率的使用者界面 使用谷歌浏览器安装扩展插件的时候有时会遇到无法安装问题 解
  • 机器学习---决策树

    介绍 决策树和随机森林都是非线性有监督的分类模型 决策树是一种树形结构 树内部每个节点表示一个属性上的测试 每个分支代表一个测试输出 每个叶子节点代表一个分类类别 通过训练数据构建决策树 可以对未知数据进行分类 随机森林是由多个决策树组成
  • 井盖出现位移怎么办?井盖传感器效果一览

    井盖位移 井盖倾斜 井盖翻转 各种各样的问题应该怎么解决呢 井盖是城市基础设施建设过程之中不容忽视的一个重要部分 但是由于各种外界影响或者是内部的原因 可能会导致井盖出现位移等异常的现象 这不仅影响了路面的平整度 而且还可能会对路过的行人和
  • orcle定时器表达式梳理

    Oracle 定时任务执行时间间隔学习笔记 oracle 定时任务每隔1小时 CSDN博客 Oracle job 定时器的执行时间间隔也是定时器job 的关键设置 在这一设置上 开始还没掌握 总是不知道怎么写 现总结如下 其实主要是使用了T
  • 设计模式 原型模式 与 Spring 原型模式源码解析(包含Bean的创建过程)

    原创 疯狂的狮子Li 狮子领域 程序圈 2023 12 19 10 30 发表于辽宁 原型模式 原型模式 Prototype模式 是指 用原型实例指定创建对象的种类 并且通过拷贝这些原型 创建新的对象 原型模式是一种创建型设计模式 允许一个
  • DataX迁移MongoDB

    DataX迁移MongoDB 项目地址 GitHub alibaba DataX DataX是阿里云DataWorks数据集成的开源版本 迁移MongoDB 读取组件为mongodbreader 写入组件为mongodbwriter 源码修