关于 Apache Beam 实战指南系列文章
随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发者经常要用到不同的技术、框架、API、开发语言和 SDK 来应对复杂应用的开发,这大大增加了选择合适工具和框架的难度,开发者想要将所有的大数据组件熟练运用几乎是一项不可能完成的任务。
面对这种情况,Google 在 2016 年 2 月宣布将大数据流水线产品(Google DataFlow)贡献给 Apache 基金会孵化,2017 年 1 月 Apache 对外宣布开源 Apache Beam,2017 年 5 月迎来了它的第一个稳定版本 2.0.0。在国内,大部分开发者对于 Beam 还缺乏了解,社区中文资料也比较少。InfoQ 期望通过Apache Beam 实战指南系列文章推动 Apache Beam 在国内的普及。对大数据的概念都是模糊不清的,大数据是什么,能做什么,学的时候,该按照什么线路去学习,学完往哪方面发展,想深入了解,想学习的同学欢迎加入大数据学习qq群:458345782,有大量干货(零基础以及进阶的经典实战)分享给大家,并且有清华大学毕业的资深大数据讲师给大家免费授课,给大家分享目前国内最完整的大数据高端实战实用学习流程体系 。
一.概述
随着 2018 年 10 月 2 日欧洲 Beam 首届峰会结束后,Beam 的使用者越来越多,关注度越来越高。不光外国公司 Google、Spotify、亚马逊、Data Artisans 等用上了 Beam,TensorFlow 机器学习框架也跟 Beam 结合使用做机器学习的预处理工作,背靠谷歌巨头,Beam 不光在大数据一统上做强有力的部署,在云计算、大数据、机器学习、人工智能的集成和运用也越来越广泛。
Beam 在发布第一个版本后,不断完善模型和运行平台。SDKs 也添加了许多 IO,例如消息中间件又新增了 ActiveMQ 和 RabbitMQ ,缓存新增 Redis ,大数据分析神器 Kudu,大数据存储格式 Parquet 等等。Runner 新增了实时流处理 Samza 和 JStorm、MapReduce 和加速 Hadoop 查询 Tez,此外新增了 Beam 部署 Docker 的 DockerCommand 接口 ,以及 Metrics 监控的引入和集成。其他 SDK 和 Runner 也在不断更新中,Beam 每 6 周发布一个小版本,及时完善了一些一次性未集成完善的功能。
在科技日新月异的浪潮中,不管是人工智能的机器学习、还是 AI 的人脸识别、以及物联网的工业互联、互联网的深度挖掘等都必须有一定的数据积累,恰恰这些早期的数据很多公司都存到不同的数据库中,很多公司在早期没有其他大数据存储情况下,基本都存在 Hadoop 的 HDFS 中。对于 HDFS 这个被公认的大数据存储基石,Beam 是怎样简单的操作的呢?底层源码是怎样跟 Beam 结合使用的?我们今天就重点看一下。
二.Apache Beam 中 HdfsIO 源码剖析
由于 Beam 在发布稳定版本 2.0 之前的源码,Beam 操作 HdfsIO 都比较不稳定,并且 API 都比较 Low。在 2.0 版本之后 HdfsIO 的变化很大,2.0 版本之前命名为 HDFSFileSink 读写等操作,2.0 之后都是命名为 HadoopFileSystem 来操作 Hadoop 的 HDFS 。本文按照 Beam 2.4 版本源码进行剖析,2.4 之后的版本基本没有很大变化,直到最新的 2.9 版本才有一个小优化,2.4 版本的 HdfsIO 还是比较稳定的。
2.1 Hdfs 的配置类 Configuration 源码部分
HadoopFileSystem(Configuration configuration) throws IOException { this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
}
在源码中 HadoopFileSystem 把 Hadoop Hdfs 的配置类当参数,在构造函数外面配置好传参到内部。 Configuration 类其实有三个配置,一个是 HdfsConfiguration 类,另外是 map-reduce Job 任务和 YarnConfiguration 资源调度器用到的配置,今天我们主要看 HdfsConfiguration 类,因为 Map-Reduce 去年集成到 Beam 之后基本很少人使用。
在 HdfsConfiguration 类中支持很多配置,最主要的“fs.default.name’”是配置我们 Hadoop 集群。
2.2 HDFS 的读写都是基于 ByteBuffer 的
@Override
public int read(ByteBuffer dst) throws IOException {
if (closed) {
throw new IOException("Channel is closed");
}
// O length read must be supported
int read = 0;
// We avoid using the ByteBuffer based read for Hadoop because some FSDataInputStream