如何在大窗口上优化窗口聚合?

2024-04-21

我在 Spark 2.4.4 中使用带有大窗口的窗口函数,例如。

Window
  .partitionBy("id")
  .orderBy("timestamp")

在我的测试中,我有大约 70 个不同的 ID,但我可能有大约 200 000 行 ID。如果没有进一步的配置,我必须为我的执行器分配大量内存以避免这种 OOM:

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:161)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:128)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:115)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextPartition(WindowExec.scala:345)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:371)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:303)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:631)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

查看源代码,我发现了这个参数,根本没有记录:

spark.sql.windowExec.buffer.in.memory.threshold

给它一个大的尺寸(例如1.000.000),我不再需要那么多的内存。据我了解,这是缓冲的行数;我想增加这个参数不会重复执行程序内存中的行,但这对我来说并不是很清楚。

有人可以准确地解释一下执行器端如何处理窗口吗?为什么数据会重复?如何避免这种重复并使过程更快,每个窗口中有许多行?可以使用哪些参数?

Thx.


我发现了这个参数,它根本没有记录:

它是一个内部配置属性。

通过阅读源代码,我设法“收集”以下内容:

Spark.sql.windowExec.buffer.in.memory.threshold(内部)保证在内存中保存的行数阈值WindowExec物理操作员。

默认:4096

Use SQLConf.windowExecBufferInMemoryThreshold方法来访问当前的 价值。

说到内部属性WindowExec操作员,您可能还需要另一个来进行性能调整:

Spark.sql.windowExec.buffer.spill.threshold(内部)缓冲行数的阈值WindowExec物理操作员。

默认:4096

Use SQLConf.windowExecBufferSpillThreshold方法来访问当前值。

