【源码改造】Flink-jdbc-connector源码简析+改造支持谓词下推

2023-11-18

一、需求背景分析

flink sql在维表关联时,会有一个场景:当右表的数据量比较大且有些数据虽然符合join条件,但其实对于下游来说数据可能没用,这样就浪费了flink的计算资源,且拉低了数据处理能力。如果在join前就把维表的数据进一步过滤,然后再join,这样就会使减轻“无用数据”对flink内存的占用,提高计算能力,进而优化数据处理的能力。

有两个思路可以解决这个问题,
一是将维表在join前先过滤不需要的数据,然后再注册为时态表函数,接着join的维表数据就是“下推后的数据”,但目前flink对于时态表函数join时并不完善,场景覆盖的不够全面。
二是在直接修改源码,给维表一个with参数去过滤数据,在join拉取维表数据时,直接过滤。说起来有点抽象,别急,这就是我们接下来要讨论内容。

 

二、flink-jdbc-connector源码分析与实现

1. 源码分析

1.1.官网解读

先了解下flink sql在代码层面从一个阶段到其他阶段的翻译过程。
在这里插入图片描述

阶段 解释
元数据管理

执行CREATE TABLE时,会在目标catalog种更新元数据,比如上图黄色的create table语句。语句中with下的参数不会被校验和解释。DDL语句将会被解释成CatalogTable实例。

生成逻辑计划

接下来,当规划和优化一个job下的sql时,CatalogTable实例将会根据sql的语法具体解析为:当读select 语句时,会解析成DynamicTableSource;当读到insert into 语句时,解析成DynamicTableSink。

DynamicTableSource\SinkFactory将会提供具体的解析方法将CatalogTable解析为DynamicTableSource\Sink。具体的,比如说with下的port,Factory会从CatalogTable中校验port是否是连接器支持的参数,然后获取值,并创建参数实例,以便往下个阶段传输。

默认情况下,通过java的SPI机制发现工厂实例,工厂实例要定义一个能被校验的工厂标识符,例如上图:‘connector’ = ‘custom’。

DynamicTableSource\Sink 在运行时将会实际的读写数据。

运行时

当逻辑计划生成时,planner将会获取连接器的实现。运行时核心接口是,InputFormat和SourceFunction,按另一个抽象级别分组为ScanRuntimeProvider、 LookupRuntimeProvider和的子类SinkRuntimeProvider

通过对官网的解读,
我们知道对于一个flink sql,首先会被维护到catalogTable实例中,catalogTable根据sql类型找到对应的DynamicTable;

这里有一点:create table不会生成DynamicTable,而是select语句会生成DynamicTableSource代表从物理表中拉取数据,insert into语句生成DynamicTableSink代表写入数据到物理表。

在Planning期间,通过with下的connector参数获取对应的工厂,然后通过工厂校验、解析(with下的)参数值(比如’port’=‘5022’)最后封装成参数实例,并传递给runtime的实例;

最后Runtime期间,根据参数实例定义的数据消费规则,开始真正的从物理表中处理(拉取\写入)数据。

 

1.2 JDBC-connector的解读

通过上面的分析,现在我们通过一个场景分析下,一个job sql是如何在源码中被解析和运行的。

先看一个job下的sql:sql的逻辑是clickhouse join clickhouse 后将数据输出到flink的控制台

CREATE TABLE `in_table` (
   `id` BIGINT NOT NULL,
   `name` String NOT NULL,
   `cloud_wise_proc_time` AS `proctime`()
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:clickhouse://xxx/gaogao?socket_timeout=900000',
   'username' = 'default',
   'table-name' = 'user'
 );

 CREATE TABLE `in_table_1` (
   `id` BIGINT NOT NULL,
   `age` BIGINT NOT NULL,
   _cw_insert_time TIMESTAMP(3)
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:clickhouse://xxx/gaogao?socket_timeout=900000',
   'username' = 'default',
   'table-name' = 'info',
   'lookup.cache.max-rows' = '100000',
   'lookup.cache.ttl' = '10MINUTE'
 );

 create table out_table(
	`id` BIGINT,
    `age` BIGINT,
    _cw_insert_time TIMESTAMP(3)
 ) with (
 'connector' = 'print');

