Spark:相当于数据帧中的 zipwithindex

2024-04-23

假设我有以下数据框:

dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)]
df = sc.parallelize(dummy_data).toDF(['letter','number'])

我想创建以下数据框:

[('a',0),('b',2),('c',1),('d',3),('e',0)]

我所做的是将其转换为rdd并使用zipWithIndex函数和连接后的结果:

convertDF = (df.select('number')
              .distinct()
              .rdd
              .zipWithIndex()
              .map(lambda x:(x[0].number,x[1]))
              .toDF(['old','new']))


finalDF = (df
            .join(convertDF,df.number == convertDF.old)
            .select(df.letter,convertDF.new))

是否有类似的功能zipWIthIndex在数据框中?还有其他更有效的方法来完成这项任务吗?


请检查https://issues.apache.org/jira/browse/SPARK-23074 https://issues.apache.org/jira/browse/SPARK-23074对于数据帧中的这种直接功能奇偶校验.. 如果您有兴趣在 Spark 中的某个时候看到这一点,请为 jira 投票。

这是 PySpark 中的解决方法:

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
        and preserves a schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))

    return spark.createDataFrame(new_rdd, new_schema)

这也可以在abalon https://github.com/Tagar/abalon/blob/00e14d9711e65d244508182dd7c2a9c5be7ca2c7/abalon/spark/sparkutils.py#L115包裹。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:相当于数据帧中的 zipwithindex 的相关文章

随机推荐

  • 在 GAME 上使用 Spring 3 MVC + Maven 2 找不到映射

    我在尝试将 Spring MVC 和 Maven 集成到 Google AppEngine Web 应用程序中时遇到了一个奇怪的问题 这是我的 web xml
  • 是否可以在不安装的情况下使用 MSBuild 扩展包?

    有没有一种方法可以将 MSBuild 扩展包与 本地 引用一起使用 而不需要运行安装程序 换句话说 您能否将目标存储在解决方案项目文件夹中 以便每个开发人员都不必安装它 你必须申报财产 扩展任务路径 在任务的导入语句之前 例如看一下
  • 如何避免 WCF 中的大量通信类?

    我的理解是 所有合约实现代码都必须位于单个类中 显然 该类可能会变得非常大 我该如何避免这种情况 我真的更喜欢让几个小班来完成与客户沟通的一部分 而不是一个庞大的班级 我能想到的唯一想法是使用由单个类分割实现的多个接口partial 但我认
  • 使用 Django 会话存储登录用户

    我正在创建一个以 REST 为中心的应用程序 它将针对大多数特定于域的模型使用某种 NoSQL 数据存储 对于我打算围绕 REST 数据框架构建的主站点 我仍然希望对用户 计费信息和域数据模型范围之外的其他元数据使用传统的关系数据库 有人告
  • 我可以从带时间戳的图像创建 VFR 视频吗?

    首先 我对图像制作视频的经验几乎为零 我拥有的是一组带有 BMP 时间戳的图像 我想从中生成视频 由于时间戳的间隔不相等 我不能简单地使用从图像创建恒定帧速率视频的软件 一个可能的解决方案是在固定的时间间隔创建人造图像 但如果我无法制作 V
  • 我必须在哪里以及为什么必须放置“template”和“typename”关键字?

    在模板中 我必须在哪里以及为什么要放置typename and template关于从属名称 到底什么是从属名称 我有以下代码 template
  • 如何在没有 Eclipse 的情况下构建 apk 或使用配置文件修改 apk 构建?

    我想从 xml 文件构建一个具有一些大型配置的 apk 我想知道是否有任何方法可以控制 apk 的构建过程 或者是否有任何方法可以根据我们的配置 xml 文件对我们的源进行一些修改 从我们的源创建 apk 或者任何其他方式来构建apk文件
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • PHP return_var 代码?

    我正在测试 php exec 命令 http php net exec http php net exec 我得到的结果代码是 127 我的 PHP 代码是
  • 列表作为 python 类的成员,为什么它的内容在该类的所有实例之间共享?

    我定义了一个类Listener并创建了一本字典Listener对象 每个听众都有一个id来识别它们 以及一个列表artists他们听 artists 添加一些东西到artists列表将其添加到的所有实例Listener类 而不是引用的实例
  • 将数据库导入 MATLAB 错误

    我正在尝试将表导入到我的 MATLAB 工作区 但它一直向我抛出错误 Undefined function or method fetch for input arguments of type struct 这是我尝试执行的代码 dyn
  • 递归使用 scp 但排除某些文件夹

    假设有一些具有这些结构的文件夹 bench1 1cpu p 0 image bench1 1cpu p 0 fl 1 bench1 1cpu p 0 fl 1 bench1 1cpu p 0 fl 1 bench1 1cpu p 0 fl
  • 如何使用 PJSIP 为 iOS 客户端应用程序捕获并翻译其他语言的传入音频流?

    我想在基于 VoIP 的 iOS 客户端应用程序中集成语言转换器 该应用程序将根据用户选择将实时传入音频流翻译为其他选定的语言 我正在使用 PjSip 开源库来支持 VoIP 呼叫 对于语言翻译 我想使用语音到文本和文本到语音开源库 现在我
  • Windows 上的 PHP mail():没有错误,电子邮件未发送

    我目前正在尝试调试一个基于 Elgg 的网站 我没有开发它 我想直接从本地开发计算机 WinXP 发送电子邮件 我正在使用 Apache 2 2 11 和 PHP 5 3 0 运行 WAMP 经过一番搜索后 我遇到的最简单的解决方案是使用假
  • 分阶段加载 spring 上下文

    这是人们会问的奇怪问题之一 为什么 所以我将从我为什么要这样做开始 然后讨论这个问题 我想更好地控制 spring 上下文的加载方式 例如 我不想同时加载域和web api 这将使资源在其依赖项准备好之前可用 也可能我需要检查某些东西的状态
  • 禁用对特定主机的警报,同时对所有其他主机发出警报

    我有数百台主机向普罗米修斯服务器报告 我的每个主机有很多出口商 我希望能够列出我不希望收到警报的主机列表 我仍然需要对这些主机进行普罗米修斯监控 我尝试过匹配没有接收器的路线 这不起作用 我究竟做错了什么 或者说 我应该怎么做 我的路线规则
  • 为什么 pgAdmin 4 这么慢?

    postgreSQL 的 pgAdmin 4 GUI 非常慢 即使扩展服务器树或数据库树也需要花费太多时间 它们各自花费了近 30 秒的时间来展开 创建新数据库或表时它也会挂起 即使加载后 创建和保存新数据库也需要一分多钟的时间 几乎每次我
  • 回形针未保存,没有错误

    我被绊倒了 浏览了文档 教程等 但不确定我做错了什么 项目中的另一个模型是为 Paperclip 设置的 并且在测试时可以正常工作 它将附件文件信息保存和检索到数据库中 并将文件放入 public system 内的子文件夹中 我基本上将相
  • jQuery 剪贴板复制

    我需要剪贴板复制功能 即使我正在寻求帮助http plugins jquery com project copy http plugins jquery com project copy链接 但无法正常工作 li li
  • Spark:相当于数据帧中的 zipwithindex

    假设我有以下数据框 dummy data a 1 b 25 c 3 d 8 e 1 df sc parallelize dummy data toDF letter number 我想创建以下数据框 a 0 b 2 c 1 d 3 e 0