如何使用Spark的repartitionAndSortWithinPartitions?

2023-11-21

我正在尝试构建一个最小的工作示例repartitionAndSortWithinPartitions以便理解该函数。到目前为止我已经完成了(不起作用,不同的值会乱七八糟地乱七八糟)

def partval(partID:Int, iter: Iterator[Int]): Iterator[Tuple2[Int, Int]] = {
  iter.map( x => (partID, x)).toList.iterator
}

val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
part20to2_sorted.mapPartitionsWithIndex(partval).collect

但出现错误

Name: Compile Error
Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
             val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)

我尝试使用scaladoc,但无法找到哪个类提供repartitionAndSortWithinPartitions。 (顺便说一句:这个 scaladoc 并不令人印象深刻:为什么MapPartitionsRDD丢失的?我如何寻找方法?)

意识到我需要一个分区对象,接下来我尝试

val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
part20to2_sorted.mapPartitionsWithIndex(partval).collect

but got

Name: Compile Error
Message: <console>:22: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[Int]
 required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
Error occurred in an application involving default arguments.
         val rPartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)

我如何让它编译?请问我可以得到一个工作示例吗?


你的问题是part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions是一种运行在RDD[(K, V)], where K是关键并且V是值。

repartitionAndSortWithinPartitions首先会重新分区基于提供的分区器的数据,然后按键排序:

/**
 * Repartition the RDD according to the given partitioner and, 
 * within each resulting partition, sort records by their keys.
 *
 * This is more efficient than calling `repartition` and then sorting within each partition
 * because it can push the sorting down into the shuffle machinery.
 */
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = 
  self.withScope {
    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}

所以看起来这并不完全是您要找的。

如果你想要一个普通的旧排序,你可以使用sortBy,因为它不需要密钥:

scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33

scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
sorted: Array[Int] = 
    Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

