Pyspark:将 tar.gz 文件加载到数据框中并按文件名过滤

2023-12-02

我有一个包含多个文件的 tar.gz 文件。层次结构如下所示。我的目的是读取tar.gz文件,过滤掉其中的内容b.tsv因为它是静态元数据,其中所有其他文件都是实际记录。

gzfile.tar.gz
|- a.tsv
|- b.tsv
|- thousand more files.

通过 pyspark load,我可以将文件加载到数据帧中。我使用了命令:

spark = SparkSession.\
        builder.\
        appName("Loading Gzip Files").\
        getOrCreate()
input = spark.read.load('/Users/jeevs/git/data/gzfile.tar.gz',\
          format='com.databricks.spark.csv',\
          sep = '\t'

为了过滤,我添加了文件名

from  pyspark.sql.functions import input_file_name
input.withColumn("filename", input_file_name())

现在生成的数据如下:

|_c0 |_c1 |filename |
|b.tsv0000666000076500001440035235677713575350214013124 0ustar  netsaintusers1|Lynx 2.7.1|file:///Users/jeevs/git/data/gzfile.tar.gz|
|2|Lynx 2.7|file:///Users/jeevs/git/data/gzfile.tar.gz|

当然,文件字段填充有 tar.gz 文件,使得该方法毫无用处。 更令人恼火的问题是,_c0 正在填充filename+garbage+first row values

此时,我想知道读取的文件本身是否变得奇怪,因为它是 tar.gz 文件。当我们执行此处理的 v1 时(spark 0.9),我们还有另一个步骤,将数据从 s3 加载到 ec2 框中,提取并写回 s3。我正在努力摆脱这些步骤。

提前致谢!


数据块不支持直接*.tar.gz迭代。为了处理文件,必须将它们解压缩到临时位置。数据块支持bash比能完成这项工作。

%sh find $source -name *.tar.gz -exec tar -xvzf {} -C $destination \;

上面的代码将解压所有带有扩展名的文件*.tar.gz在源位置到目标位置。 如果路径是通过dbutils.widgets或静态中%scala or %pyspark,路径必须声明为环境变量。 这可以实现在%pyspark

import os
os.environ[' source '] = '/dbfs/mnt/dl/raw/source/'

使用以下方法加载文件,假设内容在*.csv file:

DF = spark.read.format('csv').options(header='true', inferSchema='true').option("mode","DROPMALFORMED").load('/mnt/dl/raw/source/sample.csv')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark:将 tar.gz 文件加载到数据框中并按文件名过滤 的相关文章

随机推荐