您可以使用UDF
在每一行上进行逐行计算和使用struct
将多列传递给 udf。希望这可以帮助。
from operator import itemgetter

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
# Build a small demo DataFrame: a string ID column plus three integer columns.
rows = [("ID1", 3, 5, 78), ("ID2", 4, 12, 45), ("ID3", 70, 3, 67)]
df = spark.createDataFrame(rows, ["ID", "colA", "colB", "colC"])
df.show()
+---+----+----+----+
| ID|colA|colB|colC|
+---+----+----+----+
|ID1| 3| 5| 78|
|ID2| 4| 12| 45|
|ID3| 70| 3| 67|
+---+----+----+----+
# --- Row-wise max of the numeric columns ---------------------------------
# cols[1:] skips the string "ID" column; the numeric columns are packed
# into a struct so the UDF receives them as one row-like argument.
cols = df.columns
# The struct is iterable inside the UDF, so built-in max() works directly.
maxcol = F.udf(lambda row: max(row), IntegerType())
# Fix: use the `cols` variable defined above (it was assigned but unused,
# with df.columns re-read inline instead).
maxDF = df.withColumn("maxval", maxcol(F.struct([df[c] for c in cols[1:]])))
maxDF.show()
+---+----+----+----+-------+
|ID |colA|colB|colC|maxval |
+---+----+----+----+-------+
|ID1|3 |5 |78 |78 |
|ID2|4 |12 |45 |45 |
|ID3|70 |3 |67 |70 |
+---+----+----+----+-------+
# --- Row-wise max together with the name of the column that holds it -----
# The UDF returns a struct, so declare its schema: (maxval, maxval_colname).
# NOTE: StructType / StructField / StringType come from pyspark.sql.types;
# the original snippet used them without importing them (NameError).
schema = StructType([
    StructField('maxval', IntegerType()),
    StructField('maxval_colname', StringType()),
])
# Each numeric column is wrapped as a (value, column-name) pair;
# max() with key=itemgetter(0) keeps the pair with the largest value.
maxcol = F.udf(lambda row: max(row, key=itemgetter(0)), schema)
maxDF = df.withColumn(
    'maxfield',
    maxcol(F.struct([F.struct(df[x], F.lit(x)) for x in df.columns[1:]])),
).select(df.columns + ['maxfield.maxval', 'maxfield.maxval_colname'])
maxDF.show()  # was missing: without show() the table below is never printed
+---+----+----+----+------+--------------+
| ID|colA|colB|colC|maxval|maxval_colname|
+---+----+----+----+------+--------------+
|ID1| 3 | 5 | 78 | 78 | colC |
|ID2| 4 | 12 | 45 | 45 | colC |
|ID3| 70 | 3 | 67 | 70 | colA |
+---+----+----+----+------+--------------+