Spark聚合函数——aggregateByKey是如何工作的?

2024-01-12

假设我有一个分布在 3 个节点上的系统,并且我的数据分布在这些节点之间。例如,我有一个 test.csv 文件,该文件存在于所有 3 个节点上,并且包含 2 列:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

然后我使用 SparkContext.textFile 将文件读取为 rdd 等。据我了解,每个 Spark 工作节点都会从文件中读取一部分。现在假设每个节点将存储:

  • 节点1:第1~4行
  • 节点2:第5~8行
  • 节点3:第9~12行

我的问题是,假设我想对这些数据进行计算,并且有一个步骤我需要将键分组在一起,因此键值对将是[k1 [{k1 c1} {k1 c2} {k1 c3}]]..等等。

有一个函数叫做groupByKey()使用起来非常昂贵,并且aggregateByKey()推荐使用。所以我想知道如何groupByKey() and aggregateByKey()在幕后工作?有人可以用我上面提供的例子来解释一下吗?打乱后,行驻留在每个节点上的哪里?


aggregateByKey() 与reduceByKey 有很大不同。所发生的情况是,reduceByKey 是aggregateByKey 的一种特殊情况。

aggregateByKey() 将组合特定键的值,这种组合的结果可以是您指定的任何对象。您必须指定如何在一个分区(在同一节点中执行)内组合(“添加”)值以及如何组合来自不同分区(可能位于不同节点中)的结果。 reduceByKey 是一种特殊情况,因为组合的结果(例如求和)与值的类型相同,并且从不同分区组合时的操作也与组合内部值时的操作相同。分割。

一个例子: 想象一下你有一个配对列表。您将其并行化:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

现在你想通过按键“组合”它们来产生总和。在这种情况下,reduceByKey 和aggregateByKey 是相同的:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

现在,假设您希望聚合是一组值,这是与整数不同的类型(整数之和也是整数):

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark聚合函数——aggregateByKey是如何工作的? 的相关文章

  • 无法在 SBT 中运行 Apache Spark 相关单元测试 - NoClassDefFoundError

    我有一个简单的单元测试 使用SparkContext 我可以在 IntelliJ Idea 中运行单元测试 没有任何问题 但是 当尝试从 SBT shell 运行相同的测试时 我收到以下错误 java lang NoClassDefFoun
  • 如何从spark管道逻辑模型中提取变量权重?

    我目前正在尝试学习 Spark Pipeline Spark 1 6 0 我将数据集 训练和测试 导入为 oas sql DataFrame 对象 执行以下代码后 生成的模型是oas ml tuning CrossValidatorMode
  • 使用 pyspark 计算所有可能的单词对

    我有一个文本文档 我需要找到整个文档中重复单词对的可能数量 例如 我有下面的word文档 该文档有两行 每行用 分隔 文档 My name is Sam My name is Sam My name is Sam My name is Sa
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 如何读取一次流数据集并输出到多个接收器?

    我有 Spark 结构化流作业 它从 S3 读取数据 转换数据 然后将其存储到一个 S3 接收器和一个 Elasticsearch 接收器 目前 我正在做readStream一次然后writeStream format start 两次 这
  • 在 Spark MLlib 上使用 Java 中的 Breeze

    在尝试从Java使用MLlib时 使用微风矩阵运算的正确方法是什么 例如scala 中的乘法很简单 matrix vector 相应的功能在Java中是如何表达的 有一些方法 例如 colon times 可以通过正确的方式调用 breez
  • Hive - 线程安全的自动递增序列号生成

    我遇到一种情况 需要将记录插入到特定的 Hive 表中 其中一列需要是自动递增的序列号 即在任何时间点都必须严格遵循 max value 1 规则 记录从许多并行的 Hive 作业插入到这个特定的表中 这些作业每天 每周 每月批量运行 现在
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • 错误:无法找到或加载主类 org.apache.spark.launcher.Main [重复]

    这个问题在这里已经有答案了 如果有人能帮我解决以下路径问题 我将不胜感激 我非常怀疑这与缺少路径设置有关 但不知道如何修复它 rxie ubuntu Downloads spark echo PATH usr bin java usr lo
  • 根据 pyspark 中的条件从数据框中删除行

    我有一个包含两列的数据框 col1 col2 22 12 2 1 2 1 5 52 1 2 62 9 77 33 3 我想创建一个新的数据框 它只需要行 col1 的值 gt col2 的值 就像注释一样col1 很长类型和col2 有双
  • 从 pyspark.sql 中的列表创建数据框

    我完全陷入了有线的境地 现在我有一个清单li li example data map lambda x get labeled prediction w x collect print li type li 输出就像 0 0 59 0 0
  • Spark.sql.shuffle.partitions 的最佳值应该是多少,或者在使用 Spark SQL 时如何增加分区?

    我实际上正在使用 Spark SQLhiveContext sql 它使用 group by 查询 我遇到了 OOM 问题 所以考虑增加价值spark sql shuffle partitions从默认的 200 到 1000 但这没有帮助
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 火花内存不足

    我有一个文件夹 里面有 150 G 的 txt 文件 大约 700 个文件 平均每个 200 MB 我使用 scala 来处理文件并最终计算一些汇总统计数据 我认为有两种可能的方法可以做到这一点 手动循环所有文件 对每个文件进行计算并最终合
  • 如何使用 Scala 从 Spark 更新 ORC Hive 表

    我想更新 orc 格式的 hive 表 我可以从 ambari hive 视图进行更新 但无法从 sacla spark shell 运行相同的更新语句 objHiveContext sql select from table name 能
  • 数量重新分配逻辑 - 具有外部数据集的 MapGroups

    我正在研究一种复杂的逻辑 需要将数量从一个数据集重新分配到另一个数据集 在例子中我们有Owner and Invoice 我们需要从数量中减去Invoice准确地Owner匹配 在给定汽车的给定邮政编码处 减去的数量需要重新分配回同一辆车出
  • Spark:导入UTF-8编码的文本文件

    我正在尝试处理一个包含很多特殊字符的文件 例如德语变音符号 o 等 如下所示 sc hadoopConfiguration set textinputformat record delimiter r n r n sc textFile f
  • 使用已知模式保存空 DataFrame (Spark 2.2.1)

    是否可以使用已知模式保存一个空的 DataFrame 以便将该模式写入文件 即使它有 0 条记录 def example spark SparkSession path String schema StructType val datafr
  • 缩放数据框的每一列

    我正在尝试缩放数据框的每一列 首先 我将每一列转换为向量 然后使用 ml MinMax Scaler 除了简单地重复它之外 是否有更好 更优雅的方法将相同的函数应用于每一列 import org apache spark ml linalg

