Flink 1.11:更好用的流批一体 SQL 引擎

2023-11-05

许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。Flink SQL 提供了各种异构数据源的联合查询。开发者可以很方便地在一个程序中通过 SQL 编写复杂的分析查询。通过 CBO 优化器、列式存储、和代码生成技术,Flink SQL 拥有非常高的查询效率。同时借助于 Flink runtime 良好的容错和扩展性,Flink SQL 可以轻松处理海量数据。

 

在保证优秀性能的同时,易用性是 1.11 版本 Flink SQL 的重头戏。易用性的提升主要体现在以下几个方面:

 

  • 更方便的追加或修改表定义

  • 灵活的声明动态的查询参数

  • 加强和统一了原有 TableEnv 上的 SQL 接口

  • 简化了 connector 的属性定义

  • 对 Hive 的 DDL 做了原生支持

  • 加强了对 python UDF 的支持

 

下面逐一为大家介绍 ~

 

Create Table Like

 

在生产中,用户常常有调整现有表定义的需求。例如用户想在一些外部的表定义(例如 Hive metastore)基础上追加 Flink 特有的一些定义比如 watermark。在 ETL 场景中,将多张表的数据合并到一张表,目标表的 schema 定义其实是上游表的合集,需要一种方便合并表定义的方式。

 

从 1.11 版本开始,Flink 提供了 LIKE 语法,用户可以很方便的在已有的表定义上追加新的定义。

 

例如我们可以使用下面的语法给已有表 base_table 追加 watermark 定义:

CREATE [TEMPORARY] TABLE base_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id)) WITH (    'connector': 'kafka') CREATE [TEMPORARY] TABLE derived_table (    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)LIKE base_table;

 

这里 derived_table 表定义等价于如下定义:

CREATE [TEMPORARY] TABLE derived_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id),    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH (    ‘connector’: ‘kafka’)

 

对比之下,新的语法省去了重复的 schema 定义,用户只需要定义追加属性,非常方便简洁。

 

多属性策略

 

有的小伙伴会问,原表和新表的属性只是新增或追加吗?如果我想覆盖或者排除某些属性该如何操作?这是一个好问题,Flink LIKE 语法提供了非常灵活的表属性操作策略。

 

LIKE 语法支持使用不同的 keyword 对表属性分类:

 

  • ALL:完整的表定义

  • CONSTRAINTS: primary keys, unique key 等约束

  • GENERATED: 主要指计算列和 watermark

  • OPTIONS: WITH (...) 语句内定义的 table options

  • PARTITIONS: 表分区信息

 

在不同的属性分类上可以追加不同的属性行为:

 

  • INCLUDING:包含(默认行为)

  • EXCLUDING:排除

  • OVERWRITING:覆盖

 

下面这张表格说明了不同的分类属性允许的行为:

 

 

INCLUDING

EXCLUDING

OVERWRITING

ALL

✔️

✔️

CONSTRAINTS

✔️

✔️

PARTITIONS

✔️

✔️

GENERATED

✔️

✔️

✔️

OPTIONS

✔️

✔️

✔️

 

例如下面的语句:

