flinkCDC+iceberg(hadoop catalog) 同步mysql数据库数据

2023-11-09

flink可以实现hadoop catalog 和hive catalog 。

前者映射hdfs地址,实现数据存储

后者映射hive表地址,用于有关hive的数据处理及其他项目实操。

1. 环境准备

官网:https://iceberg.apache.org/

1. hadoop 一定要 分布式集群

2.需要flink-connector-mysql-cdc-1.4.0.jar 即mysql-cdc连接器放到flink 的 lib下,为了连接数据库

3. 需要将iceberg 0.13.1的jar包放在${FLINK_HOME}/lib下

4. 开启mysql的binlog日志,mysql需要开启binlog日志,需要修改/etc/my.cnf文件,加上如下两段代码。

server-id=1
log-bin=mysql-bin 


本文版本:版本 flink 1.13.6+iceberg 0.13.1

2.启动flink环境

#启动flink环境
${FLINK_HOME}/bin/start-cluster.sh


#启动flinksql
${FLINK_HOME}/bin/sql-client.sh

3.放纵时刻

#流操作
SET execution.runtime-mode=streaming;

##手动设置checkpoint时间 
set execution.checkpointing.interval=5sec;

####数据库连接
CREATE TABLE mysql_binlog(
    name STRING ,
    cust_id INT,
    age INT,
    PRIMARY KEY (cust_id) NOT ENFORCED
)WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop01',
'port' = '3306',
'username' = 'root',
'password' = 'pwd',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'FlinkCDC',
'table-name' = 'person'
);


#检查数据库连接数据是否正常
select * from mysql_binlog;


#创建iceberg表
CREATE TABLE sample(
    name STRING ,
    cust_id INT,
    age INT,
    PRIMARY KEY (cust_id) NOT ENFORCED
)WITH (
'connector'='iceberg',
'catalog-name'='iceberg_catalog',  --catalog名称
'catalog-type'='hadoop',  --创建的为hadoop的catalog
'warehouse'='hdfs://hadoop01:8020/iceberg/test2',  --指定数据位置
'format-version'='2'  --版本 2 支持带主键的表upsert但是仍然无法流式查询。版本 1 不支持带主键的表upsert
);

##导入数据到iceberg
insert into sample select * from mysql_binlog;

##检查数据是否导入成功
select * from sample;

丰富知识:
iceberg使用代码合并小文件后如果不开启此配置,历史文件同样不会删除,开启后就会实现合并后清除历史文件
    启用提交后写入元数据删除
    write.metadata.delete-after-commit.enabled=true
    配置保留历史数量(比如配置为5,则元数据和数据都保留5份历史数据和1份最新数据)
    write.metadata.previous-versions-max=5    

