flink-cdc 实现oracle 实时同步到kudu

2023-10-26

其实网上也有很多相关话题的代码实现,但是发现有很多坑。

在 腾讯官方文档中 有介绍,但是屏蔽了很多细节,我做了以下四点修改才能正常运行。

1.前置条件

保证oracle中相关表开启了归档日志和补充日志,因为flink-cdc基于debezium的logminer实现的。

2.进入调试阶段

问题一. maven依赖需要本地install

文档中使用maven依赖如下

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.2-SNAPSHOT</version>
    <!-- 此处依赖需要设置为 scope,其他 flink 依赖需设置为 provied-->
    <scope>compile</scope>
</dependency>

实际过程中在maven仓库中并没有此pom,我使用的是2.2.1版本,而且此版本需要oracle 19c的驱动,自己手动从官网下载jar包,然后mvn install 。

问题二.oracle字段名要大写,否则 oracle的变更数据会获取不到

   tEnv.executeSql("CREATE TABLE `oracleSource` (\n" +
                //字段名大写
                " PK      BIGINT,\n" +     
                " BRANCHID    VARCHAR,\n" +
                " PRIMARY KEY(PK) NOT ENFORCED )\n" +
                " WITH (\n" +
                "  'connector' = 'oracle-cdc',\n" +
                // 请修改成 Oracle 所在的实际 IP 地址
                "  'hostname' = 'xxx',\n" +
                "  'port' = '1521',\n" +
                "  'username' = 'xxx',\n" +
                "  'password' = 'xxx',\n" +
                "  'database-name' = 'xxx',\n" +
                "  'schema-name' = 'xxx',\n" +
                "  'table-name' = 'xxx',\n" +
                 " 'scan.startup.mode' = 'initial'\n"+
                 ")");

问题三.kudu的端口需要开通,kudu master的7051端口,kudu tablet server 的7050端口,否则数据写入kudu报错

问题四.kudud的表名要完整 ,当整个链路都是通的时,发现kudu表没有数据,后来根据 kudu webui的 tables里查看后修改表名后才正常

 tEnv.executeSql("CREATE TABLE `my_second_table_kudu` (\n" +
                " `id`    BIGINT,\n" +
                " `name`  VARCHAR\n" +
                ") WITH (\n" +
                " 'connector.type' = 'kudu',\n" +
                // 请修改成实际的 master IP 地址
                " 'kudu.masters' = 'xxx:7051',\n" +
//kudu.table 以kudu webui的tables 的表名为准
                " 'kudu.table' = 'impala::default.my_second_table',\n" +
                " 'kudu.hash-columns' = 'id',\n" +
                " 'kudu.primary-key-columns' = 'id'\n" +
                ")");

问题五. classNotFoundEexeption 

java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.util

还是老问题,把依赖引入正常,但是接下来又报错找不到

 org.apache.flink.shaded.guava30.com.google

后来通过新建lib包 add library的方式将flink-shaded-guava-30.1.1-jre-15.0.jar包引入

程序运行终于正常!!!

完整代码如下

package com.xxx.xxx;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class OracleToKudu {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //使用本地webui进行调试
        conf.setInteger(RestOptions.PORT,8082);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv,settings);

        // SQL 写法
         tEnv.executeSql("CREATE TABLE `oracleSource` (\n" +
               //字段大写
                " PK      BIGINT,\n" +
                " BRANCHID    VARCHAR,\n" +
                " PRIMARY KEY(PK) NOT ENFORCED )\n" +
                " WITH (\n" +
                "  'connector' = 'oracle-cdc',\n" +
                // 请修改成 Oracle 所在的实际 IP 地址
                "  'hostname' = 'xxx',\n" +
                "  'port' = '1521',\n" +
                "  'username' = 'xxx',\n" +
                "  'password' = 'xxx',\n" +
                "  'database-name' = 'xxx',\n" +
                "  'schema-name' = '<placeholder>这里需要大写',\n" +
                "  'table-name' = '<placeholder>这里需要大写',\n" +
                 " 'scan.startup.mode' = 'initial'\n"+
                 ")");

       // TableResult tableResult =
        tEnv.executeSql("create view vw_oracleSource as select PK AS id,BRANCHID AS name from oracleSource");