insert into out_table 
select a.id,b.age,b._cw_insert_time 
from in_table as a  left join in_table_1  FOR SYSTEM_TIME AS OF a.cloud_wise_proc_time as  b  on a.id=b.id;

下图展示了在维表关联时维表侧(in_table_1)拉取数据的源码逻辑:
在这里插入图片描述
Planning时
JdbcDynamicTableFactory拿到(in_table_1)CatalogTable实例的参数值(如下),

   'connector' = 'jdbc',
   'url' = 'jdbc:clickhouse://xxx:18100/gaogao?socket_timeout=900000',
   'username' = 'default',
   'table-name' = 'info',
   'lookup.cache.max-rows' = '100000',
   'lookup.cache.ttl' = '10MINUTE'

进行检查和解析,如必要的连接参数(url、username、password、driver、table-name)用于连接clickhouse的那张表;可选的配置(例查询缓存、分区查询),用于拉取数据时的策略,比如单次查询最多拉取10000条数据到缓存中,缓存每10分钟过期一次,以免没有及时查到更新后的数据。
然后生成参数实例,并将参数传递到runtime阶段。

runtime时
JdbcRowDataLookupFunction主要有两个逻辑,一个是通过planning传递的必要参数,去建立JDBC连接,二是根据查询条件去查询并获取数据。

关键来了:

查询条件是维表关联的关联字段,而值来源于左表。具体的说,当左表通过物理表来查询到一条数据时,这条数据中关联字段对应的数据作为右表查询时的查询条件!!!

当获取到这条消息之后,那谓词下推要实现的逻辑点就引刃而解了,即将谓词下推逻辑放到eval的查询sql中!!!

 

2. 源码改造

按照planning到runtime的顺序改造:

2.1. planning

添加with参数,即谓词下推参数

