如何在 pyspark 中有效地将大型 .tsv 文件上传到具有拆分列的 Hive 表?

2023-12-25

我有一个大型(约 1000 万行).tsv 文件,其中包含两列:“id”和“group”。 “Group”列实际上是某个 id 所属的所有组的列表,因此该文件如下所示:

id1     group1,group2
id2     group2,group3,group4
id3     group1
...

我需要使用 pyspark 将其上传到 Hive 表,但是我想拆分组列,以便一行中只有一个组,因此生成的表如下所示:

id1    group1
id1    group2
id2    group2
id2    group3
id2    group4
id3    group1

我尝试过逐行读取行,然后使用 python split() 来分割列,然后为每一行创建 Spark 数据帧并将其与每次迭代合并。我的代码可以工作,但是效率极低,因为处理 1000 行需要 2 分钟。我的代码如下:

fields = [StructField('user_id', StringType(), True),StructField('group_id', StringType(), True)] 
membership_schema = StructType(fields) 

result_df = sqlContext.createDataFrame(sc.emptyRDD(), membership_schema)

with open('file.tsv','r') as f:
    for line in f:
        parts = line.split()
        id_part = parts[0]
        audience_parts = parts[1].split(',')
        for item in audience_parts:
            newRow = sqlContext.createDataFrame([(id_part,item)], membership_schema)
            result_df = result_df.union(newRow)
df_writer = DataFrameWriter(result_df)
df_writer.insertInto("my_table_in_hive")

是否有一种更简单、更有效的方法将整个文件上传到表中,而无需迭代各行?

感谢帮助。


我查看了上面代码的计划,似乎它扫描了很多,而且也没有为您提供与 Spark 的并行性。 您可以使用spark本机方法将文件数据读入更多分区并控制它们在分区之间均匀分布数据。

df = sc.textFile(file_path,10).map(lambda x: x.split()).map(lambda x :(x[0],x[1].split(","))).toDF(['id','group'])
from pyspark.sql.functions import explode
newdf = df.withColumn("group", explode(df.group))

newdf.write.format("orc").option("header", "true").mode("overwrite").saveAsTable('db.yourHivetable')

此外,您可以增加或减少进入爆炸的分区的大小或控制随机播放分区。

spark.conf.set("spark.sql.files.maxPartitionBytes","30")
spark.conf.set("spark.sql.shuffle.partitions", "100")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 pyspark 中有效地将大型 .tsv 文件上传到具有拆分列的 Hive 表? 的相关文章

随机推荐