Kylin Flink Cube 引擎的前世今生

2023-10-29

Apache Kylin™ 是一个开源的、分布式的分析型数据仓库,提供Hadoop/Spark 之上的 SQL 查询接口及多维分析(OLAP)能力以支持超大规模数据,它能在亚秒内查询巨大的表。

Kylin 的核心思想是”预计算“,将数据按照指定的维度和指标,预先计算出所有可能的查询结果,利用空间换时间来加速模式固定的 OLAP 查询。

 

Kylin 的理论基础是 Cube 理论,每一种维度组合称之为 Cuboid,所有 Cuboid 的集合称之为 Cube。如下图,整个立方体称为 1 个 Cube,立方体中每个网格点称为 1 个 Cuboid,图中 (A, B, C, D) 和 (A, D) 都是 Cuboid,其中 (A, B, C, D) 称为 Base Cuboid。

基于预计算的思想,Cuboid 需要提前算好并存储起来。由于 Kylin 的场景是面向海量规模的大数据分析,所以 Cube 的构建利用了大数据的计算框架,我们常将计算框架构建 Cube 的实现称之为“Cube 引擎“。在过去的很长一段时间,构建 Cube 时所能选择的引擎只能是 Spark 或者 Hadoop 的 MapReduce 框架。

 

但随着 Kylin 3.1 版本的发布,我们将看到另一个 Cube 引擎正式加入到 Kylin 生态中:Kylin Flink Cube 引擎。在下文中,我们就来对 Flink 的 Cube 引擎做一个全面的介绍。

 

Flink Cube 引擎简介

2018 年底,我萌生了给 Kylin 写一个以 Flink 计算框架来作为 Cube 引擎的想法,当时我还在腾讯,主要从事 Flink 框架的研发。Flink 框架在当时已经不是大数据领域的一颗新星了,它基于 Google 的 DataFlow 模型以及 Streaming First 的设计理念要比 Spark 在流处理领域拥有先天的优势,而且已经被国内外众多公司所广泛使用。在释放计算能力方面,Spark 和 Flink 提供了相似的功能,都是大数据领域支持流和批的通用型计算引擎。既然 Spark 能作为 Kylin 的 Cube 引擎,那么 Flink 理论上没有理由不可以。

 

想要实现 Flink 的 Cube 引擎有两个初衷:

  1. 扩大 Flink 的生态;

  2. 满足腾讯内部统一流批计算引擎的需求(因为 Flink 是当时腾讯内部主推的流计算平台)。

当我跟 Kylin PMC 史少锋交流并提出这一想法后,他对此表示非常欢迎,这里我们必须要称赞一下 Kylin 社区对于接受新技术所持有的积极、开放的心态。

 

Flink Cube 引擎的开发就从 2019 年 1 月开始了,对我而言这是一个跨领域的过程,我需要从头了解 Kylin 以及 OLAP 领域的一些核心思想和概念(毕竟之前一直在做计算框架)以及 Kylin 和 Spark Cube 引擎的一些关键设计。

 

前后加起来差不多利用了数月的业余时间实现完成了整个实现。其中经历了数次调优,这里需要特别感谢 Kyligence 多位童鞋耐心、仔细地进行对比测试(尤其是 Kylin PMC 倪春恩和 Kyligence 的张亚倩童鞋),终于这个引擎的性能到了能够跟 Spark 相提并论的地步。

 

随后,我们在 6,7 月份开始在腾讯内部试点该引擎来构建 Cube,以支持 QQ 音乐、广点通等业务的分析需求。经过内部的试运行,我们观察到整体上它的性能表现要优于 Spark 的实现。然后,在 19 年 9 月的 Kylin 深圳 Meetup 上,我跟前同事程广旭共同分享了一个 Talk 介绍了 Kylin 在腾讯的落地实践以及 Flink Cube 引擎。[2]

Flink Cube 引擎实现 

跟 Kylin 的很多其他 Feature 一样,Flink Engine 最初也是在一个独立的分支上开发的,这个分支就是 engine-flink,2019 年底 Kylin 社区经过测试将该分支合并到了 master 分支。Flink Engine 最初使用的 Flink 版本是 1.7.2,后面升级到了 1.9.0。整个 Cube 引擎在 Jira 上有一个 Umbrella issue,编号是 KYLIN-3758[3],所有的子任务都在这个 issue 下面。

 

