How to create dynamic groups in a PySpark DataFrame?

2024-04-27

Although the real problem involves creating groups based on the values of two or more columns across consecutive rows, I'm simplifying it here. Suppose we have a PySpark DataFrame like this:

>>> from pyspark.sql import Row
>>> df = sqlContext.createDataFrame([
...     Row(SN=1, age=45, gender='M', name='Bob'),
...     Row(SN=2, age=28, gender='M', name='Albert'),
...     Row(SN=3, age=33, gender='F', name='Laura'),
...     Row(SN=4, age=43, gender='F', name='Gloria'),
...     Row(SN=5, age=18, gender='T', name='Simone'),
...     Row(SN=6, age=45, gender='M', name='Alax'),
...     Row(SN=7, age=28, gender='M', name='Robert')])
>>> df.show()
>>> df.show()

+---+---+------+------+
| SN|age|gender|  name|
+---+---+------+------+
|  1| 45|     M|   Bob|
|  2| 28|     M|Albert|
|  3| 33|     F| Laura|
|  4| 43|     F|Gloria|
|  5| 18|     T|Simone|
|  6| 45|     M|  Alax|
|  7| 28|     M|Robert|
+---+---+------+------+

Now I want to add a "section" column: consecutive rows with the same gender value get the same section number, and the section number increments whenever the gender changes from one row to the next. So, to be precise, I want output like this:

+---+---+------+------+-------+
| SN|age|gender|  name|section|
+---+---+------+------+-------+
|  1| 45|     M|   Bob|      1|
|  2| 28|     M|Albert|      1|
|  3| 33|     F| Laura|      2|
|  4| 43|     F|Gloria|      2|
|  5| 18|     T|Simone|      3|
|  6| 45|     M|  Alax|      4|
|  7| 28|     M|Robert|      4|
+---+---+------+------+-------+
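Before reaching for Spark, the numbering rule itself is just "group consecutive equal values". A minimal plain-Python sketch (using a hypothetical `genders` list that mirrors the table above, no Spark required) shows the expected section numbers:

```python
from itertools import groupby

# gender values in SN order, mirroring the example DataFrame above
genders = ['M', 'M', 'F', 'F', 'T', 'M', 'M']

# groupby collapses consecutive runs of equal values;
# enumerate numbers each run starting from 1
sections = []
for section, (gender, run) in enumerate(groupby(genders), start=1):
    sections.extend(section for _ in run)

print(sections)  # [1, 1, 2, 2, 3, 4, 4]
```

This matches the desired "section" column, so the Spark solution only needs to express the same run-numbering idea with window functions.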

It's not clear whether you're looking for a Python or a Scala solution, but they are very similar, so here is a Scala solution using window functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// we'll use this window to attach the "previous" gender to each record
val globalWindow = Window.orderBy("SN")

// we'll use this window to compute "cumulative sum" of 
// an indicator column that would be 1 only if gender changed
val upToThisRowWindow = globalWindow.rowsBetween(Long.MinValue, 0)

val result = df
  .withColumn("prevGender", lag("gender", 1) over globalWindow) // add previous record's gender
  .withColumn("shouldIncrease", when($"prevGender" =!= $"gender", 1) otherwise 0) // translate to 1 or 0
  .withColumn("section", (sum("shouldIncrease") over upToThisRowWindow) + lit(1)) // cumulative sum
  .drop("prevGender", "shouldIncrease") // drop helper columns

result.show()
// +---+---+------+------+-------+
// | SN|age|gender|  name|section|
// +---+---+------+------+-------+
// |  1| 45|     M|   Bob|      1|
// |  2| 28|     M|Albert|      1|
// |  3| 33|     F| Laura|      2|
// |  4| 43|     F|Gloria|      2|
// |  5| 18|     T|Simone|      3|
// |  6| 45|     M|  Alax|      4|
// |  7| 28|     M|Robert|      4|
// +---+---+------+------+-------+

Here is the equivalent PySpark code:

from pyspark.sql import Window as W
from pyspark.sql import functions as F

globalWindow = W.orderBy("SN")
upToThisRowWindow = globalWindow.rowsBetween(W.unboundedPreceding, 0)

df.withColumn(
    "section",
    F.sum(
        F.when(F.lag("gender", 1).over(globalWindow) != df.gender, 1).otherwise(0)
    ).over(upToThisRowWindow) + 1
).show()
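The window logic — `lag` to fetch the previous row's gender, a 0/1 change indicator, then a running sum — can be mimicked without Spark to check the mechanics. A plain-Python sketch over the same hypothetical gender sequence:

```python
# gender values in SN order, mirroring the example DataFrame
genders = ['M', 'M', 'F', 'F', 'T', 'M', 'M']

# lag(1): previous row's gender (None for the first row)
prev = [None] + genders[:-1]

# indicator: 1 where gender changed, else 0 (first row counts as 0,
# just as the null comparison in the Spark version falls through to otherwise(0))
changed = [1 if p is not None and p != g else 0 for p, g in zip(prev, genders)]

# cumulative sum of the indicator, plus 1, gives the section number
section, sections = 1, []
for c in changed:
    section += c
    sections.append(section)

print(sections)  # [1, 1, 2, 2, 3, 4, 4]
```

Each step maps one-to-one onto a `withColumn` in the Scala version above, which is why the helper columns can simply be dropped at the end.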