pySpark将mapPartitions的结果转换为spark DataFrame

2024-02-25

我有一项工作需要在分区的 Spark 数据帧上运行,该过程如下所示:

rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))

结果是rdd of pandas.dataframe,

type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame

and rdd.glom().collect()返回结果如下:

[[df1], [df2], ...]

现在我希望将结果转换为spark dataframe,我所做的方式是:

sp = None
for i, partition in enumerate(rdd.collect()):
    if i == 0:
        sp = spark.createDataFrame(partition)
    else:
        sp = sp.union(spark.createDataFrame(partition))

return sp

然而,结果可能是巨大的rdd.collect()可能超出驱动程序的内存,所以我需要避免collect()手术。有办法解决这个问题吗?

提前致谢!


如果你想继续使用 rdd api。mapPartitions接受一种类型的迭代器并期望另一种类型的迭代器作为结果。 pandas_df 不是迭代器类型mapPartitions可以直接处理。如果你必须使用 pandas api,你可以从创建一个合适的生成器pandas.iterrows

这样你的整体mapPartitions结果将是行类型的单个 rdd,而不是 pandas 数据帧的 rdd。这样的 rdd 可以通过动态模式发现无缝转换为数据帧

from pyspark.sql import Row

def some_fuction(iter):
  pandas_df = some_pandas_result(iter)
  for index, row in pandas_df.iterrows():
     yield Row(id=index, foo=row['foo'], bar=row['bar'])


rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

pySpark将mapPartitions的结果转换为spark DataFrame 的相关文章

随机推荐

  • 在现有播放器上启用 YouTube API

    我有一个嵌入式 YouTube 视频 我希望应用 YouTube API 我使用 jQuery 添加 url 参数 如下所示 demo http jsfiddle net VVEY9 document ready function var
  • java字符串日期转换

    我想在存储字符串之前将其转换为日期 并且我使用了 SimpleDateFormat format new SimpleDateFormat yyyy mm dd Date returnDate format parse date 当我使用样
  • 在 MS SQL Server 2008 中创建序列

    我编写了一个程序 可以在其中请求身份证 有不同类型的身份证 红 蓝 绿 当提出请求时 程序应该生成标识号 数字 数字范围 取决于所请求的卡 Red Card 1 50000 Blue Card 50001 100000 Green Card
  • 为什么 VS Code 在 java 文件中显示 System.out.println() 的这些标签或参数名称?

    我已经安装了java扩展包 这件事从今天才开始发生 不确定这是由于某些 json 设置还是其他原因造成的 See 诸如 s x 和 参数名称之类的随机内容出现在我的打印语句中 https github com redhat develope
  • 如何强制执行不同的线程

    我有一个主线程执行一些 CPU 密集型操作 线程必须为其所有计算持有锁 还有一些其他线程偶尔需要在短时间内使用相同的锁 如果没有其他线程 如何强制主线程偶尔允许其他线程执行而不减慢速度 周期性的 lock release time slee
  • Linq Boolean 返回异常 DROPDOWNLIST 有一个无效的 SelectedValue,因为它不存在于项目列表中

    我有一个绑定到 linq 数据源的下拉列表 此下拉列表显示所有弓箭手编号 除了那些在数据库中状态设置为 false 的弓箭手编号 假设我有一条之前创建的记录 现在我想编辑现在设置为 false 的 Bowzer 我遇到了这个异常 我不知道如
  • apache-commons ftp 检索多个文件

    我正在尝试使用 apache commons net FTP lib 从 FTP 服务器获取数据 如果目录中只有 1 个文件 该代码可以正常工作 但在我第二次调用retrieveFileStream 时始终返回 null 有什么想法吗 我编
  • iOS6,UIWebView 和位置:固定

    我们有一个 PhoneGap 应用程序 其导航栏和选项卡栏 实现 为固定位置的 div 参见屏幕 1 在 iOS6 中 当显示键盘时 这些 div 会出现一些奇怪的行为 当我们输入第一个字母时 div 将消失 并显示空白区域 当我们关闭键盘
  • 多处理和 Selenium Python

    我有 3 个驱动程序 Firefox 浏览器 我希望它们能够do something在网站列表中 我有一个工人定义为 def worker browser queue while True id queue get True obj Rev
  • 在elasticsearch上查找具有空字符串值的文档

    我一直在尝试使用elasticsearch 仅过滤那些正文中包含空字符串的文档 到目前为止我还没有运气 在继续之前 我应该提到我已经尝试过many 解决方案 在 Interwebz 和 StackOverflow 上传播 因此 下面是我尝试
  • .NET LocalReport / .rdlc AppDomain 问题

    我正在使用 Microsoft Reporting WebForms LocalReport 和 rdlc 报告文件生成 pdf s 这是在 Windows 服务 NET 4 6 x64 VS2015 的后台完成的 我有两个问题 Windo
  • 多个 canActivate 防护在第一次失败时全部运行

    我有一条有两个人的路线canActivate警卫 AuthGuard and RoleGuard 首先 AuthGuard 检查用户是否已登录 如果没有 则重定向到登录页面 第二个检查用户是否定义了允许查看页面的角色 如果没有 则重定向到未
  • 为什么 imagemagick 中的 PNG 图像的 readimage 和 writeimage 需要花费大量时间?

    我正在使用 Imagemagick 版本 7 0 5 4 来执行图像处理操作 例如裁剪 调整大小等去图形 https github com gographics imagick图书馆 我还管理一个魔法棒对象池 Features Cipher
  • 常量折叠的具体规则是什么?

    我刚刚意识到 CPython 似乎对表示相同值的常量表达式的处理方式与常量折叠不同 例如 gt gt gt import dis gt gt gt dis dis 2 66 1 0 LOAD CONST 0 2 2 LOAD CONST 1
  • BigQuery 的速度是否足以满足实时现场请求

    我正在研究是否可以使用 BigQuery 及其 API 根据访问者查看的内容进行现场查询 因此 响应时间至关重要 我加载了一个包含 10k 行 4 列 的非常简单的结构化数据集 并运行了一个非常简单的查询 这需要 1 到 2 秒的时间 希望
  • CorFlags.exe /32BIT+ 如何工作?

    我想我的问题是关于CLR http en wikipedia org wiki Common Language Runtime装载机 我想了解背后的机制CorFlags exe http msdn microsoft com en us l
  • 无法按升序对列表进行排序

    Map
  • 实验::可选的 nullopt_t 构造函数

    Here http www open std org JTC1 SC22 WG21 docs papers 2013 n3793 html optional nullopt被描述为nullopt t and nullopt为了optiona
  • EKCalendar 中的“完整日历同步”到底是什么?

    的文档EKCalendar类指出了这一点calendarIdentifier财产 与日历完全同步将丢失此标识符 你应该 有一个处理标识符为 no 的日历的计划 通过缓存其其他属性 可以更长时间地获取 完全同步 究竟何时发生以及除了calen
  • pySpark将mapPartitions的结果转换为spark DataFrame

    我有一项工作需要在分区的 Spark 数据帧上运行 该过程如下所示 rdd sp df repartition n partitions partition key rdd mapPartitions lambda x some funct