Flink Cube 引擎基于 Kylin 原先的插件化的架构,继承 IBatchCubingEngine 接口实现了 FlinkBatchCubingEngine2,是一个相对独立的模块,跟 Kylin 其他部件没有产生太多的耦合。它整体上延续了 Spark Cube 引擎的设计与实现,由于 Spark 跟 Flink 的 DataSet API 存在着一定程度的差异,所以在开发时需要进行一些适配工作。

 

这里我们先介绍一下 Spark Cube 引擎的核心:”By layer“ 算法。Kylin 官方曾经出了一篇博客介绍 Spark Cube 引擎以及该算法的实现[1],以下是这篇博客里的一段文字摘录:

 

The “by-layer” Cubing divides a big task into a couple steps, and each step bases on the previous step’s output, so it can reuse the previous calculation and also avoid calculating from very beginning when there is a failure in between. These makes it as a reliable algorithm. When moving to Spark, we decide to keep this algorithm, that’s why we call this feature as “By layer Spark Cubing”.

 

简而言之,"By layer" 算法的核心思想是逐层计算 Cube ,首先计算 Base Cuboid,然后计算维度数依次减少,逐层向下计算每层的 Cuboid。

 

在实现时,Cube 的构建流程,包含若干个步骤。选择特定的构建引擎通常会使用相应的计算框架提供的 API 去实现这些步骤。由于这些步骤都是一个个独立的 YARN application,所以,也并不是一个 segment 构建任务里所有的子任务都一定要由同一个构建引擎的 API 来实现。

 

接下来,介绍一下我们如何选择 Flink Cube 引擎来构建。我们在 Kylin 的 Web UI 上提供了 Flink Cube Engine 的选项,当用户编辑一个 Cube 信息时,可以在第五步 (Advanced Setting) 中的 Cube Engine 下拉选项中选择 “Flink”。

 

Cube 构建的若干步骤中,当属 ”Cuboid build“ 步骤最为耗时也最为关键。下面我们就来介绍一下,Flink Cube 引擎在对 ”Cuboid build" 步骤调优时有哪些考虑。

 

Flink Cube 引擎调优 

其实,在最初进行对比测试时,Flink 引擎要比 Spark 引擎慢不少。我们发现性能问题后首先对 Flink 框架的参数进行了调优。这里除了内存外,有三个核心参数,分别是并行度、单个 TM Slot 的数目、TM Container 的数目。他们之间的关系是:TM Container 的数目 = 并行度 / 单个 TM Slot 的数目。我们基于控制变量法(固定住并行度以及 Job 总内存不变)尝试调整出一个 Container 数与单 TM Slot 数性能最好的配比。结果得出的结论是,单个 TM 的 Slot 数目减少(当然单个 TM 的内存也会降低),拉起更多的 Container 数目的这种摊平的方式性能会更好。除此之外,在 Flink 批处理模块,它有几个优化配置项,包括对象复用,内存预分配等,通过对比测试,所起到的效果并不明显。

 

当然,仅仅对 Flink 框架的参数进行调优,并没有使得 Flink Cube 引擎的性能赶上 Spark Cube 引擎。接下来的一步优化也很关键:那就是合并/批量计算。通过分析 Flink Job 执行后的 ArchivedExecutionGraph,发现每一步都比较慢,且随着 Layer  的变化没有发生性能急剧下降的情况,基本维持了线性关系。于是,我认为问题应该还是在代码实现上,并不能完全照着 Spark 的方式来。所以,通过对整个 DAG 的重新分析,最终确认了性能瓶颈在于用于聚合 Cuboid 的 Reduce 算子以及对 Cuboid 进行 Encode 的 Map 算子上。

对于这两个算子,Flink 提供了相应的分组、分区的批量处理模式来提升整体处理的吞吐量,它们分别是 mapPartition/reduceGroup。它们会对上游的数据进行聚集,直到某分区的输入全部被接收,然后才会调用具体的 UDF 对数据进行迭代处理。当然,这两个算子也有它危险的地方,那就是它们很占用内存,数据量太大会存在内存耗尽从而导致 OOM 的风险。这一点,Flink 彻底的内存管理以及自定义类型系统的做法会有一些优势,它能够容纳更多的数据在内存中,并且有效地减少 GC 的频次,但仍然可能存在风险。所以,这一块的改进建议是引入一个场景化的开关:如果内存资源充足那么我们就可以尽量用这两个算子来降低构建时间,如果内存资源有限,那么我们可以选择更稳定的方式来构建。

 