历史文件保留最大值为5,metadata和data里面文件数则始终保持为6个

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flinkCDC+iceberg(hadoop catalog) 同步mysql数据库数据 的相关文章

  • 非 hdfs 文件系统上的 hadoop/yarn 和任务并行化

    我已经实例化了 Hadoop 2 4 1 集群 并且发现运行 MapReduce 应用程序的并行化方式会有所不同 具体取决于输入数据所在的文件系统类型 使用 HDFS MapReduce 作业将生成足够的容器 以最大限度地利用所有可用内存
  • 获取在任何日期创建的表的列表?

    我遇到了这样的情况 我想查找我在 2012 年 9 月 14 日 2012 年 9 月 14 日 在 sql server 上创建的表 是否有任何查询会列出在此日期创建的这些表 SELECT FROM sys tables WHERE cr
  • SQL Server:为什么 ISO-8601 格式的日期依赖于语言?

    我需要一些帮助来理解 SQL Server 中的日期格式处理 如果您尝试以下操作 它将返回正确的结果 SET LANGUAGE English SELECT CAST 2013 08 15 AS DATETIME 2013 08 15 00
  • SQL 使用另一列的键和最大值设置列

    我需要根据同一 ID 的 duration 列的最大值更新 max register 列 将值设置为 1 其他值设置为 0 初始表 Id duration max register 1 0 0 1 7 0 1 3 0 2 10 0 2 5
  • 如何用约束标记一大组“传递群”?

    在 NealB解决方案之后进行编辑 与以下解决方案相比 NealB的解决方案非常非常快任何另一个 https stackoverflow com q 18033115 answers and 提出了关于 添加约束以提高性能 的新问题 Nea
  • SQL - 需要查找重复记录但排除反向事务

    我有一张交易表 偶尔会有 重复条目 如果 当管理员发现这些重复条目时 他们将撤销交易 从而创建负值 但由于监管要求 原始重复条目仍然保留 我想创建一个 SQL 查询 并使用 Crystal Reports 来制作报告 以便管理员轻松查找重复
  • ORA-12728: 正则表达式中的范围无效

    我想检查表中是否插入了有效的电话号码 所以我的触发代码在这里 select start index into mob index from gmarg mobile operators where START INDEX substr ne
  • meta_query,如何使用关系 OR 和 AND 进行搜索?

    已解决 请参阅下面的答案 我有一个名为的自定义帖子类型BOOKS 它有几个自定义字段 名称为 TITLE AUTHOR GENRE RATING 我该如何修复我的meta query下面的代码以便仅books在自定义字段中包含搜索词 tit
  • 解析错误:语法错误,意外的 T_RETURN [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 遇到这个问
  • 处理与不同相关实体的一对多的正确模式

    我有一个 C 项目 我使用实体框架作为 ORM 我有一个User 可以向多家银行付款 每家银行都是一个独立的实体 并且每家银行都由不同的字段描述 问题是 一User可以没有或有很多不同的Banks 我不太确定如何对此进行建模 临时解决方案是
  • 如果 Oracle SQL 中存在视图,则删除视图[重复]

    这个问题在这里已经有答案了 我是 Oracle 数据库系统的新手 Oracle 12c 中以下 SQL 语句的等效项是什么 DROP VIEW IF EXIST
  • SQL:如何从一个表中获取另一个表中每一行的随机行数

    我有两个数据不相关的表 对于表 A 中的每一行 我想要例如表 B 中的 3 个随机行 使用光标这相当容易 但速度非常慢 那么我该如何用单个语句来表达这一点以避免 RBAR 呢 要获得 0 到 N 1 之间的随机数 可以使用 abs chec
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • 针对约 225 万行的单表选择查询的优化技术?

    我有一个在 InnoDB 引擎上运行的 MySQL 表 名为squares大约有 2 250 000 行 表结构如下 squares square id int 7 unsigned NOT NULL ref coord lat doubl
  • 如何将今天的日期返回到 Oracle 中的变量

    我想做这个 DECLARE today as smalldatetime SELECT today GetDate 但我需要一个oracle翻译 甲骨文使用SYSDATE 还有 ANSI 标准CURRENT TIMESTAMP 除其他外 S
  • 如何在 SQL Server 2012 中选择除一列之外的所有列? [复制]

    这个问题在这里已经有答案了 有没有一种方法可以选择所有列 但只选择我不想选择的特定列 我的意思是有时我会遇到这样的问题 表有数百个字段 而我只需要删除一个字段 我需要重写所有列吗 有什么窍门吗 喜欢select
  • hive - 在值范围之间将一行拆分为多行

    我在下面有一张表 想按从开始列到结束列的范围拆分行 即 id 和 value 应该对开始和结束之间的每个值重复 包括两者 id value start end 1 5 1 4 2 8 5 9 所需输出 id value current
  • CONTAINS 不适用于 Oracle Text

    我在执行此查询时遇到问题 SELECT FROM gob attachment WHERE CONTAINS gob a document java gt 0 它给了我 ORA 29902 error in executing ODCIIn
  • Oracle REGEXP_INSTR() 和“a-z”字符范围与预期不匹配

    我想用REGEXP INSTR 在 oracle 数据库中检查小写 大写字符 我知道 upper and lower POSIX 字符类 但我选择了a z这给了我非常奇怪的结果 我不明白 有人可以解释一下吗 SELECT REGEXP IN
  • 如何使用 SQL 查询创建逗号分隔的列表?

    我有 3 个表 名为 应用程序 ID 名称 资源 id 名称 应用程序资源 id app id resource id 我想在 GUI 上显示所有资源名称的表格 在每一行的一个单元格中 我想列出该资源的所有应用程序 以逗号分隔 所以问题是

随机推荐

  • ARM NEON编译优化

    NEON被设计为附加的加载 存储架构 以提供良好的矢量化 编译器对c c 等语言有良好的支持 这样可以实现很高水平的并行性 开发者可以为需要高性能的应用程序编写NEON指令来实现相应功能 最重要的是它实现了访问交叉存储在内存中的多个数据流并
  • 对解数独问题的归纳

    解数独 我们都可能玩过或者了解知道 就是数独游戏 数独是一种运用纸 笔进行演算的逻辑游戏 玩法 在空格里填入数字1到9 使得每一行 每一列和每一个用粗线围起来的3 3的九个单元格里 填数都包含1到9各一个 而利用电脑怎样实现呢 怎样将我们的
  • xshell链接服务器报错To escape to local shell, press ‘Ctrl+Alt+]‘

    关掉代理即可
  • C语言学习日记(2)——写个Hello,World测试一下

    感觉工具已经安装好了 但到底怎么样还要试过才知道 那就写一个经典的Hello World程序测试一下吧 首先打开vscode 新建一个文件 输入代码 代码字体颜色全都是白色的 看到状态栏右下角 原来文件格式还是Plain Text 设置一下
  • 2021各厂免费云服务器申请攻略(阿里云/腾讯云/华为云)

    阿里云腾讯云华为云一直都有免费云服务器提供 企业用户个人用户可以申请的免费云服务器配置及免费时长都不同 云服务器吧来详细说下阿里云免费云服务器 腾讯云免费云服务器和华为云免费服务器申请地址 申请条件及申请攻略 阿里云免费云服务器 阿里云免费
  • RabbitMQ-推(push)模式

    推 push 模式 采用Basic Consume进行消费 关键代码
  • Python3学习笔记:Python中的None和空字符串''

    一 定义 None python中的内建常数 是NoneType中唯一的值 且不能赋值 一般用于函数中表示参数的缺省 空字符串 代表是字符串 None print type None print bool None 执行结果
  • CSDN竞赛第37期题解

    CSDN竞赛第37期题解 1 题目名称 幼稚班作业 幼稚园终于又有新的作业了 老师安排同学用发给同学的4根木棒拼接成一个三角形 当然按照正常的逻辑 如果不能拼 接成三角形 必然要折断某个木棍来拼接三角形 可是懒惰的小艺当然不会费力了 如果拼
  • 从男士正装切入服装租赁领域,The Black Tux 获 2500 万美元 B 轮融资

    近日 自营男士正装租赁品牌The Black Tux宣布获 2500 万美元 B 轮融资 本轮由 Stripes Group 领投 该机构还对时尚品牌 Reformation 进行过注资 其他投资方还包括 First Round Capit
  • Eslint-plugin-vue 报警告问题解决

    问题描述 eslint plugin vue vue valid v for Custom elements in iteration require 提示警告 解决办法 修改首选项的默认用户设置 如下图 设置将Eslint vetur 校
  • 【Grafana】CentOS下安装Grafana

    Grafana CentOS下安装Grafana 本文主要是在CentOS下对Grafana的安装和配置等进行介绍 以及一些踩过的坑 文章目录 Grafana CentOS下安装Grafana 一 基本介绍 二 安装方法 1 从YUM存储库
  • c# uint[]和int[] 转 byte[]的几种方式

    uint UintArray byte ByteArray 1 for循环转 for int i 0 i
  • UE4插件研发 So Easy

    UE4插件研发 UE4插件扫盲 UE4插件是什么 UE4插件的作用 UE4引擎 项目 插件的区别 引擎自带的插件浏览器 插件目录结构 插件配置文件说明 UE4插件的创建方法 插件代码的执行过程 四种常用模式的插件模板 第三方库的引入 牛刀小
  • PCB为什么不能直角走线?

    最初学习PCB设计时 很多老师说过 注意不要走直角 很多人也认为优秀的电子工程师都应该在PCB电路设计时避免直角走线 但事实上 PCB一定不能直角走线吗 一 能不能直角走线 电路频率说了算 PCB并不是绝对不能直角走线 而是视电路情况而定
  • Headless CMS - 打破“设计优先”的怪圈

    什么是 Headless CMS 为什么 Headless CMS 带有真正的革命性 因为它严格的将内容和格式分离 使我们回归到内容管理的本源 这种变化必然会带来一些不确定性 因此 在开始您的第一个 CMS 项目之前 了解 Headless
  • 断触问题分析思路

    在使用手机的时候 有时候会出现触碰中断的异常问题 比如点击无效 已经存在的触碰事件突然消失 这种问题很直观 用户体验很差 如果能够复现问题 抓到实时log 分析起来会清楚很多 可以打开开发者选项中的指针轨迹 这个触碰轨迹的实现是使用了Poi
  • 看门狗定时器

    看门狗定时器 WDT Watch Dog Timer 是单片机的一个组成部分 它实际上是一个计数器 一般给看门狗一个数字 程序开始运行后看门狗开始倒计数 如果程序运行正常 过一段时间CPU应发出指令让看门狗复位 重新开始倒计数 如果看门狗减
  • 报错torch._C._LinAlgError: cusolver error: CUSOLVER_STATUS_EXECUTION_FAILED

    解决方法 将使用torch linalg模块或torch inverse语句中变量移到CPU 或者用更旧的pytorch版本例如 PyTorch 1 10 0 cu111 即torch inverse data 改为 device data
  • 最详细的Spark内存管理

    spark 各版本的内存参数 一 Spark 1 6内存管理 spark 1 6之前 使用StaticMemoryManager 叫legacy模式 默认是关闭的 spark1 6开始 使用UnifiedMemoryManager 1 6开
  • flinkCDC+iceberg(hadoop catalog) 同步mysql数据库数据

    flink可以实现hadoop catalog 和hive catalog 前者映射hdfs地址 实现数据存储 后者映射hive表地址 用于有关hive的数据处理及其他项目实操 1 环境准备 官网 https iceberg apache