//
        tEnv.executeSql("CREATE TABLE `my_second_table_kudu` (\n" +
                " `id`    BIGINT,\n" +
                " `name`  VARCHAR\n" +
                ") WITH (\n" +
                " 'connector.type' = 'kudu',\n" +
                // 请修改成实际的 master IP 地址
                " 'kudu.masters' = 'xxx:7051',\n" +
                " 'kudu.table' = 'impala::default.my_second_table',\n" +
                " 'kudu.hash-columns' = 'id',\n" +
                " 'kudu.primary-key-columns' = 'id'\n" +
                ")");
        //使用upsert模式
        tEnv.executeSql("UPSERT into my_second_table_kudu select id, name from vw_oracleSource");

       sEnv.execute("kudu_demo1");
    }
}

maven依赖如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-oracle-kudu</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.13.5</flink.version>

    </properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-oracle-cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.2.1</version>
    </dependency>



    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>


   <dependency>
     <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-guava</artifactId>
     <version>18.0-11.0</version>
   </dependency>


    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-kudu -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-kudu_2.11</artifactId>
        <version>1.1.0</version>

    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.15</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.15</version>
    </dependency>

</dependencies>

<bulid 省略>
</project>

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink-cdc 实现oracle 实时同步到kudu 的相关文章

  • 统一日志处理切面

    import cn hutool core util StrUtil import cn hutool core util URLUtil import cn hutool json JSONUtil import com macro ma

