FlinkCDC第四部分-同步mysql到mysql,ctrl就完事~(flink版本1.17.1)

2023-11-06

 本文介绍了不同源单表-单表同步,不同源多表-单表同步。

注:此版本支持火焰图

Flink版本:1.17.1

环境:Linux CentOS 7.0、jdk1.8

基础文件:

flink-1.17.1-bin-scala_2.12.tgz

flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven仓库目录:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安装Flink步骤详见文章第二篇

支持的mysql版本: 

一、 数据源ip为***.51的源表,同步数据到数据源ip为***.50的目标表中,需要以下几个步骤:

1. 启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

2. 停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

3. 启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 编写FlinkSql,创建临时表和job:

FlinkSql与mysql字段的类型映射

 把写好的Sql粘贴到FlinkSql客户端命令行中,分号'  ;  '是语句结束标识符,按回车创建:

 创建来源表结构:

来源表链接类型为'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '***',
>     'port' = '3306',
>     'username' = '***',
>     'password' = '***',
>     'database-name' = 'alarm',
>     'server-time-zone' = 'Asia/Shanghai',
>     'table-name' = 'alarminfo'
>  );

[INFO] Execute statement succeed.

 创建目标表结构(目标表结构可比来源表字段多,可使用视图指定字段默认值):

目标表链接类型为'connector' = 'jdbc',注意url需要跟后面以下属性值

Flink SQL> CREATE TABLE target_alarminfo50 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
>     'username' = '***',
>     'password' = '****',
>     'table-name' = 'alarminfo',
>     'driver' = 'com.mysql.cj.jdbc.Driver'
>  );

[INFO] Execute statement succeed.

 最后创建同步关系:

INSERT INTO target_alarminfo50 SELECT * FROM source_alarminfo51;

如下:

创建完表结构可使用下列语句查看和删除:

查看表:show tables;

删除表:drop table if exists  target_alarminfo; 

flink-UI页面效果:

打开火焰图:

编辑flink-conf.yaml:最后面添加 

rest.flamegraph.enabled: true

配置后重启flink服务,重新创建任务。

火焰图效果:

数据同步效果:

源表:

目标表数据:首次数据全量,后面数据变更增量 

 注:

在分析火焰图时,可以关注以下几点:
函数的执行时间:纵向的轴显示了函数的嵌套层级,越往下表示越深层的函数调用。横向轴表示时间,通过不同颜色的方块来表示函数的执行时间。
热点函数:寻找占据执行时间大部分的函数,这些函数可能是需要优化的关键点。
函数之间的关系:观察函数之间的调用关系,查看是否有不必要的函数调用或循环。
I/O 操作:关注是否有大量的数据读取、写入或网络通信,这可能是性能瓶颈的来源。
根据火焰图的分析结果,您可以进一步定位和排查潜在的性能问题,并在代码、配置或资源分配方面进行优化。
请注意,为了准确分析火焰图,建议在负载较高的情况下生成火焰图,并保持足够的监视时间。此外,Flink 的火焰图功能在生产环境中可能会造成一定的开销,因此建议在测试或开发环境中使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

FlinkCDC第四部分-同步mysql到mysql,ctrl就完事~(flink版本1.17.1) 的相关文章

随机推荐