Spark中高效计算top-k元素

2024-01-11

我有一个类似于以下内容的数据框:

+---+-----+-----+
|key|thing|value|
+---+-----+-----+
| u1|  foo|    1|
| u1|  foo|    2|
| u1|  bar|   10|
| u2|  foo|   10|
| u2|  foo|    2|
| u2|  bar|   10|
+---+-----+-----+

并希望得到以下结果:

+---+-----+---------+----+
|key|thing|sum_value|rank|
+---+-----+---------+----+
| u1|  bar|       10|   1|
| u1|  foo|        3|   2|
| u2|  foo|       12|   1|
| u2|  bar|       10|   2|
+---+-----+---------+----+

目前,有类似的代码:

val df = Seq(("u1", "foo", 1), ("u1", "foo", 2), ("u1", "bar", 10), ("u2", "foo", 10), ("u2", "foo", 2), ("u2", "bar", 10)).toDF("key", "thing", "value")

 // calculate sums per key and thing
 val aggregated = df.groupBy("key", "thing").agg(sum("value").alias("sum_value"))

 // get topk items per key
 val k = lit(10)
 val topk = aggregated.withColumn("rank", rank over  Window.partitionBy("key").orderBy(desc("sum_value"))).filter('rank < k)

不过这段代码很效率低下。窗口函数生成一个总订单的项目并导致巨大的洗牌.

如何更有效地计算 top-k 项? 也许使用近似函数,即类似于草图https://datasketches.github.io/ https://datasketches.github.io/ or https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html


这是推荐系统的经典算法。

case class Rating(thing: String, value: Int) extends Ordered[Rating] {
  def compare(that: Rating): Int = -this.value.compare(that.value)
}

case class Recommendation(key: Int, ratings: Seq[Rating]) {
  def keep(n: Int) = this.copy(ratings = ratings.sorted.take(n))
}

val TOPK = 10

df.groupBy('key)
  .agg(collect_list(struct('thing, 'value)) as "ratings")
  .as[Recommendation]
  .map(_.keep(TOPK))

您还可以在以下位置查看源代码:

  • Spotify 大数据 Rosetta 代码 /TopItemsPerUser.scala https://github.com/spotify/big-data-rosetta-code/blob/master/src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala,这里有几个针对 Spark 或 Scio 的解决方案
  • Spark MLLib /TopByKeyAggregator.scala https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/recommendation/TopByKeyAggregatorSuite.scala,被认为是使用他们的推荐算法时的最佳实践,看起来他们的例子仍然使用RDD though.
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

sc.parallelize(Array(("u1", ("foo", 1)), ("u1", ("foo", 2)), ("u1", ("bar", 10)), ("u2", ("foo", 10)),
  ("u2", ("foo", 2)), ("u2", ("bar", 10))))
  .topByKey(10)(Ordering.by(_._2))

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark中高效计算top-k元素 的相关文章

  • 如何从 Databricks Delta 表中删除列?

    我最近开始发现 Databricks 并遇到了需要删除增量表的特定列的情况 当我使用 PostgreSQL 时 它就像 ALTER TABLE main metrics table DROP COLUMN metric 1 我正在浏览 Da
  • Spark:并行转换多个数据帧

    了解如何在并行转换多个数据帧时实现最佳并行性 我有一系列路径 val paths Array path1 path2 我从每个路径加载数据帧 然后转换并写入目标路径 paths foreach path gt val df spark re
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • 如何从字符串列中提取数字?

    我的要求是从列中的评论列中检索订单号comment并且总是开始于R 订单号应作为新列添加到表中 输入数据 code id mode location status comment AS SD 101 Airways hyderabad D
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 如何读取一次流数据集并输出到多个接收器?

    我有 Spark 结构化流作业 它从 S3 读取数据 转换数据 然后将其存储到一个 S3 接收器和一个 Elasticsearch 接收器 目前 我正在做readStream一次然后writeStream format start 两次 这
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • Spark:如何使用crossJoin

    我有两个数据框 df1有 100000 行并且df2有 10000 行 我想创建一个df3这是两者的交叉连接 val df3 df1 crossJoin df2 这将产生 10 亿行 尝试在本地运行它 但似乎需要很长时间 您认为本地可以实现
  • Scala:什么是 CompactBuffer?

    我试图弄清楚 CompactBuffer 的含义 和迭代器一样吗 请解释其中的差异 根据 Spark 的文档 它是 ArrayBuffer 的替代方案 可以提供更好的性能 因为它分配的内存更少 以下是 CompactBuffer 类文档的摘
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • 返回年份数组作为年份范围

    我正在尝试查询一个包含以下内容的表character varying 年份列 并将这些年份作为逗号分隔的年份范围字符串返回 年份范围将由数组中存在的连续年份确定 不连续的年份 年份范围应以逗号分隔 数据类型的原因是character var
  • Spark 中的 Distinct() 函数如何工作?

    我是 Apache Spark 的新手 正在学习基本功能 有一个小疑问 假设我有一个元组 键 值 的 RDD 并且想从中获取一些唯一的元组 我使用distinct 函数 我想知道该函数基于什么基础认为元组是不同的 是基于键 值还是两者 di
  • Spark/Gradle -- 在 build.gradle 中获取 IP 地址以用于启动 master 和worker

    我在基本层面上了解 build gradle 构建脚本的各个移动部分 但无法将它们全部结合在一起 在 Apache Spark 独立模式下 只需尝试从 build gradle 在同一个机器上启动 master 和worker 稍后将使用
  • Spark scala 模拟 Spark.implicits 用于单元测试

    当尝试使用 Spark 和 Scala 简化单元测试时 我使用 scala test 和mockito scala 以及mockito Sugar 这只是让你做这样的事情 val sparkSessionMock mock SparkSes
  • 如何根据条件添加新列(而不面临 JaninoRuntimeException 或 OutOfMemoryError)?

    尝试根据这样的条件创建具有多个附加列的 Spark 数据框 df withColumn name1 someCondition1 withColumn name2 someCondition2 withColumn name3 someCo
  • 如何引用下一行的数据?

    我正在 PostgreSQL 9 2 中编写一个函数 对于股票价格和日期的表 我想计算每个条目较前一天的百分比变化 对于最早一天的数据 不会有前一天 因此该条目可以简单地为 Nil 我知道WITH声明可能不应该高于IF陈述 到目前为止 这就
  • 使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值

    我想与 Python 共享这个特定的 Apache Spark 解决方案 因为它的文档非常贫乏 我想通过 KEY 计算 K V 对 存储在 Pairwise RDD 中 的平均值 示例数据如下所示 gt gt gt rdd1 take 10
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • 如果两个阶段使用相同的 DataFrame,spark 是否会读取同一文件两次?

    以下代码读取相同的 csv 两次 即使只调用一个操作 端到端可运行示例 import pandas as pd import numpy as np df1 pd DataFrame np arange 1 000 reshape 1 1

随机推荐

  • 人工智能与模式识别的社会影响与应用

    1 背景介绍 人工智能 Artificial Intelligence AI 和模式识别 Pattern Recognition PR 是计算机科学领域的两个重要分支 人工智能研究如何让计算机具有类似人类智能的能力 如学习 推理 理解自然语
  • 线性代数在深度学习中的角色

    1 背景介绍 深度学习是一种人工智能技术 它主要通过神经网络来学习和模拟人类大脑的思维过程 线性代数是一门数学分支 它研究的是向量和矩阵的运算 在深度学习中 线性代数起着非常重要的作用 因为它为神经网络提供了数学模型和计算方法 在这篇文章中
  • 慢思维的力量:如何解决复杂问题

    1 背景介绍 在当今的快速发展和竞争激烈的环境中 我们需要更有效地解决复杂问题 这需要我们具备一种称为慢思维的思考方式 它可以帮助我们更好地理解问题 制定更好的解决方案 本文将介绍慢思维的核心概念 算法原理 具体操作步骤以及数学模型公式 并
  • 强化学习求解TSP(二):Qlearning求解旅行商问题TSP(提供Python代码)

    一 Qlearning简介 Q learning是一种强化学习算法 用于解决基于奖励的决策问题 它是一种无模型的学习方法 通过与环境的交互来学习最优策略 Q learning的核心思想是通过学习一个Q值函数来指导决策 该函数表示在给定状态下
  • 线性代数在数据挖掘中的应用

    1 背景介绍 线性代数是数学的一个分支 主要研究的是线性方程组和向量的相关概念和方法 在数据挖掘领域 线性代数的应用非常广泛 包括数据处理 特征提取 模型训练等方面 本文将从以下几个方面进行阐述 背景介绍 核心概念与联系 核心算法原理和具体
  • 6 - 数据备份与恢复|innobackupex

    数据备份与恢复 innobackupex 数据备份与恢复 数据备份相关概念 物理备份与恢复 逻辑备份 推荐 使用binlog日志文件实现对数据的时时备份 使用日志 恢复数据
  • 心灵与大脑的沟通:如何让大脑更好地理解我们的情感

    1 背景介绍 心理学和人工智能之间的界限已经不断模糊化 尤其是在情感智能方面 情感智能是一种新兴的人工智能技术 旨在让计算机更好地理解和回应人类的情感 这篇文章将探讨如何让大脑更好地理解我们的情感 以及在这个过程中涉及的核心概念 算法原理
  • AI大模型应用入门实战与进阶:Part 7 Transformer模型解析

    1 背景介绍 自从2020年的大模型如GPT 3等开始引起广泛关注 人工智能领域的研究和应用得到了重大推动 在这一波技术创新中 Transformer模型发挥着关键作用 这篇文章将深入探讨Transformer模型的核心概念 算法原理和实例
  • 强化学习求解TSP(一):Qlearning求解旅行商问题TSP(提供Python代码)

    一 Qlearning简介 Q learning是一种强化学习算法 用于解决基于奖励的决策问题 它是一种无模型的学习方法 通过与环境的交互来学习最优策略 Q learning的核心思想是通过学习一个Q值函数来指导决策 该函数表示在给定状态下
  • 如何成为一名数据科学家:必须掌握的技能和知识

    1 背景介绍 数据科学家是一种新兴的职业 它结合了计算机科学 统计学 数学和领域知识等多个领域的知识和技能 以解决实际问题 数据科学家的主要任务是收集 清洗 分析和解释大量数据 从中挖掘有价值的信息和知识 并将其应用于决策和预测 数据科学家
  • 机器智能与人类智能的竞争:技术创新的驱动力

    1 背景介绍 人工智能 Artificial Intelligence AI 和机器学习 Machine Learning ML 是最近几年最热门的技术领域之一 随着数据量的增加和计算能力的提高 机器学习技术的发展得到了极大的推动 机器学习
  • 电脑快速打开计算器的方法

    大家好 我是爱你三千遍斯塔克 我们平常在运算时 经常要要使用计算器 那么计算器有什么快速打开方法吗 这里有一些参考方法 可供大家进行参考 希望对大家有帮助 希望你喜欢我的内容 记得关注我哦 我会继续为大家带来更好的作 1 win R 打开运
  • 机器学习中的知识共享:模型与数据的交流与协作

    1 背景介绍 机器学习 Machine Learning 是一种通过数据学习模式和规律的计算机科学领域 在过去的几年里 机器学习技术在各个领域得到了广泛应用 如图像识别 自然语言处理 推荐系统等 随着数据规模的不断增长 单个机器学习模型的复
  • 人工智能与机器学习:未来的编程范式

    1 背景介绍 人工智能 Artificial Intelligence AI 和机器学习 Machine Learning ML 是现代计算机科学的重要领域之一 它们旨在让计算机能够自主地学习 理解和进化 以解决复杂的问题 随着数据量的增加
  • JDK21和 Flowable 7.0.0

    JDK21和 Flowable 7 0 0 一 Flowable 二 项目搭建 1 依赖包 2 数据库 3 资源文件 1 YML配置文件 2 Drools kbase
  • 序列模型(4)—— Scaling Laws

    本文介绍 LLM 训练过程中重要的 Scaling Laws 这是一个 经验规律 指出了 固定训练成本 总计算量FLOPs C C C 时 如何调配模型规模 参数量 N N
  • golang中有没有一种有效的方法来计算执行时间?

    我正在寻找计算 go 执行时间的最佳方法 func main start time Now time Sleep time Second 2 something doing here elapsed time Since start fmt
  • PHP 正则表达式:找不到结束分隔符“^”

    我在使用正则表达式时遇到了一些麻烦 这是我的代码 pattern 0 9 if preg match pattern input echo yes else echo nope 我运行它并得到 警告 preg match function
  • 正则表达式检查 Facebook 视频 URL

    我尝试使用正则表达式检查 Facebook 视频网址 这是有效的 Facebook 视频 URL 示例 https www facebook com video php v 100000000000000 https www faceboo
  • Spark中高效计算top-k元素

    我有一个类似于以下内容的数据框 key thing value u1 foo 1 u1 foo 2 u1 bar 10 u2 foo 10 u2 foo 2 u2 bar 10 并希望得到以下结果 key thing sum value r