如何将聚合数据添加到 Apache Spark 中的原始数据集中?

2024-01-09

我试图弄清楚如何聚合数据集中的数据,然后使用 Apache Spark 将结果添加到原始数据集。 我已经尝试了 2 个我不满意的解决方案,我想知道是否有一个我没有看到的更具可扩展性和更高效的解决方案。

以下是输入和预期输出数据的非常简化的示例:

Input:

客户列表,以及每个客户的购买商品列表。

(John, [toast, butter])
(Jane, [toast, jelly])

Output:

客户列表、每个客户的已购买商品列表以及每个商品的购买该商品的客户数量。

(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])

以下是我迄今为止尝试过的解决方案,列出了步骤和输出数据。

解决方案#1:

Start with a pair rdd:
(John, [toast, butter])
(Jane, [toast, jelly])

flatMapToPair:
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)

aggregateByKey: 
(toast, [John, Jane])
(butter, [John])
(jelly, [Jane])

flatMapToPair: (using the size of the list of customers)
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])

虽然这适用于小型数据集,但对于较大的数据集来说这是一个糟糕的想法,因为在某一时刻,您为每个产品保存了一个巨大的客户列表,这些客户列表可能无法放入执行器的内存中。

解决方案#2:

Start with a pair rdd:
(John, [toast, butter])
(Jane, [toast, jelly])

flatMapToPair:
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)

aggregateByKey: (counting customers without creating a list)
(toast, 2)
(butter, 1)
(jelly, 1)

join: (using the two previous results)
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))

mapToPair:
(John, (toast, 2))
(John, (butter, 1))
(Jane, (toast, 2))
(Jane, (jelly, 1))

aggregateByKey:
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])

这个解决方案应该可行,但我觉得应该有一些其他可能不涉及加入 RDD 的解决方案。

对于这个问题是否有更可扩展/更高效/更好的“解决方案#3”?


这是一个dataframe供你尝试和玩耍的方式

如果您已经有配对rdds然后打电话toDF与列名应该给你一个dataframe as

val df = pairedRDD.toDF("key", "value")

应该是

+----+---------------+
|key |value          |
+----+---------------+
|John|[toast, butter]|
|Jane|[toast, jelly] |
+----+---------------+

现在你所要做的就是explode, groupby, 计数的聚合然后再次explode, groupby and 聚合以获得带有计数的原始数据集 as

import org.apache.spark.sql.functions._
df.withColumn("value", explode(col("value")))
  .groupBy("value").agg(count("value").as("count"), collect_list("key").as("key"))
  .withColumn("key", explode(col("key")))
  .groupBy("key").agg(collect_list(struct("value", "count")).as("value"))

这应该给你

+----+-----------------------+
|key |value                  |
+----+-----------------------+
|John|[[toast,2], [butter,1]]|
|Jane|[[jelly,1], [toast,2]] |
+----+-----------------------+