Flink Cube & Spark Cube 

引擎对比测试 

  • 对比步骤:两个计算引擎分别构建 Cuboid 数据;

  • YARN 集群资源:4 个物理节点,每个物理节点 32 Core,125G 内存;

  • 数据源:基于 SSB 数据集,事实表包含 6 千万记录。

 

结果如下:

结论:Flink Engine在对比测试中已超过 2 mins + 的明显优势胜出。

*注:整个测试过程由 Kylin Committer 倪春恩实施并提供测试结果。

 

构建过程中的相关 UI 的截图如下:

两个 Engine 相关的配置信息如下:

Spark Cube Engine:

kylin.engine.spark-conf.spark.master=yarn
kylin.engine.spark-conf.spark.submit.deployMode=cluster
kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true
kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=1
kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000
kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300
kylin.engine.spark-conf.spark.shuffle.service.enabled=true
kylin.engine.spark-conf.spark.hadoop.dfs.replication=2
kylin.engine.spark-conf.spark.driver.memory=4G
kylin.engine.spark-conf.spark.executor.memory=4G
kylin.engine.spark-conf.spark.executor.cores=1
kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024

Flink Cube Engine:

kylin.engine.flink-conf.jobmanager.heap.size=2G
kylin.engine.flink-conf.taskmanager.heap.size=4G
kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=4
kylin.engine.flink-conf.taskmanager.memory.preallocate=false
kylin.engine.flink-conf.job.parallelism=80
kylin.engine.flink-conf.program.enableObjectReuse=false

Flink 引擎后续规划 

 

Flink Cube 引擎,随 Kylin 3.1 版本一起发布,这给了用户足够的信心来使用它。当然,由于维护精力受限,它还有一些不足和待改进的空间,我们很开心看到社区也有其他小伙伴将 Flink Cube 引擎在自己公司内使用并将相关的优化与改进反馈回 Kylin社区。例如,harveyyue 同学实现了 cubing step 中的 fact distinct 以及 convert to HFile 等子任务[2]。随着 Flink Cube 引擎被正式发布,我们有理由相信它能在 Kylin 生态中占有一席之地。

参考文献: 

[1]: http://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

[2]: https://github.com/apache/kylin/commits?author=harveyyue

[3]: https://issues.apache.org/jira/browse/KYLIN-3758

作者简介:

杨华,T3 出行大数据平台负责人,前腾讯高级工程师。Apache Hudi Committer & PMC Member。Apache Kylin 的 Flink Cube Engine 作者。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kylin Flink Cube 引擎的前世今生 的相关文章

