选择 PySpark 中每行最大值的列名

2024-03-10

我有一个像这样的数据框,仅显示两列,但是原始数据框中有很多列

data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()

+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1|   3|   5|
|ID2|   4|  12|
|ID3|   8|   3|
+---+----+----+

我想提取每行具有最大值的列的名称。因此预期的输出是这样的

+---+----+----+-------+
| ID|colA|colB|Max_col|
+---+----+----+-------+
|ID1|   3|   5|   colB|
|ID2|   4|  12|   colB|
|ID3|   8|   3|   colA|
+---+----+----+-------+

如果出现平局,其中 colA 和 colB 具有相同的值,请选择第一列。

我怎样才能在 pyspark 中实现这一目标


您可以使用UDF在每一行上进行逐行计算和使用struct将多列传递给 udf。希望这可以帮助。

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from operator import itemgetter

data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 70, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()

+---+----+----+----+
| ID|colA|colB|colC|
+---+----+----+----+
|ID1|   3|   5|  78|
|ID2|   4|  12|  45|
|ID3|  70|   3|  70|
+---+----+----+----+
cols = df.columns

# to get max of values in a row
maxcol = F.udf(lambda row: max(row), IntegerType())
maxDF = df.withColumn("maxval", maxcol(F.struct([df[x] for x in df.columns[1:]])))
maxDF.show()

+---+----+----+----+-------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+-------+
|ID1|3   |5   |78  |78     |
|ID2|4   |12  |45  |45     |
|ID3|70  |3   |67  |70     |
+---+----+----+----+-------+

# to get max of value & corresponding column name

schema=StructType([StructField('maxval',IntegerType()),StructField('maxval_colname',StringType())])

maxcol = F.udf(lambda row: max(row,key=itemgetter(0)), schema)
maxDF = df.withColumn('maxfield', maxcol(F.struct([F.struct(df[x],F.lit(x)) for x in df.columns[1:]]))).\
select(df.columns+['maxfield.maxval','maxfield.maxval_colname'])

+---+----+----+----+------+--------------+
| ID|colA|colB|colC|maxval|maxval_colname|
+---+----+----+----+------+--------------+
|ID1| 3  | 5  | 78 | 78   | colC         |
|ID2| 4  | 12 | 45 | 45   | colC         |
|ID3| 70 | 3  | 67 | 68   | colA         |
+---+----+----+----+------+--------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

选择 PySpark 中每行最大值的列名 的相关文章

