一种方法是使用pyspark.sql.Window http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.Window添加一列来计算每行的重复项数("ID", "ID2", "Number")
组合。然后仅选择重复项数大于 1 的行。
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('ID', 'ID2', 'Number')
df.select('*', f.count('ID').over(w).alias('dupeCount'))\
.where('dupeCount > 1')\
.drop('dupeCount')\
.show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA| 2|null| 08:54:00| 23:25:00|
#|ALT|QWA| 2|null| 08:53:00| 23:24:00|
#|ALT|QWA| 6|null| 08:59:00| 23:30:00|
#|ALT|QWA| 6|null| 08:55:00| 23:26:00|
#+---+---+------+----+------------+------------+
I used pyspark.sql.functions.count() http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.count计算每组中的项目数。这将返回一个包含所有重复项的 DataFrame(您显示的第二个输出)。
如果您只想获得一行("ID", "ID2", "Number")
组合,您可以使用另一个窗口来对行进行排序。
例如,下面我添加另一列row_number http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.row_number并仅选择重复计数大于 1 且行号等于 1 的行。这保证每个分组一行。
w2 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('ID', 'ID2', 'Number')
df.select(
'*',
f.count('ID').over(w).alias('dupeCount'),
f.row_number().over(w2).alias('rowNum')
)\
.where('(dupeCount > 1) AND (rowNum = 1)')\
.drop('dupeCount', 'rowNum')\
.show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA| 2|null| 08:54:00| 23:25:00|
#|ALT|QWA| 6|null| 08:59:00| 23:30:00|
#+---+---+------+----+------------+------------+