你经过的地方sortBy顺序(升序或降序)以及要创建的分区数量。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用Spark的repartitionAndSortWithinPartitions? 的相关文章

  • Scala 修饰符和类型参数化

    我正在创建一个记忆类 每个类都会记忆一个函数类型并具有以下定义 class MemoizedFunction1 T1 R f T1 gt R private this val cache mutable Map T1 R def apply
  • 在sbt的build.sbt文件中添加模块依赖信息

    我在 IntelliJ 中有一个多模块项目 如该屏幕截图所示 contexProcessor 模块依赖于 contextSummary 模块 一旦我在项目结构中设置了依赖项 IntelliJ 就会处理所有事情 然而 当我跑步时sbt tes
  • 如何抑制 EMR 上运行的 Spark-sql 的 INFO 消息?

    我正在 EMR 上运行 Spark 如中所述在 Amazon Elastic MapReduce 上运行 Spark 和 Spark SQL https aws amazon com articles 4926593393724923 本教
  • 在 URL 中嵌入 ETag

    有关 Play 中资产指纹识别的问题 如何要求 Play 在 URL 中嵌入 ETag 而不使用第三方插件 例如 如果 css resource cssETag 为1234 那么它就会变成 css responsive 1234 css 相
  • 为什么 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

    我的团队正在构建一个 ETL 流程 以使用 Spark 将原始分隔文本文件加载到基于 Parquet 的 数据湖 中 Parquet 列存储的承诺之一是查询将仅读取必要的 列条带 但我们看到意外的列被读取以获取嵌套模式结构 为了进行演示 下
  • Spark UDF 错误 - 不支持 Any 类型的架构

    我正在尝试创建一个 udf 它将列中的负值替换为 0 我的数据框名为 df 包含一列名为 avg x 这是我创建 udf 的代码 val noNegative udf avg acc x Double gt if avg acc x lt
  • 仅使用 Spark ML Pipelines 进行转换

    我正在开发一个项目 其中可配置的管道和 Spark DataFrame 更改的沿袭跟踪都是必不可少的 该管道的端点通常只是修改后的 DataFrame 将其视为 ETL 任务 对我来说最有意义的是利用现有的 Spark ML Pipelin
  • 在 Pandas UDF PySpark 中传递多列

    我想计算 PySpark DataFrame 两列之间的 Jaro Winkler 距离 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得 pyjarowinkler 的工作原理如下 from pyjar
  • 特征/类类型参数优先于方法类型参数的规则是什么

    我已经使用 scala 一段时间了 我认为我真的开始理解一切 好吧 大多数事情 但我发现自己对 Map 类中的许多方法定义感到困惑 我知道 FoldLeft 等如何工作 但我感到困惑的是 Map 函数中使用的类型参数 我们以 FoldLef
  • JavaScript 中 Scala View 的等效项

    在斯卡拉中 view允许防止创建全新的集合 例如在Scala中 视图 有什么作用 https stackoverflow com questions 6799648 in scala what does view do JavaScript
  • 使用 Scala 在 Apache Spark 中拆分字符串

    我有一个数据集 其中包含以下格式的行 制表符分隔 Title lt t gt Text 现在对于每个单词Text 我想创建一个 Word Title 一对 例如 ABC Hello World gives me Hello ABC Worl
  • 如何访问jar中的静态资源(对应于src/main/resources文件夹)?

    我有一个火花流 https spark apache org streaming 使用 Maven 作为 jar 构建并使用spark submit脚本 应用程序项目布局遵循标准目录布局 myApp src main scala com m
  • Spark 有效地过滤大数据框中存在于小数据框中的条目

    我有一个 Spark 程序 它读取一个相对较大的数据帧 3 2 TB 其中包含 2 列 id name 和另一个相对较小的数据帧 20k 条目 其中包含单个列 id 我想做的是从大数据框中获取 id 和名称 如果它们出现在小数据框中 我想知
  • 我的sparkDF.persist(DISK_ONLY)数据存储在哪里?

    我想更多地了解spark中hadoop的持久化策略 当我使用 DISK ONLY 策略保存数据帧时 我的数据存储在哪里 路径 文件夹 我在哪里指定这个位置 对于简短的答案 我们可以看看文档 https spark apache org do
  • 在 Scala 中实现“.clone”

    我正在想办法 clone我自己的对象 在 Scala 中 这是为了模拟 因此可变状态是必须的 由此产生了克隆的全部需要 在提前模拟时间之前 我将克隆整个状态结构 这是我目前的尝试 abstract trait Cloneable A See
  • IntelliJ、Akka 和配置文件

    使用时akka http akka io 我放置akka conf in src main resources 当我run通过 sbt akka conf被正确识别 但当我运行 IntelliJ 时却没有 即使在gen idea 实现这一目
  • enableHiveSupport 在 java Spark 代码中引发错误[重复]

    这个问题在这里已经有答案了 我有一个非常简单的应用程序 尝试使用 Spark 从 src main resources 读取 orc 文件 我不断收到此错误 无法实例化具有 Hive 支持的 SparkSession 因为找不到 Hive
  • Shapeless 中 TypeClass 特征的 emptyCoproduct 和 coproduct 方法的用途是什么

    我并不完全清楚这样做的目的是什么emptyCoProduct and coproduct的方法TypeClass无形中的特质 什么时候会使用TypeClass特质而不是ProductTypeClass 这两种方法的实施方式有哪些示例 假设我
  • 如何拦截 Play 2 + Scala 中的所有控制器请求?

    在 Play 2 的 Java 风格中 有全局设置 onRequest http www playframework org documentation 2 0 4 JavaGlobal 可用于拦截所有传入控制器的请求 但在Scala 等效
  • Scala 不可变 Map 速度慢

    当我创建地图时 我有一段代码 val map gtfLineArr 8 split map split collect case Array k v gt k v toMap 然后我使用这张地图来创建我的对象 case class MyOb

