Flink 1.13.0 + Hudi 0.11.1 + Hive 2.1.1 + Presto 0.273.3 + yanagishima 18.0

2023-11-12

Abstract:
      Integrate Flink 1.13.0 with Hudi 0.11.1 and use FlinkSQL programs and the FlinkSQL CLI to perform batch writes, streaming writes, streaming reads, and batch reads against Hudi MOR and COW tables. Explore streaming data into Hudi with flink sql cdc, flink sql kafka, and flink stream kafka.
      Explore the integration of Hudi and Hive and how to query and manage Hudi tables through Hive: (1) create Hudi external tables in Hive and add partitions manually; (2) sync to Hive automatically while writing to Hudi from a FlinkSQL program or the FlinkSQL CLI; (3) use Hudi's bundled run_sync_tool.sh to do the sync.
      Explore how to load data from an existing Hive warehouse into Hudi.
      Query Hudi data with the Presto client, and with DBeaver connected to Presto.
      Query Hudi data through the yanagishima web UI.

Component versions
flink 1.13.0
scala 2.11
hudi 0.11.1
hive 2.1.1
cdh 6.3.2

1. Writing to Hudi from a FlinkSQL program

1.1 flink-cdc into Hudi

1.1.1 Manually create the Hive table and manually sync partitions

1.1.1.1 MySQL CDC into a Hudi COW table
1.1.1.1.1 Create the table in MySQL and insert a few initial rows
use test;
-- ----------------------------
-- Table structure for users
-- ----------------------------
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of users
-- ----------------------------
INSERT INTO `users` VALUES ('1', 'hello scala', '2022-06-08 17:30:03', '2022-06-08 17:30:03');
INSERT INTO `users` VALUES ('2', 'bb', '2022-09-09 11:29:42', '2022-09-09 11:29:45');
INSERT INTO `users` VALUES ('3', 'hello cc', '2022-06-09 11:14:38', '2022-06-09 11:14:41');
INSERT INTO `users` VALUES ('4', 'aa', '2022-06-10 15:29:57', '2022-06-10 15:30:01');
INSERT INTO `users` VALUES ('5', '55', '2022-09-09 11:38:30', '2022-09-09 11:38:34');
INSERT INTO `users` VALUES ('6', ' hello java', '2022-06-08 17:37:24', '2022-06-09 11:15:41');
INSERT INTO `users` VALUES ('10', 'hello hh', '2022-06-09 11:16:25', '2022-06-09 11:16:25');
INSERT INTO `users` VALUES ('11', 'eee', '2022-06-21 12:02:10', '2022-06-21 12:02:10');
1.1.1.1.2 Write the program

Create a Maven project named flink-hudi.
The pom file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>work.jiang</groupId>
    <artifactId>flink_hudi</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11</scala.version>
        <flink.version>1.13.0</flink.version>
        <hudi.version>0.11.1</hudi.version>
    </properties>

    <dependencies>
        <!-- table start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
            <!--            <scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-compress</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>commons-io</groupId>
                    <artifactId>commons-io</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4-java</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- hudi start -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.13-bundle_2.11</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-hadoop-mr-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-2.2.0_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.2</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Copy the Hive and HDFS configuration files into the project (typically core-site.xml, hdfs-site.xml, and hive-site.xml under src/main/resources).
Create the class work.jiang.hudi.cdc.cow.FlinkcdcToHudiCOWTable:

package work.jiang.hudi.cdc.cow;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reference: https://blog.csdn.net/Aaron_ch/article/details/124147649
 * Hudi COW table: every time a record is modified in MySQL, Hudi writes a new parquet file.
 * When queried from Hive, the latest parquet file is read (so the latest data is visible, at the
 * cost of write amplification and many parquet files); partitions still have to be added to Hive manually.
 */
public class FlinkcdcToHudiCOWTable {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        bsEnv.setParallelism(1);
        // start a checkpoint every 1000 ms
        bsEnv.enableCheckpointing(1000);

        StreamTableEnvironment env = StreamTableEnvironment.create(bsEnv, bsSettings);

        // 1. Create the Flink MySQL CDC source table
        env.executeSql("CREATE TABLE mysql_users (\n" +
                "                             id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                "                             name STRING,\n" +
                "                             birthday TIMESTAMP(3),\n" +
                "                             ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '172.16.43.159',\n" +
                "      'port' = '3306',\n" +
                "      'username' = 'root',\n" +
                "      'password' = '*',\n" +
                "      'server-time-zone' = 'Asia/Shanghai',\n" +
                "      'database-name' = 'test',\n" +
                "      'table-name' = 'users'\n" +
               // "      'scan.incremental.snapshot.enabled' = 'false'\n" +
                "      )");

        // 2. Create the Flink Hudi sink table
        env.executeSql("CREATE TABLE hudi_users23 (\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'COPY_ON_WRITE',\n" +
                "    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users23',\n" +
                "    'write.insert.drop.duplicates' = 'true',\n" +
                "    'write.tasks' = '1' \n" +
                ")");

        // 3. Write the mysql-cdc stream into Hudi; this submits a Flink job
        env.executeSql("INSERT INTO hudi_users23 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users");

        env.executeSql("select * from hudi_users23").print();
    }
}
1.1.1.1.3 Run the program and check the HDFS directory

After running the program, a new hudi_users23 table directory is created under /user/hudi/ on HDFS.

1.1.1.1.4 Create the Hudi external table in Hive
hive> add jar /home/bonc/hudi-hive-sync-bundle-0.11.1.jar;
hive> add jar /home/bonc/hudi-hadoop-mr-bundle-0.11.1.jar;
hive> use hudi_db;
hive> CREATE EXTERNAL TABLE `hudi_users_23`(
     `_hoodie_commit_time` string,
     `_hoodie_commit_seqno` string,
     `_hoodie_record_key` string,
     `_hoodie_partition_path` string,
     `_hoodie_file_name` string,
     `id` bigint,
     `name` string,
     `birthday` bigint,
     `ts` bigint)
   PARTITIONED BY (
     `partition` string)
   ROW FORMAT SERDE
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
   STORED AS INPUTFORMAT
     'org.apache.hudi.hadoop.HoodieParquetInputFormat'
   OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     'hdfs://cdh01:8022/user/hudi/hudi_users23';

Partitions need to be added manually:

hive> alter table hudi_users_23 add if not exists partition(`partition`='20220608') location 'hdfs://cdh01:8022/user/hudi/hudi_users23/20220608';
hive> alter table hudi_users_23 add if not exists partition(`partition`='20220609') location 'hdfs://cdh01:8022/user/hudi/hudi_users23/20220609';

Query hudi_users_23:

hive> select * from hudi_users_23;

Query result: only the manually added partitions are visible.
Now insert a new row in partition 20220608 in MySQL, then update a row in that partition and delete a row in that partition, querying Hive each time to check whether the data has been updated.

mysql> INSERT INTO `users` VALUES ('12', 'eee', '2022-06-08 12:02:10', '2022-06-08 12:02:10');
hive> select * from hudi_users_23;


mysql> update users set name = 'aa'  where id = 12;
hive> select * from hudi_users_23;


mysql> delete from users where id = 12;
hive> select * from hudi_users_23;

Conclusion: when rows in the manually added partitions are inserted, updated, or deleted in MySQL, the changes are synced automatically and Hive sees the updated data in real time.

1.1.1.2 MySQL CDC into a Hudi MOR table
1.1.1.2.1 Create the table in MySQL and insert a few initial rows

Same as 1.1.1.1.1.

1.1.1.2.2 Write the program

Create the class work.jiang.hudi.cdc.mor.FlinkcdcToHudiMORTable:

package work.jiang.hudi.cdc.mor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * MySQL CDC into Hudi, queried through an external table created manually in Hive
 * Reference: https://blog.51cto.com/u_15458633/4829214
 * Caveat: the external table and its partitions must be created by hand in Hive.
 * Conclusions:
 *  A Hudi MOR table produces a .hoodie directory (commit metadata) plus partition directories.
 *  Each partition directory contains .hoodie_partition_metadata (metadata), log files (incremental data) and parquet files (base data).
 *  A Hudi COW table produces a .hoodie directory plus partition directories.
 *  Each partition directory contains .hoodie_partition_metadata (metadata) and parquet files (base data); every update rewrites a new parquet file.
 *  A Hive table using HoodieParquetInputFormat (the COW-style / read-optimized view) only reads the base parquet files.
 *  A Hive table using HoodieParquetRealtimeInputFormat (the MOR-style / real-time view) reads the real-time updates (parquet + log files).
 */
public class FlinkcdcToHudiMORTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        env.setParallelism(1);
        // start a checkpoint every 5000 ms; by default a parquet file is only produced after 5 commits
        // when the program is re-run, a new parquet file is generated, and the full MySQL table is re-read
        env.enableCheckpointing(5 * 1000);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // 1. Create the mysql-cdc source table (streaming read)
        tableEnv.executeSql("CREATE TABLE mysql_users (\n" +
                "                             id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                "                             name STRING,\n" +
                "                             birthday TIMESTAMP(3),\n" +
                "                             ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '172.16.43.159',\n" +
                "      'port' = '3306',\n" +
                "      'username' = 'root',\n" +
                "      'password' = '*',\n" +
                "      'server-time-zone' = 'Asia/Shanghai',\n" +
                "      'database-name' = 'test',\n" +
                "      'table-name' = 'users'\n" +
                "      )");

        // 2. Create the Hudi table
        tableEnv.executeSql("CREATE TABLE hudi_users3 (\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users3',\n" +
                "    'read.streaming.enabled' = 'true',\n" +
                "    'read.streaming.check-interval' = '1', \n" +
                "    'write.tasks' = '1' \n" +
                ")");

        // 3. Write the mysql-cdc stream into Hudi; this submits a Flink job
        tableEnv.executeSql("INSERT INTO hudi_users3 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users");

        tableEnv.executeSql("select * from hudi_users3").print();
    }
}
1.1.1.2.3 Run the program and check the HDFS directory

No parquet files are generated here: by default, compaction only runs after 5 commits. The compaction parameters can be adjusted:

tableEnv.executeSql("CREATE TABLE hudi_users3 (\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "   'connector' = 'hudi',\n" +
                "   'table.type' = 'MERGE_ON_READ',\n" +
                "   'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users3',\n" +
                "   'compaction.tasks'= '1', \n" +
                "   'compaction.async.enabled'= 'true',\n" +
                "   'compaction.trigger.strategy'= 'num_commits',\n" +
                "   'compaction.delta_commits'= '1',\n" +
                "   'changelog.enabled'= 'true',\n" +
                "   'read.streaming.enabled' = 'true',\n" +
                "   'read.streaming.check-interval' = '1', \n" +
                "   'write.tasks' = '1' \n" +
                ")");

Run the program again; compacted parquet files are now generated.
Note: without compacted parquet files, the Hive table will not return any data.

1.1.1.2.4 Create external tables in Hive
1.1.1.2.4.1 COW-style table
hive> CREATE EXTERNAL TABLE `hudi_users_3`(
     `_hoodie_commit_time` string,
     `_hoodie_commit_seqno` string,
     `_hoodie_record_key` string,
     `_hoodie_partition_path` string,
     `_hoodie_file_name` string,
    `id` bigint,
     `name` string,
     `birthday` bigint,
    `ts` bigint)
   PARTITIONED BY (
     `partition` string)
   ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
   STORED AS INPUTFORMAT
     'org.apache.hudi.hadoop.HoodieParquetInputFormat'
   OUTPUTFORMAT
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     'hdfs://cdh01:8022/user/hudi/hudi_users3';
 
hive> alter table hudi_users_3 add if not exists partition(`partition`='20220608') location 'hdfs://cdh01:8022/user/hudi/hudi_users3/20220608';
hive> alter table hudi_users_3 add if not exists partition(`partition`='20220609') location 'hdfs://cdh01:8022/user/hudi/hudi_users3/20220609';
 
hive> select * from hudi_users_3 where `partition`=20220608;
hive> select * from hudi_users_3 where `partition`=20220609;

Query result: only the manually added partitions are visible, and only the parquet files are read, so the latest data written to the log files cannot be seen.

1.1.1.2.4.2 MOR-style table
hive> CREATE EXTERNAL TABLE `hudi_users_3_mor`(
     `_hoodie_commit_time` string,
     `_hoodie_commit_seqno` string,
     `_hoodie_record_key` string,
     `_hoodie_partition_path` string,
     `_hoodie_file_name` string,
     `id` bigint,
     `name` string,
     `birthday` bigint,
     `ts` bigint)
   PARTITIONED BY (
     `partition` string)
  ROW FORMAT SERDE
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
  OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  LOCATION
    'hdfs://cdh01:8022/user/hudi/hudi_users3';
 
hive> alter table hudi_users_3_mor add if not exists partition(`partition`='20220608') location 'hdfs://cdh01:8022/user/hudi/hudi_users3/20220608';
hive> alter table hudi_users_3_mor add if not exists partition(`partition`='20220609') location 'hdfs://cdh01:8022/user/hudi/hudi_users3/20220609';

hive> select * from hudi_users_3_mor where `partition`=20220608;
hive> select * from hudi_users_3_mor where `partition`=20220609;

Query result: only the manually added partitions are visible, but both the historical parquet files and the log files are read, so the data can be queried in real time.

Insert a new row in partition 20220608 in MySQL, then update a row in that partition and delete a row in that partition, querying Hive each time to check whether the data has been updated.

mysql> INSERT INTO `users` VALUES ('12', 'fff', '2022-06-08 12:02:10', '2022-06-08 12:02:10');
hive> select * from hudi_users_3;
hive> select * from hudi_users_3_mor;
mysql> update users set name = 'hhh'  where id = 12;
hive> select * from hudi_users_3;
hive> select * from hudi_users_3_mor;
mysql> delete from users where id = 12;
hive> select * from hudi_users_3;
hive> select * from hudi_users_3_mor;

Conclusion: when rows in the manually added partitions are inserted, updated, or deleted in MySQL, the changes are synced automatically and Hive sees the updated data in real time.

The COW-style (HoodieParquetInputFormat) table exposes only the parquet file data to Hive, while the MOR-style (HoodieParquetRealtimeInputFormat) table exposes parquet + log data.
Because I set
'compaction.trigger.strategy' = 'num_commits'
'compaction.delta_commits' = '1'   -- compact after every commit
the COW-style table also shows the latest changes. The default is to compact every 5 commits; without these settings the COW-style table would not show the latest data.

1.1.2 FlinkSQL auto-creates the Hive table and auto-syncs partitions

1.1.2.1 MySQL CDC into a Hudi COW table
1.1.2.1.1 Create the table in MySQL and insert a few initial rows
mysql> DROP TABLE IF EXISTS `stu4`;
mysql> CREATE TABLE `stu4` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `name` varchar(20) NOT NULL COMMENT 'student name',
  `school` varchar(20) NOT NULL COMMENT 'school name',
  `nickname` varchar(20) NOT NULL COMMENT 'student nickname',
  `age` int(11) NOT NULL COMMENT 'student age',
  `score` decimal(4,2) NOT NULL COMMENT 'score',
  `class_num` int(11) NOT NULL COMMENT 'class size',
  `phone` bigint(20) NOT NULL COMMENT 'phone number',
  `email` varchar(64) DEFAULT NULL COMMENT 'home email',
  `ip` varchar(32) DEFAULT NULL COMMENT 'IP address',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of stu4
-- ----------------------------
mysql> INSERT INTO `stu4` VALUES ('1', 'aa', 'Aschool', 'aa', '18', '70.00', '1', '12345', 'sdss@163.com', '12.12.12.12');
mysql> INSERT INTO `stu4` VALUES ('2', 'bb', 'Bschool', 'bb', '17', '70.00', '1', '123', '343', '1233');
mysql> INSERT INTO `stu4` VALUES ('4', 'cc', 'Cschool', 'cc', '19', '70.00', '1', '123', '123', '123');
1.1.2.1.2 Write the program

Create work.jiang.hudi.cdc.cow.FlinkcdcToHudiCOWTableAndSyncToHive:

package work.jiang.hudi.cdc.cow;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkcdcToHudiCOWTableAndSyncToHive {
    public static void main(String[] args) throws Exception {
       // System.setProperty("HADOOP_USER_NAME", "hdfs");
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 1. Get the table execution environment
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironment();
        // set parallelism to 1
        senv.setParallelism(1);
        // data is written to the Hudi table incrementally, so Flink checkpointing must be enabled
        senv.enableCheckpointing(2 * 1000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // Blink planner
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        // 1. Create the Flink MySQL CDC source table
        tbEnv.executeSql("create table stu4(\n" +
                "  id bigint not null,\n" +
                "  name string,\n" +
                "  school string,\n" +
                "  nickname string,\n" +
                "  age int not null,\n" +
                "  score decimal(4,2) not null,\n" +
                "  class_num int not null,\n" +
                "  phone bigint not null,\n" +
                "  email string,\n" +
                "  ip string,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '172.16.43.159',\n" +
                "      'port' = '3306',\n" +
                "      'username' = 'root',\n" +
                "      'password' = '*',\n" +
                "      'server-time-zone' = 'Asia/Shanghai',\n" +
                "      'database-name' = 'test',\n" +
                "      'table-name' = 'stu4'\n" +
                "      )");

        // 2. Create the Flink Hudi sink table
        tbEnv.executeSql("create table stu4_tmp_1(\n" +
                "  id bigint not null,\n" +
                "  name string,\n" +
                "  `school` string,\n" +
                "  nickname string,\n" +
                "  age int not null,\n" +
                " score decimal(4,2) not null,\n" +
                "  class_num int not null,\n" +
                "  phone bigint not null,\n" +
                "  email string,\n" +
                "  ip string,\n" +
                "  primary key (id) not enforced\n" +
                ")\n" +
                " partitioned by (`school`)\n" +
                " with (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://cdh01:8022/user/hudi/stu4_tmp_cow',\n" +
                "  'table.type' = 'COPY_ON_WRITE',\n" +
                "  'write.precombine.field' = 'school',\n" +
                "  'write.tasks'= '1',\n" +
                "  'write.rate.limit'= '2000', \n" +
                "  'compaction.tasks'= '1', \n" +
                "  'compaction.async.enabled'= 'true',\n" +
                "  'compaction.trigger.strategy'= 'num_commits',\n" +
                "  'compaction.delta_commits'= '1',\n" +
                "  'changelog.enabled'= 'true',\n" +
                "  'hive_sync.enable' = 'true',\n" +
                "  'hive_sync.mode' = 'hms',\n" +
                "  'hive_sync.metastore.uris' = 'thrift://cdh01:9083',\n" +
                "  'hive_sync.jdbc_url' = 'jdbc:hive2://cdh01:10000',\n" +
                "  'hive_sync.table' = 'stu4_tmp_1',\n" +
                "  'hive_sync.db' = 'hudi_db',\n" +
                "  'hive_sync.username' = 'hive',\n" +
                "  'hive_sync.password' = 'hive'\n" +
                ")");

        // 3. Write the mysql-cdc stream into Hudi; this submits a Flink job
        tbEnv.executeSql("insert into stu4_tmp_1 select * from stu4");
        tbEnv.executeSql("select * from stu4_tmp_1").print();
    }
}

1.1.2.1.3 Run the program and check the HDFS directory


1.1.2.1.4 Query Hive
hive> show tables;
hive> select * from stu4_tmp_1; 

Query result: a table is automatically created in Hive with the name configured by 'hive_sync.table' = 'stu4_tmp_1', and all of the table's data can be queried. Table creation, partition sync, and data sync all happen automatically.
Now insert, delete, and update data in MySQL and check whether the data in Hive is updated:

mysql> INSERT INTO `stu4` VALUES ('5', 'aa', 'Aschool', 'aa', '18', '70.00', '1', '12345', 'sdss@163.com', '12.12.12.12');
mysql> DELETE from `stu4` where id = '1';
mysql> update `stu4` set name = 'hhh'  where id = 2;
hive> select * from stu4_tmp_1;

Query result: the data in the Hive table is updated in real time as well.

1.1.2.2 MySQL CDC into a Hudi MOR table
1.1.2.2.1 Create the table in MySQL and insert a few initial rows

Same as 1.1.1.1.1.

1.1.2.2.2 Write the program

Create work.jiang.hudi.cdc.mor.FlinkcdcToHudiMORTableAndSyncToHive:

package work.jiang.hudi.cdc.mor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Verified working.
 * FlinkSQL writes data into Hudi and automatically creates a partitioned Hive table and syncs the data.
 * References: https://blog.csdn.net/wudonglianga/article/details/122954751
 * https://blog.csdn.net/wzp1986/article/details/125894473
 * https://blog.csdn.net/xiaoyixiao_/article/details/125239236
 * https://blog.csdn.net/yy8623977/article/details/123673316
 * Test result: two tables are created in Hive, hudi_users2_ro and hudi_users2_rt.
 * The ro (read-optimized) table only reads parquet files; the rt (real-time) table reads parquet + log files.
 * To see the latest data, query the rt table.
 */
public class FlinkcdcToHudiMORTableAndSyncToHive {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 1. Get the table execution environment
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironment();
        // set parallelism to 1
        senv.setParallelism(1);
        // data is written to the Hudi table incrementally, so Flink checkpointing must be enabled
        senv.enableCheckpointing(10 * 10000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // Blink planner
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        // 1. Create the mysql-cdc source table (streaming read)
        tbEnv.executeSql("CREATE TABLE mysql_users (\n" +
                "                             id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                "                             name STRING,\n" +
                "                             birthday TIMESTAMP(3),\n" +
                "                             ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '172.16.43.159',\n" +
                "      'port' = '3306',\n" +
                "      'username' = 'root',\n" +
                "      'password' = '*',\n" +
                "      'server-time-zone' = 'Asia/Shanghai',\n" +
                "      'database-name' = 'test',\n" +
                "      'table-name' = 'users'\n" +
                "      )");

        tbEnv.executeSql("CREATE TABLE hudi_users2(\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8022/user/hudi/hudi_users2', \n" +
                "'table.type'= 'MERGE_ON_READ',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'id', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'changelog.enabled'= 'true',\n" +
                "'read.streaming.enabled'= 'true',\n" +
                "'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'hudi_users2',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");
        
        // 3. Write the mysql-cdc stream into Hudi; this submits a Flink job
        tbEnv.executeSql("INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users");
        
        tbEnv.executeSql("select * from hudi_users2").print();
    }
}
1.1.2.2.3 Run the program and check the HDFS directory


1.1.2.2.4 Query Hive

The ro and rt tables are generated automatically.

hive> select * from hudi_users2_ro;
hive> select * from hudi_users2_rt;

Query result: the ro table reads the data in the parquet files; the rt table reads parquet + log data.

Comparing the two table definitions shows that their INPUTFORMAT classes differ:
ro uses 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
rt uses 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'

hive> show create table hudi_users2_ro;
OK
CREATE EXTERNAL TABLE `hudi_users2_ro`(
  `_hoodie_commit_time` string COMMENT '', 
  `_hoodie_commit_seqno` string COMMENT '', 
  `_hoodie_record_key` string COMMENT '', 
  `_hoodie_partition_path` string COMMENT '', 
  `_hoodie_file_name` string COMMENT '', 
  `_hoodie_operation` string COMMENT '', 
  `id` bigint COMMENT '', 
  `name` string COMMENT '', 
  `birthday` bigint COMMENT '', 
  `ts` bigint COMMENT '')
PARTITIONED BY ( 
  `partition` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
WITH SERDEPROPERTIES ( 
  'hoodie.query.as.ro.table'='true', 
  'path'='hdfs://cdh01:8022/user/hudi/hudi_users2') 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://cdh01:8022/user/hudi/hudi_users2'
TBLPROPERTIES (
  'last_commit_time_sync'='20220921150431744', 
  'spark.sql.sources.provider'='hudi', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_operation","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}},{"name":"ts","type":"timestamp","nullable":true,"metadata":{}},{"name":"partition","type":"string","nullable":true,"metadata":{}}]}', 
  'spark.sql.sources.schema.partCol.0'='partition', 
  'transient_lastDdlTime'='1662694156')
Time taken: 0.4 seconds, Fetched: 32 row(s)
hive> show create table hudi_users2_rt;
OK
CREATE EXTERNAL TABLE `hudi_users2_rt`(
  `_hoodie_commit_time` string COMMENT '', 
  `_hoodie_commit_seqno` string COMMENT '', 
  `_hoodie_record_key` string COMMENT '', 
  `_hoodie_partition_path` string COMMENT '', 
  `_hoodie_file_name` string COMMENT '', 
  `_hoodie_operation` string COMMENT '', 
  `id` bigint COMMENT '', 
  `name` string COMMENT '', 
  `birthday` bigint COMMENT '', 
  `ts` bigint COMMENT '')
PARTITIONED BY ( 
  `partition` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
WITH SERDEPROPERTIES ( 
  'hoodie.query.as.ro.table'='false', 
  'path'='hdfs://cdh01:8022/user/hudi/hudi_users2') 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://cdh01:8022/user/hudi/hudi_users2'
TBLPROPERTIES (
  'last_commit_time_sync'='20220921150431744', 
  'spark.sql.sources.provider'='hudi', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_operation","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}},{"name":"ts","type":"timestamp","nullable":true,"metadata":{}},{"name":"partition","type":"string","nullable":true,"metadata":{}}]}', 
  'spark.sql.sources.schema.partCol.0'='partition', 
  'transient_lastDdlTime'='1662694157')
Time taken: 0.267 seconds, Fetched: 32 row(s)

Inserts, updates, and deletes in MySQL are also synced to Hudi automatically; in Hive, the ro table still reads the updated parquet files and the rt table reads the updated parquet + log files.

Changing the table schema in MySQL: adding a new column does not affect the running program.
Renaming an existing column causes the corresponding column in the Hudi output to become null.
Dropping an existing column likewise causes that column to become null.

1.2 flink-kafka into Hudi with automatic Hive sync

1.2.1 flink-kafka sql into Hudi COW

1.2.1.1 Create the Kafka topic
 cd /opt/cloudera/parcels/CDH/bin
 kafka-topics --zookeeper cdh01:2181,cdh02:2181,cdh03:2181 -create --replication-factor 2 --partitions 1 --topic hudi_table_user
1.2.1.2 Write the program

Create work.jiang.hudi.kafka.cow.FlinkKafkaToHudiCOWTableAndSyncToHive:

package work.jiang.hudi.kafka.cow;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import work.jiang.hudi.pojo.Hudi_users2;
import work.jiang.hudi.util.KafkaUtil;

public class FlinkKafkaToHudiCOWTableAndSyncToHive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 1. Get the table execution environment
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironment();
        // set parallelism to 1
        senv.setParallelism(1);
        // data is written to the Hudi table incrementally, so Flink checkpointing must be enabled
        senv.enableCheckpointing(5 * 1000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // Blink planner
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        kafka_sql_to_hudi(tbEnv);

        //kafka_stream_to_hudi(senv,tbEnv);

        senv.execute();
    }


    /**
     * Purpose: consume Kafka messages (JSON) with Flink SQL and write them into Hudi.
     *
     * cd /opt/cloudera/parcels/CDH/bin
     * kafka-topics --zookeeper cdh01:2181,cdh02:2181,cdh03:2181 -create --replication-factor 2 --partitions 1 --topic hudi_table_user
     * kafka-console-producer --broker-list cdh01:9092,cdh02:9092,cdh03:9092 --topic hudi_table_user
     * {"pid": 1, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 2, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 3, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * Test result: a table directory is created on HDFS, one table is generated in Hive, and partitions and data are synced automatically.
     */
    public static  void kafka_sql_to_hudi(StreamTableEnvironment tableEnv) throws Exception{
        tableEnv.executeSql("CREATE TABLE users_kafka_sql_tmp (\n" +
                "   pid STRING,\n" +
                "   name STRING,\n" +
                "   birthday STRING,\n" +
                "   ts STRING,\n" +
                "   create_day STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'hudi_table_user',\n" +
                " 'properties.bootstrap.servers' = 'cdh01:9092,cdh02:9092,cdh03:9092',\n" +
                " 'properties.group.id' = 'hudi_table_test',\n" +
                " 'scan.startup.mode' = 'latest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");

        //tableEnv.executeSql("select * from users_kafka_sql_tmp").print();

        tableEnv.executeSql("CREATE TABLE czs_hive_kafka_cow(\n" +
                "pid string primary key not enforced,\n" +
                "name string,\n" +
                "birthday string,\n" +
                "ts string,\n" +
                "create_day string \n" +
                ")\n" +
                "PARTITIONED BY (create_day)\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8020/user/hudi/czs_hive_kafka_cow', \n" +
                "'table.type'= 'COPY_ON_WRITE',\n" +
                "'write.operation' = 'upsert',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'pid', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'compaction.delta_seconds' = '120',\n" +
                "'changelog.enabled'= 'true',\n" +
               // "'read.streaming.enabled'= 'true',\n" +
                //"'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'czs_hive_kafka_cow',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

        tableEnv.executeSql("insert into czs_hive_kafka_cow select * from users_kafka_sql_tmp");

        tableEnv.executeSql("select * from czs_hive_kafka_cow").print();
    }
}
1.2.1.3 Run the program and send data to Kafka
> kafka-console-producer --broker-list cdh01:9092,cdh02:9092,cdh03:9092 --topic hudi_table_user
> {"pid": 1, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
> {"pid": 2, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
> {"pid": 3, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
1.2.1.4 Check HDFS


1.2.1.5 Query Hive

A table is automatically created in Hive, and data sent to Kafka is reflected in Hudi in real time.

1.2.2 flink-kafka stream into Hudi COW

1.2.2.1 Create the Kafka topic

Same as 1.2.1.1.

1.2.2.2 Write the program

Add the method kafka_stream_to_hudi to the class work.jiang.hudi.kafka.cow.FlinkKafkaToHudiCOWTableAndSyncToHive:

public class FlinkKafkaToHudiCOWTableAndSyncToHive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 1. Get the table execution environment
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironment();
        // set parallelism to 1
        senv.setParallelism(1);
        // data is written to the Hudi table incrementally, so Flink checkpointing must be enabled
        senv.enableCheckpointing(5 * 1000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // Blink planner
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        //kafka_sql_to_hudi(tbEnv);

        kafka_stream_to_hudi(senv,tbEnv);

        senv.execute();
    }
    /**
     * Purpose: consume Kafka with a Flink DataStream, convert the stream to a table, and write the table into Hudi.
     *
     * cd /opt/cloudera/parcels/CDH/bin
     * kafka-topics --zookeeper cdh01:2181,cdh02:2181,cdh03:2181 -create --replication-factor 2 --partitions 1 --topic hudi_table_user
     * kafka-console-producer --broker-list cdh01:9092,cdh02:9092,cdh03:9092 --topic hudi_table_user
     * {"pid": 1, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 2, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 3, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * Test result: a table directory is created on HDFS, one table is generated in Hive, and partitions and data are synced automatically.
     */
    public static  void kafka_stream_to_hudi(StreamExecutionEnvironment senv,StreamTableEnvironment tableEnv) throws Exception{
        DataStreamSource<String> hudi_table_user_kafka = senv.addSource(
                new FlinkKafkaConsumer<String>(
                        "hudi_table_user",
                        new SimpleStringSchema(),
                        KafkaUtil.buildKafkaProps("cdh01:9092,cdh02:9092,cdh03:9092",
                                "hudi_table_test")));

        SingleOutputStreamOperator<Hudi_users2> hudi_table_user_stream = hudi_table_user_kafka.map(new MapFunction<String, Hudi_users2>() {
            @Override
            public Hudi_users2 map(String value) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String pid = jsonObject.getString("pid");
                String name = jsonObject.getString("name");
                String birthday = jsonObject.getString("birthday");
                String ts = jsonObject.getString("ts");
                String create_day = jsonObject.getString("create_day");
               return  new Hudi_users2(pid, name,birthday, ts, create_day);
            }
        });

        DataStream<Hudi_users2> hudi_table_user_ = hudi_table_user_stream.rebalance();

        Table user_table = tableEnv.fromDataStream(hudi_table_user_);

        tableEnv.createTemporaryView("user_kafka_stream_tmp",user_table);

        // If print() is executed here, the statements below never run, so it is commented out
       // tbEnv.executeSql("select * from user_kafka_stream_tmp").print();

        tableEnv.executeSql("CREATE TABLE czs_hive_kafka_stream_cow(\n" +
                "pid string primary key not enforced,\n" +
                "name string,\n" +
                "birthday string,\n" +
                "ts string,\n" +
                "create_day string \n" +
                ")\n" +
                "PARTITIONED BY (create_day)\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8020/user/hudi/czs_hive_kafka_stream_cow', \n" +
                "'table.type'= 'COPY_ON_WRITE',\n" +
                "'write.operation' = 'upsert',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'pid', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'compaction.delta_seconds' = '120',\n" +
                "'changelog.enabled'= 'true',\n" +
               // "'read.streaming.enabled'= 'true',\n" +
                //"'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'czs_hive_kafka_stream_cow',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

        tableEnv.executeSql("insert into czs_hive_kafka_stream_cow select * from user_kafka_stream_tmp");

        tableEnv.executeSql("select * from czs_hive_kafka_stream_cow").print();
    }
  }

Create work.jiang.hudi.util.KafkaUtil:

package work.jiang.hudi.util;

import java.util.Properties;

public class KafkaUtil {

    /**
     * Build the Kafka consumer properties.
     * @return the Kafka properties
     */
    public static Properties buildKafkaProps(String kafkaBroker,String kafkaGroupId ) {

        Properties props =new Properties();
        props.setProperty("bootstrap.servers", kafkaBroker);
        props.setProperty("group.id", kafkaGroupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("session.timeout.ms", "300000");
        props.setProperty("fetch.message.max.bytes", "10000000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
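
The POJO work.jiang.hudi.pojo.Hudi_users2 referenced by the map function is not shown in the original; below is a minimal sketch, assuming plain String fields named after the JSON keys and a five-argument constructor matching the call new Hudi_users2(pid, name, birthday, ts, create_day). Flink's POJO type extraction also needs a public no-argument constructor and public fields (or getters/setters).

package work.jiang.hudi.pojo;

// Minimal sketch of the POJO used above; field names and types are assumed from the JSON payload.
public class Hudi_users2 {
    public String pid;
    public String name;
    public String birthday;
    public String ts;
    public String create_day;

    // Flink's POJO type extraction requires a public no-argument constructor
    public Hudi_users2() {
    }

    public Hudi_users2(String pid, String name, String birthday, String ts, String create_day) {
        this.pid = pid;
        this.name = name;
        this.birthday = birthday;
        this.ts = ts;
        this.create_day = create_day;
    }
}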
1.2.2.3 Run the program and send data to Kafka

Same as 1.2.1.3.

1.2.2.4 Check HDFS

Before any data is sent, the table directory contains only the .hoodie directory; the partition directories are created only after data arrives.
Sending a message to Kafka with an existing primary key updates the corresponding historical row in Hudi (see the producer sketch below).
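
As an alternative to kafka-console-producer, here is a minimal Java producer sketch for re-sending an existing pid with a changed name to exercise the upsert. It assumes the kafka-clients jar pulled in transitively by flink-connector-kafka is on the classpath; the class name UpsertTestProducer and the payload values are illustrative only.

package work.jiang.hudi.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class UpsertTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "cdh01:9092,cdh02:9092,cdh03:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Re-send pid=1 with a new name; Hudi upserts on the record key 'pid', so the old row is replaced
        String payload = "{\"pid\": 1, \"name\":\"aa_updated\", \"birthday\": \"2022-09-08\", " +
                "\"ts\": \"2022-09-08 00:00:02\", \"create_day\": \"2022-09-08\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("hudi_table_user", payload));
            producer.flush();
        }
    }
}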

1.2.2.5 Query Hive

A table is created automatically, and the updated data can be queried from it.

1.2.3 flink-kafka sql into Hudi MOR

1.2.3.1 Create the Kafka topic

Same as 1.2.1.1.

1.2.3.2 Write the program

Create work.jiang.hudi.kafka.mor.FlinkKafkaToHudiMORTableAndSyncToHive:

package work.jiang.hudi.kafka.mor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import work.jiang.hudi.pojo.Hudi_users2;
import work.jiang.hudi.util.KafkaUtil;

public class FlinkKafkaToHudiMORTableAndSyncToHive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 1. Get the table execution environment
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironment();
        // set parallelism to 1
        senv.setParallelism(1);
        // data is written to the Hudi table incrementally, so Flink checkpointing must be enabled
        senv.enableCheckpointing(5 * 1000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // Blink planner
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        kafka_sql_to_hudi(tbEnv);

        //kafka_stream_to_hudi(senv,tbEnv);

        senv.execute();
    }


    /**
     * Purpose: consume Kafka messages (JSON) with Flink SQL and write them into Hudi.
     *
     * cd /opt/cloudera/parcels/CDH/bin
     * kafka-topics --zookeeper cdh01:2181,cdh02:2181,cdh03:2181 -create --replication-factor 2 --partitions 1 --topic hudi_table_user
     * kafka-console-producer --broker-list cdh01:9092,cdh02:9092,cdh03:9092 --topic hudi_table_user
     * {"pid": 1, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 2, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 3, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * Test result: a table directory is created on HDFS, two tables are generated in Hive, and partitions and data are synced automatically.
     */
    public static  void kafka_sql_to_hudi(StreamTableEnvironment tableEnv) throws Exception{
        tableEnv.executeSql("CREATE TABLE users_kafka_sql_tmp (\n" +
                "   pid STRING,\n" +
                "   name STRING,\n" +
                "   birthday STRING,\n" +
                "   ts STRING,\n" +
                "   create_day STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'hudi_table_user',\n" +
                " 'properties.bootstrap.servers' = 'cdh01:9092,cdh02:9092,cdh03:9092',\n" +
                " 'properties.group.id' = 'hudi_table_test',\n" +
                " 'scan.startup.mode' = 'latest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");

        //tableEnv.executeSql("select * from users_kafka_sql_tmp").print();

        tableEnv.executeSql("CREATE TABLE czs_hive_kafka(\n" +
                "pid string primary key not enforced,\n" +
                "name string,\n" +
                "birthday string,\n" +
                "ts string,\n" +
                "create_day string \n" +
                ")\n" +
                "PARTITIONED BY (create_day)\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8020/user/hudi/czs_hive_kafka', \n" +
                "'table.type'= 'MERGE_ON_READ',\n" +
                "'write.operation' = 'upsert',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'pid', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'compaction.delta_seconds' = '120',\n" +
                "'changelog.enabled'= 'true',\n" +
                "'read.streaming.enabled'= 'true',\n" +
                "'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'czs_hive_kafka',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

        tableEnv.executeSql("insert into czs_hive_kafka select * from users_kafka_sql_tmp");

        tableEnv.executeSql("select * from czs_hive_kafka").print();
    }
}
1.2.3.3 Run the program and send data to Kafka

Same as 1.2.1.3.

1.2.3.4 Query HDFS

Before any data arrives, the table directory contains only the .hoodie folder.
Once data is written, the partition directories appear.

1.2.3.5 Query Hive

Two tables, a _ro table and a _rt table, are created in Hive automatically.
Both return the data.
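As a quick check from the Hive CLI (assuming the hive_sync settings above, i.e. database hudi_db and table czs_hive_kafka, which as a MOR table is exposed as _ro and _rt views):

hive> select * from hudi_db.czs_hive_kafka_ro;
hive> select * from hudi_db.czs_hive_kafka_rt;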

1.2.4 Flink Kafka stream into a Hudi MOR table

1.2.4.1 Create the Kafka topic

Same as 1.2.1.1.

1.2.4.2 Write the program

Add a kafka_stream_to_hudi method to work.jiang.hudi.kafka.mor.FlinkKafkaToHudiMORTableAndSyncToHive.

public class FlinkKafkaToHudiMORTableAndSyncToHive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        //1.获取表的执行环境
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironment();
        //并行度设置为1
        senv.setParallelism(1);
        //由于增量将数据写入到Hudi表,所以需要启动Flink Checkpoint 检查点
        senv.enableCheckpointing(5 * 1000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()//blink 版本的流处理环境
                .inStreamingMode()//设置流式模式
                .build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        //kafka_sql_to_hudi(tbEnv);

        kafka_stream_to_hudi(senv,tbEnv);

        senv.execute();
    }
    /**
     * 测试目的:通过 flink stream 接收kafka,把流转成table,把 table 入 hudi
     *
     * cd /opt/cloudera/parcels/CDH/bin
     * kafka-topics --zookeeper cdh01:2181,cdh02:2181,cdh03:2181 -create --replication-factor 2 --partitions 1 --topic hudi_table_user
     * kafka-console-producer --broker-list cdh01:9092,cdh02:9092,cdh03:9092 --topic hudi_table_user
     * {"pid": 1, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 2, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * {"pid": 3, "name":"aa", "birthday": "2022-09-08", "ts": "2022-09-08 00:00:01", "create_day": "2022-09-08"}
     * 测试结果:会在hdfs创建表目录,在hive生成2个表,并且自动同步分区与数据
     */
    public static  void kafka_stream_to_hudi(StreamExecutionEnvironment senv,StreamTableEnvironment tableEnv) throws Exception{
        DataStreamSource<String> hudi_table_user_kafka = senv.addSource(
                new FlinkKafkaConsumer<String>(
                        "hudi_table_user",
                        new SimpleStringSchema(),
                        KafkaUtil.buildKafkaProps("cdh01:9092,cdh02:9092,cdh03:9092",
                                "hudi_table_test")));

        SingleOutputStreamOperator<Hudi_users2> hudi_table_user_stream = hudi_table_user_kafka.map(new MapFunction<String, Hudi_users2>() {
            @Override
            public Hudi_users2 map(String value) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String pid = jsonObject.getString("pid");
                String name = jsonObject.getString("name");
                String birthday = jsonObject.getString("birthday");
                String ts = jsonObject.getString("ts");
                String create_day = jsonObject.getString("create_day");
               return  new Hudi_users2(pid, name,birthday, ts, create_day);
            }
        });

        DataStream<Hudi_users2> hudi_table_user_ = hudi_table_user_stream.rebalance();

        Table user_table = tableEnv.fromDataStream(hudi_table_user_);

        tableEnv.createTemporaryView("user_kafka_stream_tmp",user_table);

        // 这里执行print了,就无法执行下边的语句了,需要注释掉
       // tableEnv.executeSql("select * from user_kafka_stream_tmp").print();

        tableEnv.executeSql("CREATE TABLE czs_hive_kafka_stream(\n" +
                "pid string primary key not enforced,\n" +
                "name string,\n" +
                "birthday string,\n" +
                "ts string,\n" +
                "create_day string \n" +
                ")\n" +
                "PARTITIONED BY (create_day)\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8020/user/hudi/czs_hive_kafka_stream', \n" +
                "'table.type'= 'MERGE_ON_READ',\n" +
                "'write.operation' = 'upsert',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'pid', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'compaction.delta_seconds' = '120',\n" +
                "'changelog.enabled'= 'true',\n" +
                "'read.streaming.enabled'= 'true',\n" +
                "'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'czs_hive_kafka_stream',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

        tableEnv.executeSql("insert into czs_hive_kafka_stream select * from user_kafka_stream_tmp");

        tableEnv.executeSql("select * from czs_hive_kafka_stream").print();
    }
}
1.2.4.3 Run the program and send data to Kafka

Same as 1.2.1.3.

1.2.4.4 Query HDFS

Before any data arrives, the table directory contains only the .hoodie folder.
Once data is written, the partition directories appear under the table directory.

1.2.4.5 Query Hive

Two tables, a _ro table and a _rt table, are created in Hive automatically.

hive> select * from czs_hive_kafka_stream_ro;
hive> select * from czs_hive_kafka_stream_rt;

Both tables return the data.

1.3 Direct insert into Hudi

This approach is batch-style: a Hudi INSERT statement can only be submitted once per job. After the table is created, if no INSERT is executed, no table directory appears on HDFS.
For a COW table, parquet files are generated once data is inserted, and the data can be queried from Hive.
For a MOR table, only log files are generated after the insert and no parquet file ever appears.

1.3.1 COW table

Create work.jiang.hudi.fieldtosql.FieldTosql_cow.

package work.jiang.hudi.fieldtosql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FieldTosql_cow {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()//设置流式模式
                .build();
        env.setParallelism(1);
        env.enableCheckpointing(5 *1000);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
        System.out.println("tenv====================================" + tenv);
        tenv.executeSql("CREATE TABLE hudi_users_fields2_cow(\n" +
                    "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                    "    name STRING,\n" +
                    "    birthday TIMESTAMP(3),\n" +
                    "    ts TIMESTAMP(3),\n" +
                    "    `partition` VARCHAR(20)\n" +
                    ") PARTITIONED BY (`partition`) WITH (\n" +
                    "'connector'='hudi',\n" +
                    "'path'= 'hdfs://cdh01:8022/user/hudi/hudi_users_fields2_cow', \n" +
                    "'table.type'= 'COPY_ON_WRITE',\n" +
                    "'write.operation' = 'upsert',\n" +
                    "'hoodie.datasource.write.recordkey.field'= 'id', \n" +
                    "'write.precombine.field'= 'ts',\n" +
                    "'write.tasks'= '1',\n" +
                    "'write.rate.limit'= '2000', \n" +
                    "'compaction.tasks'= '1', \n" +
                    "'write.bucket_assign.tasks'= '1', \n" +
                    "'compaction.async.enabled'= 'true',\n" +
                    "'compaction.trigger.strategy'= 'num_commits',\n" +
                    "'compaction.delta_commits'= '3',\n" +
                    "'changelog.enabled'= 'true',\n" +
                    //"'read.streaming.enabled'= 'true',\n" +
                   // "'read.streaming.check-interval'= '10',\n" +
                    "'read.tasks'= '1',\n" +
                    "'hive_sync.enable'= 'true',\n" +
                    "'hive_sync.mode'= 'hms',\n" +
                    "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                    "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                    "'hive_sync.table'= 'hudi_users_fields2_cow',\n" +
                    "'hive_sync.db'= 'hudi_db',\n" +
                    "'hive_sync.username'= 'hive',\n" +
                    "'hive_sync.password'= 'hive',\n" +
                    "'hive_sync.support_timestamp'= 'true'\n" +
                    ")");

        String sql1 = "insert into hudi_users_fields2_cow values" +
                "(0,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:00','1970-01-01')," +
                "(1,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:01','1970-01-01')," +
                "(2,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:02','1970-01-01')," +
                "(3,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:03','1970-01-01')," +
                "(4,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:04','1970-01-01')," +
                "(5,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:05','1970-01-01')," +
                "(6,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:06','1970-01-01')," +
                "(7,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:07','1970-01-01')," +
                "(8,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:08','1970-01-01')," +
                "(9,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:09','1970-01-01')," +
                "(1,'bb',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:10','1970-01-01')," +
                "(11,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(12,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(13,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(14,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(15,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(16,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(17,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(18,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(19,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(20,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(21,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:12','1970-01-01')" ;

        tenv.executeSql(sql1);

        //测试结果:每重新运行一次main方法,会新生成一个parquet的文件,会在hive自动建表,自动同步数据

//        不支持重复提交
//        for (int i = 100; i < 105; i++) {
//            String name = "aa_" + i;
//            String birthday = "2020-01-01 12:12:12";
//            String ts = new Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
//            String partition = "2020-01-01";
//            tenv.executeSql("insert into hudi_users_fields2_cow values( " + i + ", \'" + name + "\', TIMESTAMP \'" + birthday + "\', TIMESTAMP \'" + ts + "\', \'" + partition + "\')");
            tenv.executeSql("select * from hudi_users_fields2_cow").print(); // 这一句会导致程序直接执行提交操作,不会再去扫描for循环
//        }

    }
}

Result: without an INSERT statement, no table directory is created on HDFS. With an INSERT, the table directory and partition files are created, the table is created in Hive automatically, and the data is synced to Hive. Each run of the main method produces a new parquet file, which is typical batch-insert behavior.
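To verify the sync from the Hive CLI (assuming the hive_sync options above, i.e. database hudi_db and table hudi_users_fields2_cow; as a COW table it is exposed as a single Hive table), a query such as the following should return the inserted rows:

hive> select * from hudi_db.hudi_users_fields2_cow;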

1.3.2 MOR table

Create work.jiang.hudi.fieldtosql.FieldTosql_mor.

package work.jiang.hudi.fieldtosql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FieldTosql_mor {
    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "hdfs");
        //使用本地模式并开启WebUI
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099");
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        //1.获取表的执行环境
        //并行度设置为1
        senv.setParallelism(1);
        //由于增量将数据写入到Hudi表,所以需要启动Flink Checkpoint 检查点
        senv.enableCheckpointing(5 * 1000);
        //senv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //senv.setStateBackend(new FsStateBackend("hdfs://cdh01:8022/tmp/hudi_statebackend/hudi_users_fields2/cp"));

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()//blink 版本的流处理环境
                .inStreamingMode()//设置流式模式
                .build();

        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(senv, settings);

        field_sql_to_hudi(tbEnv);
    }

    /**
     * 测试目的:验证把得到的数据,通过insert into 形式插入 hudi MOR 表,会不会生成parquet文件
     * <p>
     * 测试结果:只有log文件,没有parquet文件,hive会创建表,但是在 hive 将无法查询出数据
     * 如果没有插入语句,hdfs将不会创建表目录,hive也不会自动建表
     */
    public static void field_sql_to_hudi(StreamTableEnvironment tbEnv) throws Exception {
        tbEnv.executeSql("CREATE TABLE hudi_users_fields2(\n" +
                    "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                    "    name STRING,\n" +
                    "    birthday TIMESTAMP(3),\n" +
                    "    ts TIMESTAMP(3),\n" +
                    "    `partition` VARCHAR(20)\n" +
                    ") PARTITIONED BY (`partition`) WITH (\n" +
                    "'connector'='hudi',\n" +
                    "'path'= 'hdfs://cdh01:8022/user/hudi/hudi_users_fields2', \n" +
                    "'table.type'= 'MERGE_ON_READ',\n" +
                    "'hoodie.datasource.write.recordkey.field'= 'id', \n" +
                    "'write.precombine.field'= 'ts',\n" +
                    "'write.tasks'= '1',\n" +
                    "'write.rate.limit'= '2000', \n" +
                    "'compaction.tasks'= '1', \n" +
                    "'write.bucket_assign.tasks'= '1', \n" +
                    "'compaction.async.enabled'= 'true',\n" +
                    "'compaction.trigger.strategy'= 'num_commits',\n" +
                    "'compaction.delta_commits'= '1',\n" +
                    "'changelog.enabled'= 'true',\n" +
                    "'read.streaming.enabled'= 'true',\n" +
                    "'read.streaming.check-interval'= '10',\n" +
                    "'read.tasks'= '1',\n" +
                    "'hive_sync.enable'= 'true',\n" +
                    "'hive_sync.mode'= 'hms',\n" +
                    "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                    "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                    "'hive_sync.table'= 'hudi_users_fields2',\n" +
                    "'hive_sync.db'= 'hudi_db',\n" +
                    "'hive_sync.username'= 'hive',\n" +
                    "'hive_sync.password'= 'hive',\n" +
                    "'hive_sync.support_timestamp'= 'true'\n" +
                    ")");


//        long id = 3;
//        String name = "cc";
//        String birthday = "2020-01-01 12:12:12";
//        String ts = "2022-09-13 12:12:12";
//        String partition = "2020-01-01";

//        String sql = "insert into hudi_users_fields2 values( "+ id + ", \'"+ name +"\', TIMESTAMP \'" + birthday +"\', TIMESTAMP \'" + ts +"\', \'" + partition +"\')," +
//                "( "+ "2" + ", \'"+ name +"\', TIMESTAMP \'" + birthday +"\', TIMESTAMP \'" + "2022-09-13 12:13:13" +"\', \'" + partition +"\')";

        String sql1 = "insert into hudi_users_fields2 values" +
                "(0,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:00','1970-01-01')," +
                "(1,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:01','1970-01-01')," +
                "(2,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:02','1970-01-01')," +
                "(3,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:03','1970-01-01')," +
                "(4,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:04','1970-01-01')," +
                "(5,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:05','1970-01-01')," +
                "(6,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:06','1970-01-01')," +
                "(7,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:07','1970-01-01')," +
                "(8,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:08','1970-01-01')," +
                "(9,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:09','1970-01-01')," +
                "(1,'bb',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:10','1970-01-01')," +
                "(11,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(12,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(13,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(14,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(15,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(16,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(17,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(18,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(19,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(20,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:11','1970-01-01')," +
                "(21,'aa',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:12','1970-01-01')" ;

        tbEnv.executeSql(sql1);

        //测试结果:每重新运行一次main方法,会新生成一个log文件,没有生成parquet文件,会在hive自动建表,hive查不到数据

//            不支持重复提交
//        for (int i = 1; i < 5; i++) {
//            String name = "aa_" + i;
//            String birthday = "2020-01-01 12:12:12";
//            String ts = new Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
//            String partition = "2020-01-01";
//            String sql = "insert into hudi_users_fields2 values( " + i + ", \'" + name + "\', TIMESTAMP \'" + birthday + "\', TIMESTAMP \'" + ts + "\', \'" + partition + "\')";
//            tbEnv.executeSql(sql);
//        }

//        tbEnv.executeSql("select * from hudi_users_fields2").print();
    }
}

Result: without an INSERT statement, no table directory is created on HDFS. With an INSERT, the table and partition directories are created, but the partitions contain only log files and no parquet files. The _ro and _rt tables are created in Hive automatically, yet because there are no parquet files under the partitions, Hive queries return no data.
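For completeness, the Hive-side check mirrors the other MOR tables (database hudi_db, table hudi_users_fields2 per the hive_sync options); consistent with the result above, both views stay empty until a compaction produces parquet files:

hive> select * from hudi_db.hudi_users_fields2_ro;
hive> select * from hudi_db.hudi_users_fields2_rt;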

1.4 Converting a POJO to a table and writing it into Hudi

Like section 1.3, this is a batch-style approach: a Hudi INSERT can only be submitted once per job, and if no INSERT is executed after the table is created, no table directory appears on HDFS.
For a COW table, parquet files are generated once data is inserted, and the data can be queried from Hive.
For a MOR table, only log files are generated after the insert and no parquet file ever appears.

1.4.1 COW table

Create work.jiang.hudi.fieldtosql.PojoTosql_cow.

package work.jiang.hudi.fieldtosql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import work.jiang.hudi.pojo.Hudi_users2;

import static org.apache.flink.table.api.Expressions.$;

public class PojoTosql_cow {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        env.setParallelism(4);

        env.enableCheckpointing(1000 * 1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        tableEnv.executeSql("CREATE TABLE czs_hive_pojo_cow(\n" +
                "pid string primary key not enforced,\n" +
                "name string,\n" +
                "birthday string,\n" +
                "ts string,\n" +
                "create_day string \n" +
                ")\n" +
                "PARTITIONED BY (create_day)\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8020/user/hudi/czs_hive_pojo_cow', \n" +
                "'table.type'= 'COPY_ON_WRITE',\n" +
                "'write.operation' = 'upsert',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'pid', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'changelog.enabled'= 'true',\n" +
               // "'read.streaming.enabled'= 'true',\n" +
               // "'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'czs_hive_pojo_cow',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

        // hudi表插入数据
        // 思路1:把一个对象转化成table,再创建临时视图,再把视图插入到hudi表,会生成parquet
        pojo_insert_hudi(env,tableEnv);

        // 思路2:不需要对象,把需要的字段值取到就行,通过sql语句把字段拼进去就行,这种方式比较简单,会生成parquet
        //field_insert_hudi(tableEnv);


        // hudi表更新数据
        // 更新的语法和插入是一样的,插入一条已经存在的id的数据,即为更新
        //tableEnv.executeSql("insert into czs_hive_pojo_cow values('2','dd','1970-02-02','1970-02-02 00:00:01','1970-02-02')");

        // 查找并更新
        //思路1:根据查询语句得出table,通过table自身api修改字段的值,把table为临时视图,将临时视图更新到hudi表,行不通,table没有相关api
        //思路2:根据查询语句得到table或TableResult,循环遍历出字段,把每一行的字段取到,使用insert into 更新到hudi表,已把TableResult测通
        //query_and_update_hudi(tableEnv);


        //删除数据,不支持

    }


    public static void pojo_insert_hudi(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws Exception{
        String timestamp = String.valueOf(System.currentTimeMillis());

        //新建对象
        Hudi_users2 hudi_users2_obj = new Hudi_users2("1","aa","1995-10-08",timestamp,"2022-08-29");

        //转化成流
        DataStream<Hudi_users2> hudi_users2_stream  = env.fromElements(hudi_users2_obj);

        //转化成一个table
        Table hudi_users2_table  = tableEnv.fromDataStream(
                hudi_users2_stream,
                $("pid"),
                $("name"),
                $("birthday"),
                $("ts"),
                $("create_day")
        );

        //创建临时视图
        tableEnv.createTemporaryView("hudi_users2_tmp",hudi_users2_table);

        //tableEnv.executeSql("select * from hudi_users2_tmp").print();

        tableEnv.executeSql("insert into czs_hive_pojo_cow select * from hudi_users2_tmp");

        tableEnv.executeSql("select * from czs_hive_pojo_cow").print();

        env.execute("pojo to sql cow czs_hive_pojo_cow");
    }


    public static void field_insert_hudi(StreamTableEnvironment tableEnv){
        String pid = "12";
        String name = "bb";
        String birthday = "2022-09-20";
        String ts = "2022-09-20 12:12:12";
        String create_day = "2022-09-20";
        tableEnv.executeSql("insert into czs_hive_pojo_cow values(\'" +
                pid +"\',\'"+name +"\',\'"+birthday +"\',\'"+ ts + "\',\'"+ create_day +"\')");
        tableEnv.executeSql("select * from czs_hive_pojo_cow").print();
    }


    public static void query_and_update_hudi(StreamTableEnvironment tableEnv){
        //需要更新的字段
        String ts = "2022-09-20 11:11:11";

        //查询出的历史记录
        TableResult result = tableEnv.executeSql("select pid,name,birthday,ts,create_day from czs_hive_pojo_cow where pid = '1' ");

        CloseableIterator<Row> row = result.collect();

        //取历史数据,如果有多条数据,需要取时间最新的那条数据,且不是脏数据
        System.out.println("aaaaaaaaaaaaaaaaaaaaaaaaaaaa");
        if(row.hasNext()){
            System.out.println("bbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
            Row row_tmp =  row.next();
            String pid = row_tmp.getField("pid").toString();
            String name = row_tmp.getField("name").toString();
            String birthday = row_tmp.getField("birthday").toString();
            String create_day = row_tmp.getField("create_day").toString();
            System.out.println(row_tmp.toString());
            tableEnv.executeSql("insert into czs_hive_pojo_cow values(\'" +
                    pid +"\',\'"+name +"\',\'"+birthday +"\',\'"+ ts + "\',\'"+ create_day +"\')");
        }

        tableEnv.executeSql("select * from czs_hive_pojo_cow").print();
    }

}

Create work.jiang.hudi.pojo.Hudi_users2.

package work.jiang.hudi.pojo;

public class Hudi_users2 {
    String pid;
    String name;
    String birthday;
    String ts;
    String create_day;

    public Hudi_users2() {
        super();
    }

    public Hudi_users2(String pid, String name, String birthday, String ts, String create_day){
        this.pid = pid;
        this.name = name;
        this.birthday = birthday;
        this.ts = ts;
        this.create_day = create_day;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public String getTs() {
        return ts;
    }

    public void setTs(String ts) {
        this.ts = ts;
    }

    public String getCreate_day() {
        return create_day;
    }

    public void setCreate_day(String create_day) {
        this.create_day = create_day;
    }

    @Override
    public String toString() {
        return "hudi_users2{" +
                "pid='" + pid + '\'' +
                ", name='" + name + '\'' +
                ", birthday='" + birthday + '\'' +
                ", ts='" + ts + '\'' +
                ", create_day=" + create_day +
                '}';
    }
}

The test result is the same as in 1.3.1.

1.4.2 MOR table

Create work.jiang.hudi.fieldtosql.PojoTosql_mor.

package work.jiang.hudi.fieldtosql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import work.jiang.hudi.pojo.Hudi_users2;

import static org.apache.flink.table.api.Expressions.$;

///https://blog.csdn.net/wzp1986/article/details/125894473
public class PojoTosql_mor {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        env.setParallelism(4);

        env.enableCheckpointing(1000 * 1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        tableEnv.executeSql("CREATE TABLE czs_hive_pojo_mor(\n" +
                "pid string primary key not enforced,\n" +
                "name string,\n" +
                "birthday string,\n" +
                "ts string,\n" +
                "create_day string \n" +
                ")\n" +
                "PARTITIONED BY (create_day)\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://cdh01:8020/user/hudi/czs_hive_pojo_mor', \n" +
                "'table.type'= 'MERGE_ON_READ',\n" +
                "'write.operation' = 'upsert',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'pid', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'changelog.enabled'= 'true',\n" +
                "'read.streaming.enabled'= 'true',\n" +
                "'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://cdh01:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://cdh01:10000',\n" +
                "'hive_sync.table'= 'czs_hive_pojo_mor',\n" +
                "'hive_sync.db'= 'hudi_db',\n" +
                "'hive_sync.username'= 'hive',\n" +
                "'hive_sync.password'= 'hive',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

        // hudi表插入数据
        // 思路1:把一个对象转化成table,再创建临时视图,再把视图插入到hudi表,会生成日志文件,不会生成parquet
        pojo_insert_hudi(env,tableEnv);

        // 思路2:不需要对象,把需要的字段值取到就行,通过sql语句把字段拼进去就行,这种方式比较简单,,会生成日志文件,不会生成parquet
        //field_insert_hudi(tableEnv);


        // hudi表更新数据
        // 更新的语法和插入是一样的,插入一条已经存在的id的数据,即为更新
        //tableEnv.executeSql("insert into czs_hive_pojo_mor values('2','dd','1970-02-02','1970-02-02 00:00:01','1970-02-02')");

        // 查找并更新
        //思路1:根据查询语句得出table,通过table自身api修改字段的值,把table为临时视图,将临时视图更新到hudi表,行不通,table没有相关api
        //思路2:根据查询语句得到table或TableResult,循环遍历出字段,把每一行的字段取到,使用insert into 更新到hudi表,已把TableResult测通
        //query_and_update_hudi(tableEnv);


        //删除数据,不支持

    }


    public static void pojo_insert_hudi(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws Exception{
        String timestamp = String.valueOf(System.currentTimeMillis());

        //新建对象
        Hudi_users2 hudi_users2_obj = new Hudi_users2("1","aa","1995-10-08",timestamp,"2022-08-29");

        //转化成流
        DataStream<Hudi_users2> hudi_users2_stream  = env.fromElements(hudi_users2_obj);

        //转化成一个table
        Table hudi_users2_table  = tableEnv.fromDataStream(
                hudi_users2_stream,
                $("pid"),
                $("name"),
                $("birthday"),
                $("ts"),
                $("create_day")
        );

        //创建临时视图
        tableEnv.createTemporaryView("hudi_users2_tmp",hudi_users2_table);

        //tableEnv.executeSql("select * from hudi_users2_tmp").print();

        tableEnv.executeSql("insert into czs_hive_pojo_mor select * from hudi_users2_tmp");

        tableEnv.executeSql("select * from czs_hive_pojo_mor").print();

        env.execute("pojo to sql mor czs_hive_pojo_mor");
    }


    public static void field_insert_hudi(StreamTableEnvironment tableEnv){
        String pid = "12";
        String name = "bb";
        String birthday = "2022-09-20";
        String ts = "2022-09-20 12:12:12";
        String create_day = "2022-09-20";
        tableEnv.executeSql("insert into czs_hive_pojo_mor values(\'" +
                pid +"\',\'"+name +"\',\'"+birthday +"\',\'"+ ts + "\',\'"+ create_day +"\')");
        tableEnv.executeSql("select * from czs_hive_pojo_mor").print();
    }


    public static void query_and_update_hudi(StreamTableEnvironment tableEnv){
        //需要更新的字段
        String ts = "2022-09-20 11:11:11";

        //查询出的历史记录
        TableResult result = tableEnv.executeSql("select pid,name,birthday,ts,create_day from czs_hive_pojo_mor where pid = '1' ");

        CloseableIterator<Row> row = result.collect();

        //取历史数据,如果有多条数据,需要取时间最新的那条数据,且不是脏数据
        System.out.println("aaaaaaaaaaaaaaaaaaaaaaaaaaaa");
        if(row.hasNext()){
            System.out.println("bbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
            Row row_tmp =  row.next();
            String pid = row_tmp.getField("pid").toString();
            String name = row_tmp.getField("name").toString();
            String birthday = row_tmp.getField("birthday").toString();
            String create_day = row_tmp.getField("create_day").toString();
            System.out.println(row_tmp.toString());
            tableEnv.executeSql("insert into czs_hive_pojo_mor values(\'" +
                    pid +"\',\'"+name +"\',\'"+birthday +"\',\'"+ ts + "\',\'"+ create_day +"\')");
        }

        tableEnv.executeSql("select * from czs_hive_pojo_mor").print();
    }
}

The test result is the same as in 1.3.2: without an INSERT, no table directory is created on HDFS. With an INSERT, the table directory and partition files are created, but the partitions contain only log files and no parquet files. Two tables are created in Hive, yet because there are no parquet files, Hive queries return no data.

1.5 Updates and deletes in Hudi

When data flows into Hudi via Flink CDC, Flink Kafka, or another CDC format, Hudi picks up the upstream changes and applies the corresponding updates and deletes.
Flink SQL does not support deleting data by setting 'write.operation' = 'delete', nor does it support the DELETE FROM statement.
Flink SQL does not support the UPDATE statement either. An update is performed by INSERTing a row whose primary key already exists ('hoodie.datasource.write.recordkey.field' = 'id' specifies which column is the record key; the default is uuid). The precombine field ('write.precombine.field' = 'ts', which decides which of two rows with the same key is kept; the default is ts) determines the winner: the row with the larger ts value is retained.
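A minimal sketch of this upsert-as-update pattern, reusing the czs_hive_pojo_cow table from section 1.4.1 (record key pid, precombine field ts):

-- pid = '2' already exists; inserting it again with a newer ts replaces the old row
insert into czs_hive_pojo_cow values ('2', 'dd', '1970-02-02', '1970-02-02 00:00:01', '1970-02-02');

-- the row for pid = '2' now carries the new values
select * from czs_hive_pojo_cow where pid = '2';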

1.6 Querying Hudi

1.6.1 COW table

package work.jiang.hudi.read;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * COW read modes: snapshot query + incremental query.
 * 1. Incremental Query
 * The default is a batch query that reads the latest snapshot.
 * Batch query: set both read.start-commit and read.end-commit (both commits are inclusive), or set only read.start-commit.
 * Reference: https://blog.csdn.net/yy8623977/article/details/123810836
 */
public class FlinkReadHudiCOWTable {
    public static void main(String[] args) throws Exception {
       // System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        bsEnv.setParallelism(1);

        bsEnv.enableCheckpointing(5 * 1000);

        StreamTableEnvironment env = StreamTableEnvironment.create(bsEnv, bsSettings);

        env.executeSql("CREATE TABLE hudi_users_fields2_cow(\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'COPY_ON_WRITE',\n" +
                "    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users_fields2_cow',\n" +
                "    'read.tasks' = '1' \n" +
                ")");

        env.executeSql("select * from hudi_users_fields2_cow").print();
    }
}

Result: the program runs once and then stops on its own (a bounded batch read of the latest snapshot).
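The incremental batch query mentioned in the comment above only needs read.start-commit / read.end-commit added to the table options. A sketch follows; the table name hudi_users_fields2_cow_incr and the two commit timestamps are placeholders, so substitute real instant times from the table's .hoodie timeline:

CREATE TABLE hudi_users_fields2_cow_incr(
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'COPY_ON_WRITE',
    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users_fields2_cow',
    'read.start-commit' = '20220913000000',
    'read.end-commit' = '20220914000000',
    'read.tasks' = '1'
);

select * from hudi_users_fields2_cow_incr;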

1.6.2 MOR table

package work.jiang.hudi.read;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * MOR read modes: snapshot query + incremental query + read-optimized query.
 * 1. Streaming Query
 * The default is a batch query that reads the latest snapshot.
 * A streaming query requires read.streaming.enabled = true; additionally set read.start-commit, using 'earliest' to consume all existing data.
 *
 * 2. Incremental Query
 * There are three usage scenarios:
 * Streaming query: set read.start-commit.
 * Batch query: set both read.start-commit and read.end-commit (both commits are inclusive).
 * TimeTravel: set read.end-commit to an instant time greater than the current one; read.start-commit defaults to latest.
 * Reference: https://blog.csdn.net/yy8623977/article/details/123810836
 */
public class FlinkReadHudiMORTable {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        //1.Streaming Query
        tableEnv.executeSql("CREATE TABLE hudi_users_fields2 (\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users_fields2',\n" +
                "    'read.streaming.enabled' = 'true',\n" +
                //"    'read.streaming.start-commit' = '20210316134557',\n" +
                "    'read.streaming.check-interval' = '1', \n" +
                "    'read.tasks' = '1' \n" +
                ")");

        tableEnv.executeSql("select * from hudi_users_fields2").print();

        /*tableEnv.executeSql("CREATE TABLE hudi_users2_prss (\n" +
                "    Pid VARCHAR(100) PRIMARY KEY NOT ENFORCED,\n" +
                "    InVehicleManager  VARCHAR(100),\n" +
                "    BigTypes VARCHAR(100),\n" +
                "    InVehicleCode  VARCHAR(100),\n" +
                "    CreateTime  VARCHAR(100),\n" +
                "    `CreateDay` VARCHAR(100)\n" +
                ") PARTITIONED BY (`CreateDay`) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users2_prss',\n" +
                "    'read.streaming.enabled' = 'true',\n" +
                "    'read.streaming.check-interval' = '1', \n" +
                "    'read.tasks' = '1' \n" +
                ")");*/



       // tableEnv.executeSql("select * from hudi_users2_prss").print();

        env.execute("read_hudi");
    }
}

Result: the program keeps running and refreshes at the frequency set by 'read.streaming.check-interval' = '1' (seconds).
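To consume the full history instead of only new commits, the comment above suggests setting the start commit to earliest. A sketch of the corresponding table options (the table name hudi_users_fields2_stream is a placeholder and this variation is not part of the original program):

CREATE TABLE hudi_users_fields2_stream (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://cdh01:8022/user/hudi/hudi_users_fields2',
    'read.streaming.enabled' = 'true',
    'read.start-commit' = 'earliest',
    'read.streaming.check-interval' = '1',
    'read.tasks' = '1'
);

select * from hudi_users_fields2_stream;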

2. Integrating Presto with Hudi

2.1 Deploy Presto

See https://blog.csdn.net/jiangjiangaa666/article/details/125723084

2.2 Create the Hive connector and start Presto

Presto's Hudi support goes through the Hive catalog: queries run against the Hive external tables, so the Hudi bundle jar must be copied into Presto's hive-hadoop2 plugin directory.

To integrate, copy the Hudi jar into the hive-hadoop2 plugin directory:

cp */hudi-hadoop-mr-bundle-0.11.1.jar $PRESTO_HOME/plugin/hive-hadoop2/

vi catalog/hive.properties
connector.name = hive-hadoop2
hive.metastore.uri = thrift://cdh01:9083
hive.parquet.use-column-names=true

The hive.parquet.use-column-names=true setting is mandatory; it works around the problem Presto otherwise has reading the parquet column types.
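After restarting Presto, the Hudi tables that were synced to Hive earlier can be queried like ordinary Hive tables. For example (assuming the catalog is named hive after the hive.properties file, and using the hudi_db tables from section 1.2):

select * from hive.hudi_db.czs_hive_kafka_stream_ro limit 10;
select * from hive.hudi_db.czs_hive_kafka_stream_rt limit 10;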

2.3 Deploy yanagishima

See https://blog.csdn.net/jiangjiangaa666/article/details/125723084

2.4 Query Hudi with yanagishima

(Screenshot: running a Hudi query from the yanagishima web UI.)