随机推荐

  • Safari 中的页面转换效果?

    如何为网页添加像Safari中IE一样的页面切换效果 你可以看看这个例子 http sachiniscool blogspot com 2006 01 implementing page transitions in html http s
  • 帮我评估一下这个选角

    我在 PowerVR 网格绘图代码中找到了这个 但我真的不知道如何阅读它 unsigned short 0 3 mesh sBoneBatches pnBatchOffset batchNum 这里发生了什么 这是对 void 的引用 强制
  • Rails 3 简单形式错误:无法解析 YAML

    我的new html erb h1 New konkurrancer h1 我在视图中收到此错误 http localhost 3000 admin konkurrancers new Psych SyntaxError in Admin
  • 为什么我升级版本 django-mptt 后出现数据库迁移错误?

    我的 Django 应用程序有一个requirements txt 文件 如图所示here https gist github com saqib zmi b0168e18ee4a0a7ee2f6 我用它在虚拟环境中安装模块 一切正常 但是
  • 通过 Java 启动 VLC 播放器

    我想通过 Java 程序启动我的 VLC 播放器 有人可以帮助我吗 提前致谢 使用 VLCJ 这是新链接 http caprica github io vlcj
  • jQuery AJAX 调用获取 Web api 数据返回语法错误

    我正在尝试获取这个json questions tagged json来自此 url 的对象 https test3 diavgeia gov gr luminapi opendata dictionaries KANONISTIKI PR
  • 使用 PHP 最简单的双向加密

    在常见 PHP 安装中进行双向加密的最简单方法是什么 我需要能够使用字符串密钥加密数据 并使用相同的密钥在另一端解密 安全性并不像代码的可移植性那么重要 因此我希望能够使事情尽可能简单 目前 我正在使用 RC4 实现 但如果我能找到本机支持
  • 通过工作流基础4.0中的代码注册自定义跟踪参与者

    我在尝试在工作流基础 4 0 中附加自定义跟踪参与者时遇到问题 我有一个继承自 TrackingParticipant 的类 但除了通过大量混乱的 app config 条目 如下面的 SDK 示例 在 system servicemode
  • Mnesia 返回 {aborted, no_transaction}

    我有一个名为 Mnesia 的表person 使用以下记录定义 record person id firstname lastname phone 该表包含以下值 12 alen dumas 97888888 13 franco mocci
  • 是否可以在没有 LINQ 的情况下使用实体框架?

    是否可以在没有 LINQ 实体的 linq 的情况下使用实体框架 目前尚不清楚分别使用 Linq to Entities 和 EF 的含义 这是一个单独的库实体框架 dll 如果你想在普通 SQL 上编写查询 你可以使用SQL查询 http
  • 将渲染的 pdf 文件保存到模型字段 Django

    我现在正在尝试将使用 HTML 呈现的 pdf 文件保存到模型字段 它会抛出此错误 强制转换为 Unicode 需要字符串或缓冲区 已找到实例 这是代码 def save to pdf template src context dict p
  • Bash:在许多文件上并行化 md5sum 校验和

    假设我有一个 64 核服务器 我需要计算md5sum中所有文件的 mnt data 并将结果存储在文本文件中 find mnt data type f exec md5sum gt md5 txt 上述命令的问题是 在任何给定时间只有一个进
  • AWS Application Load Balancer 是否始终终止 HTTPS 连接(或者是否可配置)?

    我们使用应用程序负载均衡器 后面有一个 nginx 服务器 我们的客户要求我们实施mTLS https en wikipedia org wiki Mutual authentication但我认为如果 ALB终止 TLS 连接 https
  • 具有正确 x 轴格式的分钟刻度数据图?

    我想以分钟为单位绘制刻度数据 我的数据框如下所示 gt head df No Date Time Close Volume Weekday 1 3361 03 12 2012 08 00 00 000 7 435 27000000 Mont
  • 如何控制下拉选择菜单的位置?

    我有一个下拉选择菜单 我想知道是否有办法控制选项展开的方式 默认值似乎是任意一种 取决于列表中有多少项 我想防止下拉选项显示在其他表单字段上方 当您从列表中选择 国家 地区 时 菜单会向下展开 我想要的位置 但是当您从 州 地区 字段中选择
  • 使用 Google 自定义搜索 API 搜索多种文件类型

    我需要获取特定文件类型的 Google 搜索结果 例如 在浏览器中我会直接谷歌搜索 超级循环 文件类型 pdf 它将列出 Hyperloop 的 PDF 文件 为此 我的 Google 自定义搜索请求 URI 将是 但是 目前我想获取文件类
  • C、运行时测试 PATH 中是否存在可执行文件

    我目前正在用 C 语言编写一个应用程序 目标是 BSD 和 Linux 系统 希望能够普遍移植 该程序具有运行时依赖项 在本例中为 mplayer 就目前情况而言 我正在使用execlp 启动 mplayer 我正在检查 execlp 调用
  • 在 JavaScript 中获取平台特定的换行符?

    几年前 我为我的一个 Firefox 插件编写了以下函数 它可以帮助我获取特定于平台的换行符 GetNewLine function var platform navigator platform toLowerCase if platfo
  • x86 汇编中断服务程序可以调用另一个中断吗?

    我可以在独立 x686 环境中从中断服务程序中调用中断吗 那么可以执行以下操作 isr pusha call doSomething int 21h popa iret 如果可能的话 那么这些嵌套中断是否有任何重大的塌陷 虽然处理器对嵌套中
  • 选择 PySpark 中每行最大值的列名

    我有一个像这样的数据框 仅显示两列 但是原始数据框中有很多列 data ID1 3 5 ID2 4 12 ID3 8 3 df spark createDataFrame data ID colA colB df show ID colA