我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们可以在 HDFS 的某些目录中获取新文件。在这些目录的顶部,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)
现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:
ALTER TABLE table1 RECOVER PARTITONS
检测添加到表中的新分区(及其 HDFS 目录和文件)。
REFRESH table1 PARTITIONS (partition1=X, partition2=Y)
,使用每个分区的所有键。
目前,此 DDL 花费的时间有点太长,并且它们在我们的系统中排队,从而损害了系统的数据可用性。
所以,我的问题是:有没有一种方法可以更有效地进行数据合并?
我们考虑过:
使用ALTER TABLE .. RECOVER PARTITONS
但根据文档 https://docs.cloudera.com/documentation/enterprise/5-15-x/topics/impala_alter_table.html,它只刷新新分区。
尝试使用REFRESH .. PARTITON ...
一次具有多个分区,但语句语法不允许这样做。
尝试对查询进行批处理,但 Hive JDBC 驱动器不支持批处理查询。
鉴于系统已经很忙,我们是否应该尝试并行进行这些更新?
- 您还知道其他什么方式吗?
Thanks!
Victor
注意:我们知道哪些分区需要刷新的方法是使用 HDFS 事件,就像 Spark 结构化流一样,我们不知道文件何时写入。
注意#2:此外,HDFS 中写入的文件有时很小,因此如果能够同时合并这些文件那就太好了。
由于似乎没有人能解决我的问题,我想分享我们为提高处理效率而采取的方法,非常欢迎提出意见。
我们发现(文档对此不是很清楚)HDFS 中的 Spark“检查点”中存储的一些信息是许多元数据文件,描述每个 Parquet 文件的写入时间及其大小:
$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata
w-r--r-- 3 hdfs 68K 2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r-- 3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r-- 3 hdfs 68K 2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...
$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
所以,我们所做的是:
- Build a Spark Streaming Job polling that
_spark_metadata
folder.
- 我们使用一个
fileStream
因为它允许我们定义要使用的文件过滤器。
- 该流中的每个条目都是这些 JSON 行之一,解析该行以提取文件路径和大小。
- 按文件所属的父文件夹(映射到每个 Impala 分区)对文件进行分组。
- For each folder:
- 读取数据帧加载only目标 Parquet 文件(以避免与写入文件的其他作业发生竞争情况)
- 计算要写入的块数(使用 JSON 中的大小字段和目标块大小)
- 将数据帧合并到所需数量的分区并将其写回 HDFS
- 执行DDL
REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
- 最后删除源文件
我们取得的成果是:
通过对每个分区和批次进行一次刷新来限制 DDL。
通过可配置批处理时间和块大小,我们能够使我们的产品适应具有更大或更小数据集的不同部署场景。
该解决方案非常灵活,因为我们可以为 Spark Streaming 作业分配更多或更少的资源(执行程序、核心、内存等),并且我们还可以启动/停止它(使用其自己的检查点系统)。
我们还在研究在执行此过程时应用一些数据重新分区的可能性,以使分区尽可能接近最佳大小。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)