随机推荐

  • 如何通过百度翻译实现整站网页翻译

    gt 通过百度翻译接口开通 小飞兔 整站翻译功能 英文网站翻译成中文网站 中文网站翻译成英文网站 本软件是通过百度翻译接口 将英文网页翻译成中文网页或中文网页翻译成英文网页 逐一翻译文本内容 并保持原页面的样式结构 一 下载软件https
  • python脚本——selenium自动化执行一些网页上的操作

    文章目录 一 说明 二 代码 三 用法总结 一 说明 通过python的selenium模块 自动化执行一些网页上的重复的无聊的工作 二 代码 usr bin python3 6 from time import sleep import
  • Java对象复制(直接赋值,浅拷贝,深拷贝)

    目录 Java对象复制 1 直接赋值 2 浅拷贝 3 深拷贝 4 序列化拷贝 Java对象复制 将一个对象的引用复制给另一个对象 一共有三种方式 第一种是直接赋值 第二种方式是浅拷贝 第三种是深拷贝 这三种方式实际上都是为了拷贝对象 1 直
  • Python搭建QQ机器人,监控QQ发言与进群退群,自动@新成员并屏蔽脏话踢人

    目录 前言 原理 准备 go cqhttp插件安装 nonebot框架的安装 插件的拓展 效果 警告 前言 emmmm 大家好 我叫善念 有点懒 好久没有给大家更新教程了 今天给大家带来的项目比较有趣 而且非常简单 简称 有手即可 很多人应
  • FFmpeg教程(一) 视音频基础知识

    视频播放器的原理 封装格式 作用 视频码流和音频码流按照一定的格式储存在一个文件汇总 视频编码数据 作用 将视频像素数据 RGB YUV等 压缩成为视频码流 从而降低视频的数据量 音频编码数据 作用 将音频采样数据 PCM等 压缩成为音频码
  • MySQL 数据库创建 表间 关系 设置外键

    将表中已有字段设置 外键 似乎不能设置为主键即使定义时没有定义主键也会报错 如 Multiple primary key defined 1 添加新字段 alter table 表名 add 字段名 字段描述 alter table stu
  • 【C++】猜成语

    题目描述 成语是中国汉字语言词汇中一部分定型的词组或短句 是中国传统文化的一大特色 有固定的结构形式和固定的说法 表示一定的意义 在语句中是作为一个整体来应用的 承担主语 宾语 定语等成分 请设计猜四字成语游戏 功能需求 1 设计一个四字成
  • Python学习笔记(六)————列表遍历

    目录 1 列表的遍历 while循环 2 列表的遍历 for循环 3 while循环和for循环的对比 1 列表的遍历 while循环 2 列表的遍历 for循环 3 while循环和for循环的对比 在循环控制上 while 循环 可以自
  • 使用 docker 容器化 Go-Gin 应用程序!

    文章目录 介绍 先决条件 构建 Gin 框架应用程序 创建 Dockerfile 构建 Docker 镜像 运行 Docker 容器 结论 使用 docker 容器化 Go Gin 应用程序 马赫什瓦尔 利加德的照片 马赫什瓦尔 利加德 1
  • html生成jsessionid,H5 APP 使用 JSESSIONID 保持会话登录

    前言 这段时间给电影网站加了收费在线观看的权限 由于之前的 APP 没有添加登录模块 所以现在必须得添加上了 APP 基于 H5 MUI 开发 在实现的过程中真的是碰得焦头烂额的 过程 H5 开发 APP 无非就是利用 WebView 操作
  • 论文学习 Deep Adversarial Subspace Clustering

    deep adversarial subspace clustering 前提知识 1 子空间聚类 2 LRR 低秩表示 摘要 introduction Method 原理 生成器 操作步骤 G的损失函数 判别器 参数的学习 损失函数 de
  • MediaSource 缓存

    wfs js MediaSource 缓存 window mediasource
  • 线程池的优点

    线程池的优点 普通线程 线程池 普通线程 通常我们使用new Thread 新建线程 但是这样新建的线程会缺乏统一管理 会导致线程之前存在竞争 从而过多占用系统的资源导致效率变低 而且这种线程功能比较单一 相较于线程池无法定时 定期执行线程
  • 操作系统内存管理

    内存管理 一级目录 二级目录 三级目录 虚拟内存空间 分段内存 段选择符 段描述符 分页内存 逻辑地址 虚拟地址和线性地址的关系 分页管理 概念 页式管理 说明 步骤 硬件高速缓存 内存管理的结构体 总览 struct page struc
  • 7.26作业

    百钱买百鸡 include
  • Leetcode 53最大子序和

    最大子序和 给定一个整数数组 nums 找到一个具有最大和的连续子数组 子数组最少包含一个元素 返回其最大和 示例 输入 2 1 3 4 1 2 1 5 4 输出 6 解释 连续子数组 4 1 2 1 的和最大 为 6 进阶 如果你已经实现
  • Backtrader 获得上个交易日的日期

    在策略类backtrader Strategy中使用 self datetime date 1 即可得到上个交易日 但是不能得到下个交易日的日期 因为目前还没有循环过这个时间
  • filebrowser文件管理系统详细使用说明

    1 所有可用参数 a address string 要侦听的地址 默认值为 127 0 0 1 b baseurl string 基础url cache dir string 文件缓存目录 如果为空则禁用 t cert string tls
  • Ubuntu双系统启动时卡死解决办法

    ubuntu双系统启动时卡死解决办法 在ubuntu16 04和18 04测试无误 问题描述 在安装完ubuntu双系统后 第一次启动ubuntu系统时 卡死在启动界面 或者黑屏 这大概都是由于显卡驱动的原因 具体不在这里阐述 通过以下方法
  • flink-cdc 实现oracle 实时同步到kudu

    其实网上也有很多相关话题的代码实现 但是发现有很多坑 在 腾讯官方文档中 有介绍 但是屏蔽了很多细节 我做了以下四点修改才能正常运行 1 前置条件 保证oracle中相关表开启了归档日志和补充日志 因为flink cdc基于debezium