CREATE [TEMPORARY] TABLE base_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id)) WITH (    'connector': 'kafka',    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',    'format': 'json') CREATE [TEMPORARY] TABLE derived_table (    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)WITH (    'connector.starting-offset': '0')LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);

等价的表属性定义为:

CREATE [TEMPORARY] TABLE derived_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH (    'connector': 'kafka',    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',    'format': 'json')

 

细节参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

 

 

Dynamic Table Options

 

在生产中,调整参数是一个常见需求,很多的时候是临时修改(比如通过终端查询和展示),比如下面这张 Kafka 表:

 
create table kafka_table (  id bigint,  age int,  name STRING) WITH (  'connector' = 'kafka',  'topic' = 'employees',  'scan.startup.mode' = 'timestamp',  'scan.startup.timestamp-millis' = '123456',  'format' = 'csv',  'csv.ignore-parse-errors' = 'false')

 

 

在之前的版本,如果用户有如下需求:

 

  • 用户需要指定特性的消费时间戳,即修改 scan.startup.timestamp-millis 属性

  • 用户想忽略掉解析错误,需要将 format.ignore-parse-errors 改为 true

 

只能使用 ALTER TABLE 这样的语句修改表的定义,从 1.11 开始,用户可以通过动态参数的形式灵活地设置表的属性参数,覆盖或者追加原表的 WITH (...) 语句内定义的 table options。

 

基本语法为:

 

table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */

 

OPTIONS 内的键值对会覆盖原表的 table options,用户可以在各种 SQL 语境中使用这样的语法,例如:

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- override table options in query sourceselect id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- override table options in joinselect * from    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1    join    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2    on t1.id = t2.id;
-- override table options for INSERT target tableinsert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

 

动态参数的使用没有语境限制,只要是引用表的地方都可以追加定义。在指定的表后面追加的动态参数会自动追加到原表定义中,是不是很方便呢 :)

 

由于可能对查询结果有影响,动态参数功能默认是关闭的, 使用下面的方式开启该功能:

// instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value optionsconfiguration.setString("table.dynamic-table-options.enabled", "true");

 

细节参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

 

SQL API 改进

 

随着 Flink SQL 支持的语句越来越丰富,老的 API 容易引起一些困惑:

 

  • 原先的 sqlUpdate() 方法传递 DDL 语句会立即执行,而 INSERT INTO 语句在调用 execute 方法时才会执行

  • Table 程序的执行入口不够清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以触发 table 程序执行

  • execute 方法没有返回值。像 SHOW TABLES 这样的语句没有很好地方式返回结果。另外,sqlUpdate 方法加入了越来越多的语句导致接口定义不清晰,sqlUpdate 可以执行 SHOW TABLES 就是一个反例

  • 在 Blink planner 一直提供多 sink 优化执行的能力,但是在 API 层没有体现出来

 

1.11 重新梳理了 TableEnv 上的 sql 相关接口,提供了更清晰的执行语义,同时执行任意 sql 语句现在都有返回值,用户可以通过新的 API 灵活的组织多行 sql 语句一起执行。

 

更清晰的执行语义

 

新的接口 TableEnvironment#executeSql 统一返回抽象 TableResult,用户可以迭代 TableResult 拿到执行结果。根据执行语句的不同,返回结果的数据结构也有变化,比如 SELECT 语句会返回查询结果,而 INSERT 语句会异步提交作业到集群。

 

组织多条语句一起执行

 

新的接口 TableEnvironment#createStatementSet 允许用户添加多条 INSERT 语句并一起执行,在多 sink 场景,Blink planner 会针对性地对执行计划做优化。

 

新旧 API 对比

 

一张表格感受新老 API 的变化:

 

sqlUpdate vs executeSql

 

Current Interface

New Interface

tEnv.sqlUpdate("CREATE TABLE ...");

TableResult result = tEnv.executeSql("CREATE TABLE ...");

tEnv.sqlUpdate("INSERT INTO ... SELECT ...");

tEnv.execute("test");

TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ...");

 

execute vs createStatementSet

 

Current Interface

New Interface

tEnv.sqlUpdate("insert into xx ...")

tEnv.sqlUpdate("insert into yy ...")

tEnv.execute("test")

StatementSet ss = tEnv.createStatementSet();

ss.addInsertSql("insert into xx ...");

ss.addInsertSql("insert into yy ...");

TableResult result = ss.execute();

 

tEnv.insertInto("sink1", table1)

tEnv.insertInto("sink2", table2)

tEnv.execute("test")

StatementSet ss = tEnv.createStatementSet();

ss.addInsert("sink1", table1);

ss.addInsert("sink2", table2);

TableResult result = ss.execute()

 

详情参见:https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

 

Hive 语法兼容加强

 

从 1.11 开始,Flink  SQL 将 Hive parser 模块独立出来,用以兼容 Hive 的语法,目前 DDL 层面,DB、Table、View、Function 相关的语法均已支持。搭配 HiveCatalog,Hive 的同学可以直接使用 Hive 的语法来进行相关的操作。

 

在使用 hive 语句之前需要设置正确的 Dialect:

EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();TableEnvironment tableEnv = TableEnvironment.create(settings);// to use hive dialecttableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// use the hive catalogtableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);tableEnv.useCatalog(hiveCatalog.getName());

 

之后我们便可以使用 Hive 的语法来执行一些 DDL,例如最常见的建表操作:

create external table tbl1 (  d decimal(10,0),  ts timestamp)partitioned by (p string)location '%s'tblproperties('k1'='v1');  create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;
create table tbl3 (  m map<timestamp,binary>)partitioned by (p1 bigint, p2 tinyint)row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';
create table tbl4 (  x int,  y smallint)row format delimited fields terminated by '|' lines terminated by '\n';

 

对于 DQL 的 Hive 语法兼容已经在规划中,1.12 版本会兼容更多 query 语法 ~

 

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html

 

更简洁的 connector 属性

 

1.11 重新规范了 connector 的属性定义,新的属性 key 更加直观简洁,和原有的属性 key 相比主要做了如下改动:

 

  • 使用 connector 作为 connector 的类型 key,connector 版本信息直接放到 value 中,比如 0.11 的 kafka 为 kafka-0.11 

  • 去掉了其余属性中多余的 connector 前缀

  • 使用 scan 和 sink 前缀标记 source 和 sink 专有属性

  • format.type 精简为 format ,同时 format 自身属性使用 format 的值作为前缀,比如 csv format 的自身属性使用 csv 统一作前缀

 

例如,1.11 Kafka 表的定义如下:

CREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset')

 

详情参见:https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

 

JDBC catalog

 

在之前的版本中,用户只能通过显示建表的方式创建关系型数据库的镜像表。用户需要手动追踪 Flink SQL 的表 schema 和数据库的 schema 变更。在 1.11,Flink SQL 提供了一个 JDBC catalog 接口对接各种外部的数据库系统,例如 Postgres、MySQL、MariaDB、AWS Aurora、etc。

 

当前 Flink 内置了 Postgres 的 catalog 实现,使用下面的代码配置 JDBC catalog:

CREATE CATALOG mypg WITH(    'type' = 'jdbc',    'default-database' = '...',    'username' = '...',    'password' = '...',    'base-url' = '...');
USE CATALOG mypg;

用户也可以实现 JDBCCatalog 接口定制其他数据库的 catalog ~

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog

 

Python UDF 增强

 

1.11 版本的 py-flink 在 python UDF 方面提供了很多增强,包括 DDL 的定义方式、支持了标量的向量化 python UDF,支持全套的 python UDF metrics 定义,以及在 SQL-CLI 中定义 python UDF。

 

DDL 定义 python UDF

 

1.10.0 版本引入了对 python UDF 的支持。但是仅仅支持 python table api 的方式。1.11 提供了 SQL DDL 的方式定义 python UDF, 用户可以在 Java/Scala table API 以及 SQL-CLI 场景下使用。

 

例如,现在用户可以使用如下方式定义 Java table API 程序使用 python UDF:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

 

向量化支持

 

向量化 Python  UDF 相较于普通函数大大提升了性能。用户可以使用流行的 python 库例如 Pandas、Numpy 来实现向量化的 python UDF。用户只需在装饰器 udf 中添加额外的参数 udf_type="pandas" 即可。

 

例如,下面的样例展示了如何定义向量化的 Python 标量函数以及在 python table api 中的应用:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")def add(i, j):  return i + j
table_env = BatchTableEnvironment.create(env)
# register the vectorized Python scalar functiontable_env.register_function("add", add)
# use the vectorized Python scalar function in Python Table APImy_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL APItable_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

 

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/vectorized_python_udfs.html

 

另外,1.11 对 python UDF 的 metrics 做了全面支持,现在用户可以在 UDF 中方便地定义各种类型的 metrics,由于篇幅关系,这里不作详细描述,见 python UDF metrics。

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/metrics.html

                         关注博主的微信公众号,其中有更多精彩文章,还有大量免费教学视频和电子书等你来拿!

                                                                

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 1.11:更好用的流批一体 SQL 引擎 的相关文章

随机推荐

  • 判断两个List<Map>是否相同以及其他类操作

    根据自身的业务 搞了个list 是否相等的方法 较笨拙 有更好的方法 欢迎各位大佬指教 仅根据自身业务编写 如需改动请自便 改动也好改动 注释很详细 import com google common collect Maps import
  • [非线性控制理论]1_Lyapunov直接方法

    非线性控制理论 1 Lyapunov直接方法 非线性控制理论 2 不变性原理 非线性控制理论 3 基础反馈稳定控制器设计 非线性控制理论 4 反馈线性化 反步法 非线性控制理论 5 自适应控制器 Adaptive controller 非线
  • 【Python】Python可变参数*args **kwargs

    可变参数 args 和 kwargs 作为函数定义时 收集未匹配参数组成tuple或dict对象 参数也就是 args收集所有未匹配的位置参数组成一个tuple对象 局部变量args指向此tuple对象 参数也就是 kwargs收集所有未匹
  • 巧用异频测量快速评估友商4G网络覆盖

    摘 要 通过在联通4G网络异频测量配置数据中增加友商使用的4G网络频点 让联通现网中使用全网通的用户上报的测量报告数据 MR 包含有友商频点的测量信息 再对测量报告数据 MR 进行统计分析 可以实现快速 高效的对友商4G网络的覆盖进行全面评
  • 电流检测总结

    电流检测总结 电流检测是工业上常用到的一门技术 平常工作也经常与之打交道 因为我是小白 所以今天打算对它做一个简单的总结 防止遗忘 对于被检测的电路 一般使用阻值很小的采样电阻串联在电路中进行采样 然后检测这个采样电阻两端的电压信号 Vin
  • Unity Shader 实现描边OutLine效果

    Shader实现描边流程大致为 对模型进行2遍 2个pass 绘制 第一遍 描边pass 在vertex shader中对模型沿顶点法线方向放大 fragment shader设置输出颜色为描边颜色 第二遍正常绘制模型 除被放大的部分外 其
  • react如何调用子组件身上的方法

    使用场景 需要重复触发子组件弹窗或者需要在子组件修改值的时候可以采用调用子组件身上特定方法执行操作 在次介绍一下最新hooks的操作和旧版本class组件调用方式 一 Hooks api调用方式 需要用到的Api useRef useImp
  • 哈呀嗓,济南

    今天收到了泰莱区宏图三胞招聘主管的邮件 说是让联系沈经理 备注在这里以免忘记 还有就是 下午打印简历的时候潍坊新北海打电话过来 说他们公司也需要php程序员 这个也待定吧 一切还是以济南为主吧 不行再走 地形 济南市位于北纬36 40 东经
  • XXL-JOB(分布式任务调度平台)的使用(详细教程)

    概述 首先我们要知道什么是XXL JOB 官方简介 XXL JOB是一个分布式任务调度平台 其核心设计目标是开发迅速 学习简单 轻量级 易扩展 现已开放源代码并接入多家公司线上产品线 开箱即用 XXL JOB的有点特性 1 简单 支持通过W
  • Hibernate 项目查询数据报 UnknownEntityTypeException

    原因分析 1 hibernate cfg xml配置文件有没有映射实体类
  • 一文带你全面理解向量数据库

    近些年来 向量数据库引起业界的广泛关注 一个相关事实是许多向量数据库初创公司在短期内就筹集到数百万美元的资金 你很可能已经听说过向量数据库 但也许直到现在才真正关心向量数据库 至少 我想这就是你现在阅读本文的原因 如果你阅读本文只是为了简单
  • wireshark过滤器的使用

    目录 wireshark wireshark的基本使用 wireshark过滤器的区别 抓包案例 wireshark wireshark的基本使用 抓包采用 wireshark 提取特征时 要对 session 进行过滤 找到关键的stre
  • 华为云使用手册

    华为云重磅福利 云主机 海外云主机 云容器和多款云产品0元领取 华为云重磅推出云上优选 特惠来袭来迎接这个来之不易的春天 本次活动依然是给到了很低的折扣 0 7折起 活动走起 福利1 免费试用海外云主机和云原生容器网页连接 进入免费试用专区
  • CentOS安装python3.x最新版和chrome chromedriver

    之前使用selenium wire的响应拦截器获取请求头中的签名需要部署到服务器 所以得搭建一个服务器运行环境 安装过程有坑 这里记录一下 Linux平台安装需要下载源码包自己编译 下载地址 https www python org dow
  • hexo主题标签的使用

    https akilar top posts 615e2dec 这个是我看的教程 我直接复制的源码 友情链接 LrcShare 实现hexo标签的可以折叠 hexo标签的使用方法 要实现Hexo标签的可折叠 可以使用Hexo内置的foldi
  • ad中按钮开关的符号_收藏:电路图符号大全

    电子设备中有各种各样的图 能够说明它们工作原理的是电原理图 简称电路图 电路图是说明模拟电子电路工作原理的 它用各种图形符号表示电阻器 电容器 开关 晶体管等实物 用线条把元器件和单元电路按工作原理的关系连接起来 一张电路图就好像是一篇文章
  • 在SpringBoot中加入jsp

    SpringBoot官方不推荐在 SpringBoot 中使用 jsp 的 那么到底可以使用吗 答案是肯定的 不过需要导入tomcat 插件启动项目 不能再用 SpringBoot 默认 tomcat 了 一 导入SpringBoot的to
  • React实现大文件上传、react-dropzone

    React大文件上传的实现方案大致如下 使用第三方组件库实现文件上传 如react dropzone 将大文件分成多个小块 并使用XMLHttpRequest或者fetch发送分块上传请求 为了保证数据完整性 每个请求都需要携带校验码 在上
  • (0)JavaScript语法---小程序回调函数【幼儿园级教程】

    微信小程序中的回调函数 史上最简单的幼儿园基础教程 小程序的回调函数 汉字版的编码 你是不是第一次见到 总结 小程序的回调函数 在小程序包含逻辑时 回调函数几乎是无法避免 在整个使用中 发现大部分帖子都是针对有一定的基础的伙伴写的 也比较晦
  • Flink 1.11:更好用的流批一体 SQL 引擎

    许多的数据科学家 分析师和 BI 用户依赖交互式 SQL 查询分析数据 Flink SQL 是 Flink 的核心模块之一 作为一个分布式的 SQL 查询引擎 Flink SQL 提供了各种异构数据源的联合查询 开发者可以很方便地在一个程序