Flink实战-(1)Flink-CDC MySQL同步到MySQL(select)

2023-11-12

背景

 

基于select语句的Flink-CDC 适用于数据同步的全量同步的场景,可以结合 Azkaban 或者dolphin scheduler 做定时调度 T+1 数据同步。

1、maven

   <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>

                        <configuration>
                            <transformers>

                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!--如果要打包的话,这里要换成对应的 main class-->
                                    <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2、MysqlReader

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;


    //该方法主要用于打开数据库连接,下面的ConfigKeys类是获取配置的类
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }


    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Tuple3<Integer, String, String> tuple = new Tuple3<Integer, String, String>();
            tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
            sourceContext.collect(tuple);
        }
    }

    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、MysqlWriter

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class MysqlWriter extends RichSinkFunction<Tuple3<Integer, String, String>> {
    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
        }

        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
        System.out.println("完成");
    }

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        //获取JdbcReader发送过来的结果
        try {
            ps.setInt(1, value.f0);
            ps.setString(2, value.f1);
            ps.setString(3, value.f2);
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}

4、主类MysqlCdcMysql

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCdcMysql {
    public static void main(String[] args) throws Exception {
//        ExecutionEnvironment env  =  ExecutionEnvironment.createRemoteEnvironment("localhost",8081,"D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");
// 最好不要在程序设置并行度 如果设置了8 要保证安装的flink配置里面的parallelism这个参数大于8 不然会导致资源异常
//        env.setParallelism(8);
        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink cost MySQL data to write MySQL");
    }
}

5、本地运行

6、打成jar包进行上传

注意:flink版本要和maven里的版本一致 scala版本也要保持一致 

7、运行

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink实战-(1)Flink-CDC MySQL同步到MySQL(select) 的相关文章