随机推荐

  • Firestore 通过数组的字段值进行查询

    我正在尝试运行一个简单的查询 在其中搜索包含对象数组内的值的文档 例如 看看我的数据库结构 我想运行与此类似的查询 db collection identites where partyMembers array contains name
  • 向 Objective-C 添加“forCount”控制结构的最佳方法?

    Adam Ko 为这个问题提供了一个很好的解决方案 感谢 Adam Ko 顺便说一句 如果您像我一样喜欢 c 预处理器 处理 defines 的东西 您可能不知道 XCode 中有一个方便的东西 右键单击您的一个开源文件的主体 然后向下靠近
  • Vertex AI - 部署的模型预测与评估结果的预测不同

    我使用 AutoML 训练了一个多标签文本分类模型 然后 我部署了模型并尝试测试我们在模型注册表的评估选项卡中提供的一些输入 我遇到的问题是 我通过两种测试方法获得的预测输出值与模型注册表的评估选项卡中显示的输出值不匹配 我已经包含了一个此
  • 使用 LINQ 在 ASP.NET MVC 中传递数据 - 疯狂

    首先请允许我说 我是 ASP NET MVC 方面的高手 我喜欢它 但我是个n00b 我正在尝试从 LINQ 查询传回 复杂 数据 我了解如何使用数据上下文 然后在发送回数据时强制转换该数据 但是当我执行返回匿名类型的更复杂的 LINQ 查
  • 关于 PermissionEx (WIX) 的问题

    我是 WIX 新手 我正在使用 util PermissionEx 创建 ACL 我可以成功设置读 写 读和执行等权限 但找不到有关设置修改权限的任何信息 我尝试使用 Append 属性来实现此功能 这似乎是唯一可以实现所需功能的属性 但是
  • CSS 转换导致 div 在 Safari 中重叠?

    为什么transform rotateY 导致 div 仅在 Safari 中重叠 以下是一些屏幕截图 可以更好地解释 它应该是什么样子 它不应该是什么样子 仅出现在 Safari 中 这是非常奇怪的行为 我已经删除了transform r
  • Discord.py on_member_join 没有响应

    我正在使用一个 client event为我的功能on member join事件 我希望它在用户加入时发送消息 但是控制台没有响应或错误 这是我当前尝试的代码 client event async def on member join m
  • Java 中的 String[] args 有什么意义?

    每当您在类中声明 main 方法时 您总是必须执行String名为 args 的数组 重点是什么 除非我生活在岩石下 否则 Java 中的命令行参数几乎不再使用 当我尝试运行这个时 this program won t compile pu
  • C# 给定货币代码格式化货币(如 USD / GBP / FRF)

    我正在与返回货币的数据库集成 System Decimal 和货币代码 货币代码是类似的字符串 USD GBP 乃至 FRF mscorlib 是否有内置的东西可以帮助我格式化这些货币 我首先想到的是在数据库货币代码和CultureInfo
  • 使用 vscode 作为 sops 的编辑器

    我似乎无法得到sops跟 共事Visual Studio Code作为其编辑 vscode已经在PATH 然而 sops在控制台上打印解密的内容 而不是打开编辑器 c gt code this opens the Visual Studio
  • Node.js 子进程问题与参数 - 引号问题?,FFMPEG 问题?

    我需要能够从 Node js 应用程序执行 FFMPEG 我相信这个问题可能与正确指定命令行参数有关 而不是特定于 FFMPEG 但由于我无法缩小问题范围 所以我提出了我的整个问题 我可以执行以下命令从命令提示符成功 C Brad ffmp
  • 如何在 LaTeX 中对段落进行编号?

    给出一堆段落 Para A Para B Para C 如何让 LaTeX 自动对它们进行编号 即 1 Para A 2 Para B 3 Para C 我看过以下建议 newcounter parnum newcommand N noin
  • python 日志文件中的“无”

    我正在使用loggingpython 中的模块 当我在命令行上使用错误的参数调用脚本时 日志文件在某些 时候包含单个单词 None 我不知道它来自哪里 这是我的代码切割 我在其中执行logging exception Show script
  • 有时不会调用 UISplitViewControllerDelegate 中的 willHideViewController

    我有个问题 我的应用程序是一个选项卡栏控制器 它的第一个视图控制器是一个分割视图控制器 这对苹果来说似乎不太好 因为文件说分割的 voew 控制器必须是根 所以也许这就是我的问题的原因 问题是 有时 不会调用 UISplitViewCont
  • 如何在 Kendo 网格中自动启用或禁用滚动条?

    这是我创建网格的代码 if Model GenericEntityList Count gt 0 Html Kendo Grid Model GenericEntityList Name screenNames ToString Colum
  • python 中 select 的 CPU 使用率达到 100%

    我在 python 中的 select 有问题 我有一段代码允许客户端从服务器接收数据 并通过在 stdin 上读取并在服务器套接字上写入来发送数据 readfds s sys stdin writefds s sys stdout my
  • 了解 JMH 输出

    因此 我对几种方法运行了 JMH 基准测试 并得到了如下响应 我无法理解到底是什么Score and Error值表示 有相同的参考文档吗 举个例子 ss stream带参数n 100000花了约 平均运行 30 次迭代需要 1 363 微
  • 如果将 UIView 添加为子视图,则 UIButton 目标不起作用

    我有一个UIButton 它有两个子视图 然后我打电话 createButton addTarget self action selector openComposer forControlEvents UIControlEventTouc
  • Java 重新排序会影响 System.currentTimeMillis() 吗?

    根据Java内存模型 指令可以重新排序 只要执行格式良好的 http docs oracle com javase specs jls se7 html jls 17 html jls 17 4 7 所以我想知道 以下代码是否可能产生以下输
  • Spark聚合函数——aggregateByKey是如何工作的?

    假设我有一个分布在 3 个节点上的系统 并且我的数据分布在这些节点之间 例如 我有一个 test csv 文件 该文件存在于所有 3 个节点上 并且包含 2 列 row id c row1 k1 c1 row2 k1 c2 row3 k1