唉,我无法完全解释其内部原理。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在大窗口上优化窗口聚合? 的相关文章

  • 在 Spark 中分发 scikit learn 分类器的推荐方法是什么?

    我已经使用 scikit learn 构建了一个分类器 现在我想使用 Spark 在大型数据集上运行 Predict proba 我目前使用以下方法对分类器进行腌制 import pickle pickle dump clf open cl
  • 为什么构造函数参数要成为案例类的成员?

    class MyClass name String val x new MyClass x println x name Error name is not a member of MyClass but abstract class Ba
  • 为什么 VectorBuilder 位于 scala.collections.immutable 包中?

    VectorBuilder在同一源文件中定义为Vector Vector是不可变的并且在scala collections immutable包 因此构建器位于同一个包中 据我所知 CanBuildFrom uses a VectorBui
  • Scala:如何将“MatchesRegex”细化与包含反引号的正则表达式(细化库)一起使用?

    The refined https github com fthomas refined库允许定义与给定匹配的细化regex 如图所示Readme import eu timepit refined import eu timepit re
  • 如何在 Spark 中创建空数据帧

    我有一组基于 Avro 的配置单元表 我需要从中读取数据 由于Spark SQL使用hive serdes从HDFS读取数据 因此比直接读取HDFS慢很多 因此 我使用数据块 Spark Avro jar 从底层 HDFS 目录读取 Avr
  • Scala 中的多个类型下限

    我注意到tuple productIterator总是返回一个Iterator Any 想知道是否无法设置多个下限 因此它可能是最低公共超类型的迭代器 我尝试并搜索了一下 但只发现this https stackoverflow com q
  • 重写继承的构造函数字段时的差异?

    考虑这个简单的 Scala 类 class A val d Int Scala 之间是否存在差异 无论是行为还是生成的字节码 class B d Int extends A d and class B override val d Int
  • SBT - 运行任务来设置SettingKey

    所以我的一般问题是我想根据任务的结果设置版本密钥 但是版本密钥是在任务运行之前设置的 据我了解 一旦设置了键的值 我就无法更改它 因此我无法在我的任务中更改它 我想要做的是将任务作为发布任务的依赖项运行并更改版本的值 我觉得一定有办法做到这
  • Delta Lake 独立于 Apache Spark?

    我一直在探索数据湖屋概念和 Delta Lake 它的一些功能看起来真的很有趣 就在项目主页上https delta io https delta io 有一个图表显示 Delta Lake 运行在 您现有的数据湖 上 但没有提及 Spar
  • 我们可以在比赛中重用后卫内部的中间变量吗?

    说我有方法foo as def foo i Int Option Int some code 现在我想在一个Seq of Int如下 Seq 1 2 map case int gt foo int collect case Some int
  • 计算行的排名

    我想根据一个字段对用户 ID 进行排名 对于相同的字段值 排名应该相同 该数据位于 Hive 表中 e g user value a 5 b 10 c 5 d 6 Rank a 1 c 1 d 3 b 4 我怎样才能做到这一点 可以使用ra
  • Spark 中的广播 Annoy 对象(对于最近邻居)?

    由于 Spark 的 mllib 没有最近邻居功能 我正在尝试使用Annoy https github com spotify annoy为近似最近邻 我尝试广播 Annoy 对象并将其传递给工人 然而 它并没有按预期运行 下面是可重复性的
  • 在 Spark Dataframe 中提取数组索引

    我有一个带有数组类型列的数据框 例如 val df List a Array 1d 2d 3d b Array 4d 5d 6d toDF ID DATA df org apache spark sql DataFrame ID strin
  • Spark SQL / PySpark 中的逆透视

    我手头有一个问题陈述 其中我想在 Spark SQL PySpark 中取消透视表 我已经浏览了文档 我可以看到仅支持pivot 但到目前为止还不支持取消透视 有什么方法可以实现这个目标吗 让我的初始表如下所示 When I pivotPy
  • 实施策略模式的函数式方法

    我正在尝试解决一个处理从一种温度单位到另一种温度单位 摄氏度 开尔文 华氏度 转换的问题 在Java中 我需要创建一个接口并提供多个实现来封装输入类型并将结果作为输出类型的单元返回 例如开尔文到摄氏度或摄氏度到华氏度等 我已经在 scala
  • 如何从命令行运行scala文件?

    scala是否支持scala run xxx scala go语言支持这样运行 go my go 并且Python支持 python my py 但看来 scala xxx scala 仅进行语法检查 未观察到任何输出或运行行为 那么有没有
  • 如何过滤 pyspark 列表中值的列?

    我有一个数据框原始数据 我必须在 X 列上应用值 CB CI 和 CR 的过滤条件 所以我使用了下面的代码 df dfRawData filter col X between CB CI CR 但我收到以下错误 Between 恰好需要 3
  • Scala SBT 和 JNI 库

    我正在编写一个简单的应用程序Scala通过以下方式使用 leveldb 数据库leveldbjni图书馆 我的build sbt文件看起来像这样 name Whatever version 1 0 scalaVersion 2 10 2 l
  • 选项包装值是一个好的模式吗?

    我最近写了以下 Scala 代码 val f File pretend this file came from somewhere val foo toFoo io Source fromFile f mkString 我真的不喜欢这种方式
  • Scala 2.10,它对 JSON 库和案例类验证/创建的影响

    显然 在 Scala 2 10 中我们得到了改进的反射 这将如何影响 lift json jerkson sjson 和朋友 此外 我们能否期望在不久的将来 Scala 中会出现内置的 JSON 语言功能 如 Groovy 的出色 GSON