随机推荐

  • Go语言编程思想4——测试与性能调优

    Go语言编程思想4 测试与性能调优 Debugging Sucks Testing Rocks 多做测试 少做调试 Go语言使用表格驱动测试 一 传统测试 正确结果在前 函数结果在后 判断是否相等 测试逻辑和测试数据混在一起 出错信息不明确
  • .NET开发框架的选择历程

    去年来新公司后第一件事就是应领导要求 选择一款合适的开发框架 之前有熟悉的开发框架 Extjs NET简单三层 对于一般的企业后台管理系统够用了 但是现在Extjs这种前段端架基本被淘汰了 后端简单三层也显得非常落伍 但是对于选框架这个事情
  • zerotier使用教程_ZeroTier 初阶教程

    什么是 ZeroTier ZeroTier delivers the capabilities of VPNs SDN and SD WAN with a single system Manage all your connected re
  • stem函数--Matplotlib

    stem函数 Matplotlib 函数功能 Create a stem plot 创建棉棒图 A stem plot plots vertical lines at each x location from the baseline to
  • mysql常用命令有什么

    MySQL 数据库常用命令 1 MySQL常用命令 create database name 创建数据库 use databasename 选择数据库 drop database name 直接删除数据库 不提醒 show tables 显
  • 神经网络预测彩票数据

    一 人工智能深度学习神经网络在双色球彩票中的应用研究 一 https www cnblogs com zdz8207 p DeepLearning NeuralNetworks html 二 百度AI http ai baidu com p
  • js逆向-某399游戏登陆参数

    声明 本文仅供学习参考 禁止用于其他途径 违者后果自负 前言 目标网站 aHR0cHM6Ly93d3cuNDM5OS5jb20v 登陆接口 aHR0cHM6Ly9wdGxvZ2luLjQzOTkuY29tL3B0bG9naW4vbG9naW
  • 基于树莓派4B设计的智能家居系统(华为云IOT)

    基于树莓派的智能家居控制系统 华为云IOT 一 设计需求 前言 本次设计实现了一个基于树莓派的智能家居系统 可以对家庭环境进行实时监测和控制 提高居家安全性和舒适度 该系统采用了多种传感器和模块 包括温湿度传感器 烟雾传感器 火焰传感器 光
  • virtualenv: error: unrecognized arguments: --no-site-packages

    使用virtualenv version 看到自己的版本大于20 就可以将如下这段删除 export VIRTUALENVWRAPPER VIRTUALENV ARGS no site packages 我碰到的情况下 user makef
  • 机器学习算法之决策树

    原文 http www jianshu com p 6eecdeee5012 决策树是一种简单高效并且具有强解释性的模型 广泛应用于数据分析领域 其本质是一颗由多个判断节点组成的树 如 决策树 在使用模型进行预测时 根据输入参数依次在各个判
  • Go语言网络编程(socket编程)WebSocket编程

    1 WebSocket编程 1 1 1 webSocket是什么 WebSocket是一种在单个TCP连接上进行全双工通信的协议 WebSocket使得客户端和服务器之间的数据交换变得更加简单 允许服务端主动向客户端推送数据 在WebSoc
  • 关于epoll的IO模型是同步异步的一次纠结过程

    这篇文章的结论就是epoll属于同步非阻塞模型 这个东西貌似目前还是有争议 在新的2 6内核之后 epoll应该属于异步io的范围了 golang的高并发特性就是底层封装了epoll模型的函数 但也有文章指出epoll属于 伪AIO 真正的
  • IOS之同步请求、异步请求、GET请求、POST请求

    1 同步请求可以从因特网请求数据 一旦发送同步请求 程序将停止用户交互 直至服务器返回数据完成 才可以进行下一步操作 2 异步请求不会阻塞主线程 而会建立一个新的线程来操作 用户发出异步请求后 依然可以对UI进行操作 程序可以继续运行 3
  • PyQt5 笔记5 -- 消息框(QMessageBox)

    PyQt5 笔记5 消息框 QMessageBox 1 常用函数 函数原型 信息框 QMessageBox information self 框名 内容 按钮s 默认按钮 问答框 QMessageBox question self 框名 内
  • 西门子PLC学习笔记十-(计数器)

    S7 300 400的计数器一般占两个字节 是16位的 CPU最多可以使用64 512个计数器 计数器地址编号为C0 C511 1 S CUD 加减计数器 加减计数器波形图 2 S CU 加计数器 3 S CD 减计数器 4 三种计数器对应
  • Unity制作多屏幕解决方案

    最近制作了一个多屏幕的项目 多屏幕指的是一个电脑主机 连接多个显示器 我这个项目使用了一个显卡连接了三个显示设备 Unity UGUI提供Canvas画布 在画布上有一个TargetDisplay的解决解决方案 Canvas结合Camera
  • 摸不着头脑,flatMap处理后居然无法去重(原来是数据库添加字段的时候多了个空格= =)

    前言 这应该是一个bug 这是一篇毫无营养的博客 当我正在尝试从页面中获取作者时 发现之前应该被Set包装的作者字符串居然发生了重复 于是我赶紧回到源码处 并加了条log日志 开始排查问题 我的代码是这样的 查作者 绝对也会有 Set
  • 死锁产生的四个必要条件(缺一不可)

    死锁产生必须同时满足四个条件 只要其中任意一条不成立 死锁就不会发生 1 互斥条件 进程要求对所分配的资源进行排他性控制 即在一段时间内某项资源只被 一个进程所占有 此时若有其他进程请求该资源 则请求进程只能等待 如图一 2 非抢占 进程所
  • 解决OptiSystem安装、使用过程中遇到的问题

    系统环境 Win10系统 问题1 在上一篇文章 Optisystem7 0安装教程 Win10系统 中提到 安装过程中会弹出一个对话框 需要点击 忽略 但是安装过程中出现下图错误 错误代码 0x3 点击忽略 仍然会继续弹出这个对话框 或者第
  • Flink实战-(1)Flink-CDC MySQL同步到MySQL(select)

    背景 基于select语句的Flink CDC 适用于数据同步的全量同步的场景 可以结合 Azkaban 或者dolphin scheduler 做定时调度 T 1 数据同步 1 maven