分别处理spark中的多个目录

2024-04-25

我在 HDFS 中有一个目录列表,每个目录包含多个文件。我的目标是将一个目录中的所有文件合并为一个文件,但每个目录分别合并。在 Spark 中执行此操作最快的方法是什么?顺序迭代所有目录太慢。所以我想并行进行。一种解决方案可能是使用线程池。也许有更好、更快、更原生的?

Thanks!


考虑以下测试目录foo and bar包含以下文件:

cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7

我们可以使用以下代码片段来读取它们:

val df = spark.read.csv("/tmp/foo", "/tmp/bar")
  .withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|4  |foo|
|3  |foo|
|7  |bar|
+---+---+
*/

功能input_file_name给出文件的绝对路径,因此我们可以使用它来获取目录。功能regexp_extract仅用于转换,例如/tmp/foo/1.csv -> foo.

Spark 写入文件时,每个分区输出一个文件。因此,我们需要按列重新分区dir合并每个目录下的所有文件。最后,我们可以使用partitionBy也将目录名称获取到输出文件结构。例如

df.repartition($"dir")
  .write
  .partitionBy("dir")
  .csv("/tmp/out")

会产生文件

/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc

where /tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv包含

7

and /tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv包含

4
3

AFAIK 如果没有例如,不可能将这些输出文件写入与原始输入相同的目录结构。拥有定制的 HadoopFileSystem类等

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分别处理spark中的多个目录 的相关文章

随机推荐