随机推荐

  • Apache Camel 根据请求使用文件内容丰富消息

    我正在实现 RESTful 服务 使用 CXFRS 组件 它应该返回某些请求的文件 每个文件都通过其 id 和扩展名来获取 即restfulservice com path file 1 pdf 每个文件一旦添加就不会改变 文件在获取后不应
  • 如何为 2 个不同的视图重用一个控制器?

    我定义了一个控制器 并将其应用于两个有细微差别的视图 角度代码 app controller MyCtrl function scope scope canSave false scope demo files filename aaa h
  • 无法在 Istio 代理后面的 k8s 中建立与 VerneMQ 集群的 mqtt 连接

    我正在设置 k8s 本地 k8s 集群 对于测试 我在使用 kubeadm 设置的虚拟机上使用单节点集群 我的要求包括在 k8s 中运行 MQTT 集群 vernemq 并通过 Ingress istio 进行外部访问 无需部署 ingre
  • @GenerateValue with Strategy=GenerationType.AUTO 重启后生成重复值

    我有一个 ID 配置为的休眠实体 Id GeneratedValue strategy GenerationType AUTO private Long id 新元素的创建在第一次运行时工作正常 但是 如果我重新启动应用程序并检索记录 下次
  • 如何配置 prometheus-operator 从 Kubernetes 上的 cAdvisor 收集?

    我在用普罗米修斯操作员 https github com coreos prometheus operator管理一个普罗米修斯 https prometheus io 部署在我的库伯内斯 https kubernetes io 簇 该设置
  • VSTO - 存储 Excel 工作簿设置的最佳位置

    我有一个用 VBA 实现的旧版 Excel AddIn 我正在使用 VSTO 慢慢将其移植到 net 使用此插件 我将每个工作簿的设置存储在隐藏工作表中 我想知道是否有更好的方法使用 VSTO 来做到这一点 您可以使用自定义文档属性 htt
  • 解析器中的运算符优先级和结合性 (Haskell)

    我正在尝试扩展递归下降解析器来处理新运算符并使它们正确关联 最初只有四个运算符 并且它们都具有相同的优先级 我正在查看的函数是 parseExpRec 函数 parseExpRec Exp gt Token gt Exp Token par
  • MySQL - 选择最近 10 位作者的最新帖子

    我有一个包含许多不同作者的博客文章的表 我想做的是显示 10 位最新作者各自的最新帖子 每个作者的帖子只是按顺序添加到表中 这意味着单个作者可能会发布多篇帖子 我花了很多时间想出一个查询来做到这一点 这给了我最后 10 个唯一的作者 ID
  • 使用 Gradle,如何打印每个任务执行所需的时间?

    现在 对于频繁运行的 gradle 目标之一 输出如下所示 DataPlanner clean common clean server clean simulator clean util clean util compileJava ut
  • 美丽汤无法“获取”完整网页

    我正在使用 BeautifulSoup 来解析来自的一堆链接但它并没有提取我想要的所有链接 为了尝试找出原因 我将 html 下载到 web page html 并运行 soup BeautifulSoup open web page ht
  • 在 Flash AS3 中捕获未处理的 IOErrorEvent

    错误 2044 未处理的 IOErrorEvent text 错误 2036 从不加载 完全的 这就是我每次尝试使用加载器加载不存在的图像时看到的情况 我正在获取 URL 列表 但无法验证它们是否指向任何有用的内容 每当遇到 404 时 它
  • URL 重写破坏了 CSS 链接

    我使用以下设置进行网址重写 RewriteEngine On RewriteCond REQUEST FILENAME d RewriteCond REQUEST FILENAME f RewriteRule index php url 1
  • October CMS:如何扩展后端用户的角色范围

    我已经能够延长Backend Models User类并添加一个范围查询方法以仅检索超级用户 public function boot User extend function model model gt addDynamicMethod
  • 如何自动链接本地npm包?

    我正在构建两个相互依赖的私有 npm 包 说我有 project my commons package json name my commons version 0 0 1 my server package json dependenci
  • Beaglebone Black 上的 GPIO

    我目前遇到了 Beaglebone black GPIO 引脚的问题 我正在寻找一种正确的方法来读取 C 中的 GPIO 引脚 p8 4 的值 如果我理解正确的话 我尝试使用一个库 该库使用了在引入设备树之前不支持的旧方法 我尝试寻找其他解
  • 如果方法只需要 ajax 调用,会返回什么错误?

    如果操作期望仅通过 AJAX 使用 但在没有正确的 ajax 标头的情况下调用 则操作应返回什么 HTTP 状态 我觉得我应该指出一些错误 但我真的找不到合适的错误 我想最好是 405 Method not allowed 但是如果例如 a
  • PowerShell 中的“net use”不指定驱动器

    通过 net use 您可以执行以下操作 net use server user domian username 然后 它会提示输入密码 并且使用任何程序 cmd Explorer Word 等 与该服务器建立的任何进一步 CIFS 连接都
  • 验证 TextBox 中的文本更改

    我已经在 WinForm 中的文本框上实现了验证规则 并且效果很好 但是 只有当我跳出该字段时 它才会检查验证 我希望它在框中输入任何内容以及每次内容发生变化时立即进行检查 我还希望它在 WinForm 打开后立即检查验证 我记得最近通过设
  • 一页上有多个夏季笔记 div

    我正在尝试获取特定夏季笔记 div 的代码 其中单个页面上有多个笔记 div 我的暑假笔记是用 php 从数据库创建的 如下所示 div class tab content div class tab pane div class summ
  • 如何在大窗口上优化窗口聚合?

    我在 Spark 2 4 4 中使用带有大窗口的窗口函数 例如 Window partitionBy id orderBy timestamp 在我的测试中 我有大约 70 个不同的 ID 但我可能有大约 200 000 行 ID 如果没有