您可以进一步处理dataframe或改回rdd通过使用.rdd api.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将聚合数据添加到 Apache Spark 中的原始数据集中? 的相关文章

  • Spark 执行器 STDOUT 到 Kubernetes STDOUT

    我在 Spark Worker 中运行的 Spark 应用程序将执行程序日志输出到特定文件路径 worker home directory app xxxxxxxx 0 stdout I used log4j properties将日志从
  • 对于“迭代算法”,转换为 RDD 然后再转换回 Dataframe 有什么优势

    我在读高性能火花作者提出以下主张 虽然 Catalyst 优化器非常强大 但它目前遇到挑战的情况之一是非常大的查询计划 这些查询计划往往是迭代算法的结果 例如图算法或机器学习算法 一个简单的解决方法是将数据转换为 RDD 并在每次迭代结束时
  • Spark Scala:按小时或分钟计算两列的 DateDiff

    我在数据框中有两个时间戳列 我想获取它们的分钟差异 或者小时差异 目前我可以通过四舍五入获得日差 val df2 df1 withColumn time datediff df1 ts1 df1 ts2 但是 当我查看文档页面时https
  • Spark:替换嵌套列中的空值

    我想更换所有n a以下数据框中的值unknown 它可以是scalar or complex nested column 如果它是一个StructField column我可以循环遍历列并替换n a using WithColumn 但我希
  • ';'预期但发现“导入” - Scala 和 Spark

    我正在尝试使用 Spark 和 Scala 来编译一个独立的应用程序 我不知道为什么会收到此错误 topicModel scala 2 expected but import found error import org apache sp
  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • pyspark flatmat 错误:TypeError:“int”对象不可迭代

    这是我书中的示例代码 from pyspark import SparkConf SparkContext conf SparkConf setMaster spark chetan ThinkPad E470 7077 setAppNam
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • Spark 中的 Distinct() 函数如何工作?

    我是 Apache Spark 的新手 正在学习基本功能 有一个小疑问 假设我有一个元组 键 值 的 RDD 并且想从中获取一些唯一的元组 我使用distinct 函数 我想知道该函数基于什么基础认为元组是不同的 是基于键 值还是两者 di
  • Spark 有没有办法捕获执行器终止异常?

    在执行我的 Spark 程序期间 有时 其原因对我来说仍然是个谜 yarn 会杀死容器 执行器 并给出超出内存限制的消息 我的程序确实恢复了 但 Spark 通过生成一个新容器重新执行任务 但是 在我的程序中 任务还会在磁盘上创建一些中间文
  • 在 Spark 结构化流 2.3.0 中连接两个流时,左外连接不发出空值

    两个流上的左外连接不发出空输出 它只是等待记录添加到另一个流中 使用套接字流来测试这一点 在我们的例子中 我们想要发出具有 null 值的记录 这些记录与 id 不匹配或 且不属于时间范围条件 水印和间隔的详细信息如下 val ds1Map
  • 从 pyspark.sql 中的列表创建数据框

    我完全陷入了有线的境地 现在我有一个清单li li example data map lambda x get labeled prediction w x collect print li type li 输出就像 0 0 59 0 0
  • 纱线上的火花,连接到资源管理器 /0.0.0.0:8032

    我正在我的开发机器 Mac 上编写 Spark 程序 hadoop的版本是2 6 spark的版本是1 6 2 hadoop集群有3个节点 当然都在linux机器上 我在idea IDE中以spark独立模式运行spark程序 它运行成功
  • Spark scala 模拟 Spark.implicits 用于单元测试

    当尝试使用 Spark 和 Scala 简化单元测试时 我使用 scala test 和mockito scala 以及mockito Sugar 这只是让你做这样的事情 val sparkSessionMock mock SparkSes
  • 如何将包含多个字段的大型 csv 加载到 Spark

    新年快乐 我知道以前曾提出 回答过此类类似的问题 但是 我的问题有所不同 我有大尺寸的 csv 有 100 个字段和 100MB 我想将其加载到 Spark 1 6 进行分析 csv 的标题看起来像附件sample http www roc
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何使用 Scala 从 Spark 更新 ORC Hive 表

    我想更新 orc 格式的 hive 表 我可以从 ambari hive 视图进行更新 但无法从 sacla spark shell 运行相同的更新语句 objHiveContext sql select from table name 能
  • 我可以在没有 Hadoop 的情况下使用 Spark 作为开发环境吗?

    我对大数据和相关领域的概念非常陌生 如果我犯了一些错误或拼写错误 我很抱歉 我想了解阿帕奇火花 http spark apache org 并使用它仅在我的电脑中 在开发 测试环境中 由于Hadoop包含HDFS Hadoop分布式文件系统