随机推荐

  • 如何在 Angular 2 的组件中使用管道?

    我有一个管道类 它根据您传递的参数返回数据 我知道如何在我的 HTML 模板中使用它 符号 但我也想在我的组件中使用它 有没有办法直接从 Angular 2 中的组件或服务内部调用管道 您可以使用以下命令直接在代码中调用管道 YourPip
  • 重新加载完成时的 jqgrid 事件?

    我正在使用jqgrid 我可以看到我有多少行 如下所示 grid getGridParam records 我可以像这样重新加载一些不同的数据 grid trigger reloadGrid 但是 一旦我触发重新加载 我如何知道它何时完成加
  • 如何在 sqlite where 子句中使用编辑距离函数?

    我正在尝试实现 你是说吗 一种搜索功能 我正在尝试执行一个使用 levenshtein 函数的查询 该函数是用 ruby 编写的 我想知道如何在 sqlite3 查询中使用这个函数 我想这可能是这样的 results the db wher
  • PostgreSQL 日期差异

    我有一个计算日期差的 PostgreSQL 函数 CREATE OR REPLACE FUNCTION testDateDiff RETURNS int AS BODY DECLARE startDate TIMESTAMP DECLARE
  • 将外部资源文件夹添加到 Spring Boot

    我想添加一个相对于 jar 位置的资源文件夹 除了我的 jar 中的打包资源之外 例如 Directory Application jar resources test txt 我尝试过以下方法 Override public void a
  • ILogger 不尊重 Application Insights 的日志级别

    我一直在尝试使用 ASP NET Core 2 0 应用程序设置 Application Insights 在本地运行我的应用程序时 日志按预期显示在 Application Insights 中 但是 当部署到 Azure 应用服务时 虽
  • 使用 Prophet 时,“StanModel”对象没有属性“fit_class”

    我正在尝试使用先知 我已经安装了所有必需的软件包 pip install pandas numpy jupyterlab seaborn conda install pywin32 conda install c anaconda pyst
  • Delphi 2010 密码学库 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 目前不接受答案 你能推荐一个开源的 Delphi 加密库吗 德尔福 2006 德尔福 2009 和德尔福 2010 算法需要 DES MD5 SHA 1 以下是我在
  • 我应该使用属性还是 getter 和 setter?

    我知道在 python 中使用 getter 和 setter 不是 pythonic 相反 应该使用属性装饰器 但我想知道以下场景 我有一个使用一些实例属性初始化的类 然后我需要向类添加其他实例属性 如果我不使用设置器 那么我必须写obj
  • Javascript:如何转换数组?

    我在 javascript var 上有这个 这是一个 http 返回的数据 我不知道它是数组还是字符串 我们如何才能看到它 更新 使用 typeof 返回 字符串 所以它是一个字符串 nomeDominio gggg fa nomeDom
  • ImageMagick 或 GhostScript:将多页 TIFF 转换为多页 PDF

    我需要将多页 TIFF 转换为多页 PDF 我可以访问 ImageMagick 和 GhostScript 在 nix 环境中 我该怎么做呢 谢谢 UPDATE 事实证明我的测试文件是错误的 它没有多个页面 这让我认为我的命令是错误的 这似
  • Postgres 视图的 Django 模型

    Edit 我的要求似乎有些混乱 该模型适用于Postgres 视图我在迁移 0009 中创建的 我的印象是 Django 不会为模型生成迁移 如果它具有managed False选项 然而 它仍在尝试创建它 另外 我使用 Django 1
  • JSP - 使用 Apache Commons 上传文件

    为什么我的文件上传代码不起作用 我正在使用 commons fileupload 1 1 1 jar 另外 我在 NetBeans 6 1 中的 isMultipartContent 第二行中看到了删除线 Check that we hav
  • 纬度和经度错误 - Google Maps API JS 3.0

    一段时间后 谷歌地图似乎将属性名称 Qa或Pa 更改为Na或其他名称 var map function initialize var myLatlng new google maps LatLng 25 363882 131 044922
  • 设置 Canvas.LineTo 的线端样式

    有没有办法为 TCanvas LineTo 方法设置 lineends 的样式 它似乎默认为圆角末端 当 Pen Width 设置为较大值 例如 9 时 对于不同颜色的行中的多条线来说 这看起来非常糟糕 它看起来像这样 圆角末端 其中 代表
  • postgresqlNewConnection(drv, ...) RS-DBI 驱动程序中出现错误:(无法在 dbname 上连接 postgres@local

    我是新来的R我正在尝试使用 RStudio 连接到 PostgreSQL 我已经安装了RPostgreSQL并尝试了以下代码 gt library DBI lib loc R win library 3 2 gt library RPost
  • 部署到已在 IntelliJ IDEA 外部运行的 Tomcat 服务器

    我已经在我的 PC 上安装了 Tomcat 服务器 并将其添加为service在Windows中 所以现在每次我启动计算机时它都会启动 现在 在 IntelliJ 中我已经配置了一个应用程序服务器 那就是我的 Tomcat 服务器 但是当尝
  • 如何减少 Visual Studio 2015 中 Xamarin Forms 应用程序的用户代码大小?

    我今天在 Windows 7 上安装了 Visual Studio 2015 RTM 并在 Xamarin 上注册了一个新帐户 入门许可证 创建项目后 我尝试运行该应用程序 之前创建的 AVD 我得到的只是这个错误 mandroid 错误
  • 在 werkzeug 请求中检索 url 锚点

    我有一个 DAV 协议 可以在 url 锚点中存储带外数据 例如这ghi in DELETE abc def ghi 服务器是 Flask 应用程序 我可以看到请求通过网络传入tcpdump 但是当我查看 werkzeug Request
  • 如何使用Spark的repartitionAndSortWithinPartitions?

    我正在尝试构建一个最小的工作示例repartitionAndSortWithinPartitions以便理解该函数 到目前为止我已经完成了 不起作用 不同的值会乱七八糟地乱七八糟 def partval partID Int iter It