Pyspark 合并数据帧行,一个数组包含在另一个数组中

2023-12-30

我什至不知道表达这些问题的最佳标题是什么。

我有以下数据集

df = spark.createDataFrame([\
            (["1", "2","3","4"], ),\
            (["1","2","3"], ),\
            (["2","1","3"], ),\
            (["2","3","4","1"], ),\
            (["6","7"], )\
], ['cycle', ])
df.show()

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|   [1, 2, 3]|
|   [2, 1, 3]|
|[2, 3, 4, 1]|
|      [6, 7]|
+------------+

我最后想要的是:

  1. 删除排列
  2. 仅保留包含所有其他集合的最大行的行

我可以用sort_array() and distinct()摆脱排列

df.select(f.sort_array("cycle").alias("cycle")).distinct().show() 
+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
|   [1, 2, 3]|
+------------+

我想用 Pyspark 减少数据集是:

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
+------------+

所以以某种方式检查一下[1, 2, 3]是其一部分[1, 2, 3, 4]并且只保留 所以Python子集命令A.issubset(B)应用在Pyspark、Spark方式上一列

我目前能想到的唯一方法是对每一行进行可怕的迭代循环,这将杀死所有性能


您可以尝试的一种方法是首先找到所有cycles 至少有一个superset(排除自我)通过使用自加入找到d2.cycle满足以下条件:

  • size(数组除外 http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.functions.array_except(d2.cycle, d1.cycle))==0: 中没有项目d2.cycle被排除在d1.cycle(空数组将满足)
  • size(d2.cycle) < size(d1.cycle): the size of d2.cycle小于size of d1.cycle:

然后采用 left_anti 连接从原始数据帧中排除上述列表,最后运行 sort_array 和 drop_duplicates(或distinct):

from pyspark.sql.functions import expr

df_sub = df.alias('d1').join(
      df.alias('d2')
    , expr('size(array_except(d2.cycle, d1.cycle))==0 AND size(d2.cycle) < size(d1.cycle)')
).select('d2.cycle').distinct()

df_sub.show()
#+---------+
#|    cycle|
#+---------+
#|[1, 2, 3]|
#|[2, 1, 3]|
#+---------+

df.join(df_sub , on=['cycle'], how='left_anti') \
  .withColumn('cycle', expr('sort_array(cycle)')) \
  .distinct() \
  .show()
#+------------+                                                                  
#|       cycle|
#+------------+
#|[1, 2, 3, 4]|
#|      [6, 7]|
#+------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark 合并数据帧行,一个数组包含在另一个数组中 的相关文章

随机推荐