随机推荐

  • R data.table 奇怪的值/引用语义

    这是一个后续问题this https stackoverflow com questions 62740267 is r data table documented to pass by reference as argument 检查这个
  • 我的应用因无效原因被 Instagram Basic Display API 审核拒绝

    我正在开发一个应用程序 允许用户在我的应用程序中连接他们的 Instagram 个人资料 因此 我的应用程序中的其他连接用户可以访问他们的 Instagram 个人资料 这类似于Tinder已经在他们的应用程序中实现了 参考这个链接 htt
  • 如何通过GLSL在THREE.js中实现MeshNormalMaterial?

    我想实现一个像这样的着色器网格法线材质 https threejs org docs api materials MeshNormalMaterial 但我不知道如何将法线转换为颜色 在 THREE js 中 我的测试1 varying v
  • 隐式转换,是否需要导入?

    I write object MyString implicit def stringToMyString s String new MyString s class MyString str String def camelize str
  • 如何在 C# 3.0 中比较两个通用列表? [复制]

    这个问题在这里已经有答案了 可能的重复 C 中有比较集合的内置方法吗 https stackoverflow com questions 43500 is there a built in method to compare collect
  • ExecJS::ProgramError:意外的标记:名称(选项)

    我的应用程序在本地环境中运行良好 我试图git pushHeroku 的构建 我的命令是 bundle install git add git commit am abcdef git push heroku master 然后我遇到了资产
  • 将 pandas DataFrame 写入 sql 时出现无效列名错误

    当我尝试将数据帧写入 ms sql server 时 如下所示 cnxn sqlalchemy create engine mssql pyodbc HOST PORT DATABASE driver SQL Server df to sq
  • 尽管未兑现的承诺,脚本还是结束了

    考虑一下 async function const arr await new Promise r gt arr push r console log done 脚本终止于await并且日志从未打印到标准输出 我不明白为什么 你的代码没有任
  • 从 F# 中特定位置的二进制文件读取整数的性能问题

    今天早上我问here https stackoverflow com questions 24381090 performance issue with reading integers from a binary file at spec
  • 运行“gem install Rails”时出错

    我的配置 操作系统 Ubuntu 12 04 rvm版本 14 25 红宝石版本 2 1 0p0 宝石版本 2 2 1 哪个红宝石 home tauhidul35 rvm rubies ruby 2 1 0 bin ruby 哪个 RVM
  • 如何使用 DocX 控制 Word 文档中的表格列宽?

    我正在尝试重新创建一个这样的表 我正在使用 DocX 库来操作 Word 文件 但在获取正确的宽度时遇到问题 尝试设置单元格的宽度似乎仅在未设置为窗口自动调整模式时才起作用 并且仅当指定宽度大于表格宽度的一半时才似乎调整大小 或者更确切地说
  • 如何保存音调改变后录制的音频?

    我正在努力录制声音 然后更改音频的音调并保存 我在录制语音后调用此方法 然后单击按钮更改音调 然后也会创建新文件 但无法收听音频 生成的音频没有语音 可能是什么错误 void saveEffectedAudioToFolder NSErro
  • setImageBitmap没有可见效果

    我有一个字节数组 其中包含从网络获取的图像 我使用 Bitmapfactory BitmapDrawable 和 setImageDrawable 将它们懒惰地加载到我的 UI 活动中 或者我至少尝试这样做 D 这是我的代码 Relativ
  • 将 2D 数组转换为 std::map?

    数组可以转换为std vector轻松高效 template
  • Linq 按布尔值排序

    我有一个 linq 查询 我想通过 f bar 它是一个字符串 排序 但我也想首先通过 f foo 它是一个布尔字段 排序 就像下面的查询一样 from f in foo orderby f foo f bar select f 虽然可以编
  • GTK:如何在开始时从元素上移除焦点

    我向应用程序添加了两个 GtkEntry 现在当我启动应用程序时 其中一个具有焦点 并且看起来已准备好进行编辑 不希望在开始时将焦点放在任何元素上 也不希望任何文本输入元素具有焦点或正在编辑 我只是想让他们显示他们的文本 然后如果用户想要更
  • Nginx 502 提供错误页面内容时?

    我一直在将 Nginx 设置为服务器上应用程序的反向代理 其中一部分包括具有外部内容 如图像 的维护页面 我找到了一种设置图像返回 200 的错误页面的方法 但看起来反向代理会改变整个环境 这是来自的原始解决方案nginx 维护页面有内容问
  • 允许的内存大小 262144 字节已耗尽(尝试分配 24576 字节)[重复]

    这个问题在这里已经有答案了 我对此快要发疯了 我收到下一条消息 Allowed memory size of 262144 bytes exhausted tried to allocate 24576 bytes 待办事项清单 检查 ph
  • Serverless调度java函数应该实现什么接口?

    我正在尝试使用无服务器编写 Java AWS Lambda 虽然我的由 HTTP 端点触发的 Lambda 部署并正常工作 但我在预定的尝试失败了 配置看起来像 来自无服务器 yml functions timedHandler handl
  • 如何将聚合数据添加到 Apache Spark 中的原始数据集中?

    我试图弄清楚如何聚合数据集中的数据 然后使用 Apache Spark 将结果添加到原始数据集 我已经尝试了 2 个我不满意的解决方案 我想知道是否有一个我没有看到的更具可扩展性和更高效的解决方案 以下是输入和预期输出数据的非常简化的示例