随机推荐

  • CNN经典网络模型(四):GoogLeNet简介及代码实现(PyTorch超详细注释版)

    目录 一 开发背景 二 网络结构 三 模型特点 四 代码实现 1 model py 2 train py 3 predict py 4 spilit data py 五 参考内容 一 开发背景 GoogLeNet在2014年由Google团
  • @Validated 注解不起作用 怎么办?@Validated 无效 解决办法

    有一种可能是之前没有查到的 那就是pom缺少依赖 在项目的pom xml 文件中添加以上依赖 可有效解决问题
  • MySQL触发器trigger的使用

    Q 什么是触发器 A 触发器是与表有关的数据库对象 在满足定义条件时触发 并执行触发器中定义的语句集合 触发器的特性 1 有begin end体 begin end 之间的语句可以写的简单或者复杂 2 什么条件会触发 I D U 3 什么时
  • 线程的六种状态

    1 New 新建状态 线程刚被创建 start方法之前的状态 2 Runnable 运行状态 得到时间片运行中状态 Ready就绪 未得到时间片就绪状态 3 Blocked 阻塞状态 如果遇到锁 线程就会变为阻塞状态等待另一个线程释放锁 4
  • repo 使用

    repo 使用 repo start 创建并切换分支 repo start newbranchname all projectName repo start是对git checkout b 命令的封装 git checkout b 是在当前
  • 无监督特征选择算法综述

    无监督特征选择算法 Filter方法 只使用数据的内在属性 不使用聚类等其他辅助方法 速度快 单变量 Information based methods SUD Sequential backward selection method fo
  • 毕业设计-基于机器视觉的手写字体智能识别系统

    目录 前言 课题背景和意义 实现技术思路 一 系统整体结构框架设计 二 系统硬件设计 三 系统软件框架设计 四 实验与分析 五 总结 实现效果图样例 最后 前言 大四是整个大学期间最忙碌的时光 一边要忙着备考或实习为毕业后面临的就业升学做准
  • 分布式系统下的纠删码(二) -- Locally Repairable Codes (LRC)

    分布式系统下的纠删码 二 Locally Repairable Codes LRC 一 名词解释 MDS Maximun Distance Separable MDS 性质 是纠删码的一个重要性质 它保证n k m个磁盘中任意k个磁盘都可恢
  • OpenLayers官网教程-移动端地图和传感器

    这一系列翻译自openlayers官网的WorkShop OL官网提供了多个系列教程供开发者学习参考 其中QuickStart是面向初学者的hello world Tutorials提供了构建OL应用的一些基础知识 WorkShop 本系列
  • scss 样式穿透

    当一些组件 例如 轮播 全局引入时 只改当前页面的样式 用css类选择器不能直接选择更改 应用scss样式穿透 注 scoped让css只在当前组件生效 不考虑兼容问题 去掉scoped也可以直接更改css样式
  • # leetcode#5最长回文数C++

    leetcode 5最长回文数C 一 思路一 中心扩散 对每一个字符 检测它与它旁边的数是否为回文数 如果是 那么再扩展它 的长度检查 分奇偶情况讨论 得到以该字符为中心最长的回文数 在遍历过程中用max 2 储存该目前最长的回文数位置和长
  • iphone11屏比例_iPhone每一代的屏幕尺寸比例是多少

    iPhone2G屏幕为3 5英寸 分辨率为320 480像素 比例为3 2 iPhone3G屏幕为3 5英寸 分辨率为320 480像素 比例为3 2 iPhone3GS屏幕为3 5英寸 分辨率为320 480像素 比例为3 2 iPhon
  • QOpenGLWidget 纹理贴图

    环境 QT 5 12 8 本人初学Opengl 想要绘制一个正方形并且贴纹理 以下是参考别人代码自己整理的 创建QT工程 结构如下 代码如下 glwidget h ifndef GLWIDGET H define GLWIDGET H in
  • [Python 与 炒股] TuShare 使用篇之三

    2016年新年第一贴 大年夜搞这个只能说明春晚实在是有点无聊 在之前的blog里写了一个最简单的例子 http blog csdn net robertsong2004 article details 50642655 现在试一下简单的分析
  • 渗透测试-01信息收集

    0x01信息收集 1 什么是信息收集 信息收集是指通过各种方式获取所需要的信息 以便我们在后续的渗透过程更好的进行 比如目标的站点IP 中间件 脚本语言 端口 邮箱等等 信息收集包含资产收集但不限于资产收集 2 信息收集的意义 1 信息收集
  • 使用 easyjson,生成 xxx_easyjson.go 文件之后,对测试结果所产生的影响

    文章评论 原文地址 https blog csdn net luslin1711 article details 90244468 正文 博主 你好 文中的测试结果 似乎不是很正确 由于评论区字数的限制 我另开一篇文章 请您解惑 以下是我的
  • 轻量级c语言开源日志库log.c介绍 - 实现不同级别和参数化日志打印

    前言 c语言没有现成的日志库 如果要记录日志 需要自己封装一个日志库 如果要实现日志级别和参数打印 还是比较麻烦的 正好在github找到了一个c语言开源日志库 可以实现日志级别打印 参数打印 而且还会记录日期和行号 最重要的是代码非常少
  • Google API 设计指南-文档

    翻译自 API Design Guide Documentation 这一章是为 API 添加内部文档的指南 大部分 API 有概述 教程和更高级别的参考文档 此指南不讨论 API 名 资源名和方法名的信息请查看命名约定 注释格式 在 pr
  • OpenGL ES几个概念-顶点着色器、片元着色器、EGL

    一 OpenGL ES OpenGL ES是使用在手机端和嵌入式里的3D图形应用程序编程接口 是跨平台的API OpenGL ES是OpenGL的简化版本 OpenGL2 x 版本相比 1 x 版本有较大差异 1 x 版本为 fixed f
  • Kylin Flink Cube 引擎的前世今生

    Apache Kylin 是一个开源的 分布式的分析型数据仓库 提供Hadoop Spark 之上的 SQL 查询接口及多维分析 OLAP 能力以支持超大规模数据 它能在亚秒内查询巨大的表 Kylin 的核心思想是 预计算 将数据按照指定的