JdbcDynamicTableFactory{
...
    private static final ConfigOption<String> PRE_FILTER_CONDITION =
            ConfigOptions.key("lookup.data.filter")
                    .stringType()
                    .defaultValue("")
                    .withDescription("filter data before dimension table join.");
 
     /**
     * 用于可选参数的存储,和检查。
     * @return
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        。。。
        optionalOptions.add(PRE_FILTER_CONDITION);
        return optionalOptions;
    }

2.2 Runtime

public class JdbcLookupFunction extends TableFunction<Row> {
。。。
    public JdbcLookupFunction(
            JdbcOptions options,
            JdbcLookupOptions lookupOptions,
            String[] fieldNames,
            TypeInformation[] fieldTypes,
            String[] keyNames) {
            。。。
       //嵌入下推条件
       this.query =
                FieldNamedPreparedStatementImpl.parseNamedStatement(
                        options.getDialect()
                                .getSelectFromStatement(
                                        options.getTableName(), fieldNames, keyNames, lookupOptions.getPreFilterCondition()), new HashMap<>());
@Internal
public interface JdbcDialect extends Serializable {
。。。
    /**
     * 维表join查询sql语句
     * " SELECT `user_id`, `age` FROM `w` WHERE `user_id` = ? and `age`>3"
     *
     * @param tableName          表名
     * @param selectFields       查询字段
     * @param conditionFields    关联字段
     * @param preFilterCondition 预过滤条件:一定程度减小查询缓存 例如:age>3,进一步过滤user_id等于某值的数据
     * @return
     */
    default String getSelectFromStatement(
            String tableName, String[] selectFields, String[] conditionFields, String preFilterCondition) {
        String selectExpressions =
                Arrays.stream(selectFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String fieldExpressions =
                Arrays.stream(conditionFields)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        return "SELECT "
                + selectExpressions
                + " FROM "
                + quoteIdentifier(tableName)
                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "")
                + (StringUtils.isNotBlank(preFilterCondition) ? " AND " + preFilterCondition : "");
    }

 

3. SQL验证

CREATE TABLE `in_table` (
   `id` BIGINT NOT NULL,
   `name` String NOT NULL,
   `cloud_wise_proc_time` AS `proctime`()
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:clickhouse://xxx/gaogao?socket_timeout=900000',
   'username' = 'default',
   'table-name' = 'user'
 );

 CREATE TABLE `in_table_1` (
   `id` BIGINT NOT NULL,
   `age` BIGINT NOT NULL,
   _cw_insert_time TIMESTAMP(3)
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:clickhouse://xxx/gaogao?socket_timeout=900000',
   'username' = 'default',
   'table-name' = 'info',
   'lookup.cache.max-rows' = '100000',
   'lookup.cache.ttl' = '10MINUTE'
    'lookup.data.filter'='toInt32(toYYYYMMDD(_partition_day))=20220707',
 );

 create table out_table(
	`id` BIGINT,
    `age` BIGINT,
    _cw_insert_time TIMESTAMP(3)
 ) with (
 'connector' = 'print');

insert into out_table 
select a.id,b.age,b._cw_insert_time 
from in_table as a  left join in_table_1  FOR SYSTEM_TIME AS OF a.cloud_wise_proc_time as  b  on a.id=b.id;

此时flink job 在join前,in_table_1会只拉取_partition_day=20220707的数据,实现谓词下推。

 

参考:
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【源码改造】Flink-jdbc-connector源码简析+改造支持谓词下推 的相关文章

随机推荐

  • html网页如何将文字排版,【html】文字排版

    Web开发过程中文字排版 默认的情况下 行末的长单词会撑开容器 我们想要的是 像word一样 能够自动换行 既不撑大容器 也不强制拆开行末单词 并且不会隐藏行末单词的多余字母 不能撑开容器 完整的单词不能被强制拆开 如果行末是长单词的话 整
  • IDEA类文件后边有注释插件:Show Comment

    具体功能是在侧边文件树中 显示Java类的注释信息 IDEA文件树增强插件 Show Comment 使用方法 1 类上面加入javadoc注释 回车就可以了 2 在插件市场里面搜索Show Comment 3 重新idea即可 代码填写和
  • 电信客户流失预测----科大讯飞xDataWhale

    记录第一次参加正式的数据挖掘竞赛 由科大讯飞xDatawhale举办的 电信客户流失预测挑战赛 报名链接 2022 iFLYTEK A I 开发者大赛 讯飞开放平台 一 赛题概要 赛题背景 随着市场饱和度的上升 电信运营商的竞争也越来越激烈
  • Day 12: Twin Transformer by 美团

    这是美团和澳大利亚阿德莱德大学联合发表的新文章 也是和 Transformer 相关的 以下是一些要点 Swin Transformer 的 Shifted Windows 虽然有效 但是由于尺寸不同 因此在用现有的深度学习模型来实现的时候
  • CentOS常用zip压缩和解压缩命令

    1 压缩文件夹为zip文件 root cgls zip r mydata zip mydata 2 把mydata zip解压到mydatabak目录里面 root cgls unzip mydata zip d mydatabak 3 m
  • 电脑开机后,显示屏无信号怎么处理?

    转自 微点阅读 https www weidianyuedu com 随着使用电脑的用户越来越多 而使用的用户遇到的问题就越多了 而经常用电脑的同学大部分都遇到过电脑显示器无信号的情况吧 其实相比显示器没有任何显示而言 电脑显示器无信号的故
  • SQLServer如何统计每两小时的值

    把当前时间的 时分转为数字 select CONVERT FLOAT replace CONVERT VARCHAR 6 GETDATE 108 思路 select sum 数字 年月日 小时 2取整 from 表 group by 年月日
  • kafka学习笔记(一)简介

    这是对我找到的学习资料的整理 非手打 参考 https kafka apachecn org intro html https blog csdn net weixin 39468305 article details 106346280
  • Cannot forward after response has been committed问题解决及分析

    通过TOMCAT把系统启动 可以正常登陆门户 登陆进去选择子系统的时候点击登陆的时候 可是去又回到了登陆界面 如此反复就是不能够进入子系统 查看后台报的错误 Cannot forward after response has been co
  • 数据库密码忘记了怎么办

    修改数据库密码 方法1 用SET PASSWORD命令 首先登录MySQL 格式 mysql gt set password for 用户名 localhost password 新密码 例子 mysql gt set password f
  • 应急响应-账户排查

    用户信息排查 在服务器被入侵之后 攻击者可能会建立相关账户 方便进行远程控制 主要采用一下几种 直接建立一个新用户 有时候为了混淆视听 账户名称和系统常用名相似 激活一个系统中的默认用户 但是这个用户不经常使用 建立一个隐藏用 在windo
  • java-通过ip获取地址

    添加maven依赖
  • 关于ArcMap中打开ArcToolbox导致闪退的解决办法

    最近好久不用ArcGis的小编要用到ArcMap去发送一个GP服务 发现按照套路打开ArcMap点击ArcToolbox时 发生了ArcMap的闪退现象 几经周折终于解决了问题 希望也遇到这类问题的同学能够参考解决 而不是无脑的去重装软件
  • C# 实现ESC退出窗口的几种方法

    实现ESC退出窗口的几种方法 引言 方法一 同步按钮法 方法二 监听按键法 方法三 隐藏按钮法 最后 引言 我们通常用通过点击取消按键或者右上角的 X 盒子退出的方法来实现关闭当前Form窗体 但要使用按键ESC退出关闭窗口就显得更加高级了
  • 解决SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]的方案!!!!!

    目录 前提 一 安装maven helper插件 1 安装 2 安装成功 3 使用 二 去掉冲突的依赖包 1 前面已找到目标依赖 去pom文件内操作 2 去除 3 最后就可以了 前提 今天单元测试遇到了jar包冲突 SLF4J Class
  • 自己学驱动17——ARM工作模式和ARM9寄存器

    1 ARM体系CPU的7种工作模式 1 用户模式 usr ARM处理器正常的程序执行状态 2 快速中断模式 fiq 用于高速数据传输或通道处理 3 中断模式 irq 用于通用的中断处理 4 管理模式 svc 操作系统使用的保护模式 5 数据
  • 【Python】PyCharm中调用另一个文件的函数或类

    欢迎来到Python专栏 PyCharm中调用另一个文件的函数或类 o o 嗨 我是小夏与酒 博客主页 小夏与酒的博客 该系列文章专栏 Python学习专栏 文章作者技术和水平有限 如果文中出现错误 希望大家能指正 欢迎大家关注 目录 Py
  • 数据结构:栈

    文章目录 栈 一 概述 二 添加数据 三 删除数据 栈 一 概述 栈 Stack 是一种特殊的线性表 它只允许在一端进行插入和删除操作 通常被称为 后进先出 Last In First Out LIFO 的数据结构 栈由一系列元素组成 每个
  • python常见的三种格式化输出

    Author Father Teng Name input name Age int input age Job input job info info of 0 Name 0 Age 1 Job 2 format Name Age Job
  • 【源码改造】Flink-jdbc-connector源码简析+改造支持谓词下推

    一 需求背景分析 flink sql在维表关联时 会有一个场景 当右表的数据量比较大且有些数据虽然符合join条件 但其实对于下游来说数据可能没用 这样就浪费了flink的计算资源 且拉低了数据处理能力 如果在join前就把维表的数据进一步