在 Spark mapPartitions 中使用 Java 8 并行流

2023-12-26

我试图了解 Spark 并行性中 Java 8 并行流的行为。当我运行下面的代码时,我期望输出大小为listOfThings与输入大小相同。但事实并非如此,我的输出中有时会缺少一些项目。这种行为并不一致。如果我只是遍历迭代器而不是使用parallelStream, 一切安好。每次都计算匹配数。

// listRDD.count = 10
JavaRDD test = listRDD.mapPartitions(iterator -> {
    List listOfThings = IteratorUtils.toList(iterator);
    return listOfThings.parallelStream.map(
        //some stuff here
    ).collect(Collectors.toList());
});
// test.count = 9
// test.count = 10
// test.count = 8
// test.count = 7

  1. 这是一个非常好的问题。
  2. 这是怎么回事Race Condition。当您并行化流时,然后将完整列表流拆分为几个相等的部分[基于可用线程和列表大小],然后它尝试在每个可用线程上独立运行子部分来执行工作。

但您还使用 apache Spark,它以更快的计算速度而闻名,即通用计算引擎。 Spark 使用相同的方法[并行化工作]来执行操作。

现在,在这个场景中,发生的是 Spark 已经并行化了整个工作,然后在这个场景中,您再次并行化工作,因为竞争条件开始,即 Spark 执行器开始处理工作,然后您并行化工作,然后流进程获取其他线程并开始处理如果正在处理流工作的线程在 SPARK 执行器完成其工作之前完成工作,则它会添加结果,否则 SPARK 执行器将继续向 Master 报告结果。

  1. 这不是重新并行化工作的好方法,它总是会给您带来痛苦,让 Spark 为您做这件事。

希望你明白这里发生了什么

Thanks

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark mapPartitions 中使用 Java 8 并行流 的相关文章

  • Spark on Hive SQL 查询错误 NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT

    针对 Hive 2 1 0 提交 Spark 1 6 0 SQL 应用程序时出现错误 Exception in thread main java lang NoSuchFieldError HIVE STATS JDBC TIMEOUT a
  • 为什么我不能将 Scala 的 Function1 隐式转换为 java.util.function.Function?

    我正在尝试创建 Scala Function1 到 java util function Function 的隐式转换 这是我的代码 object Java8ToScala extends App implicit def javaFunc
  • 在java流api中指定Collector初始容量的一种优雅方法

    我试图找到一种在java流api中设置收集器初始容量的好方法 最简单的例子是 data stream collect Collectors toList 我只想将一个具有列表大小的 int 传递到收集器中 以免调整内部数组的大小 第一个意图
  • Foreach循环无法找到对象

    我正在尝试将 foreach 与并行后端结合使用来加速计算 用于特征选择的 AUCRF 随机森林的交叉验证 如果这确实重要的话 在这样做的过程中 我需要获取向量的子集 向量的名称可以更改 但可以作为字符向量进行访问 我使用 eval par
  • Scala Sparkcollect_list() 与 array()

    有什么区别collect list and array 在 Spark 中使用 scala 我看到到处都有使用情况 但我不清楚用例来确定差异 尽管两者array https spark apache org docs latest api
  • tbb:并行查找第一个元素

    我遇到了这个问题 查找列表中满足给定条件的第一个元素 不幸的是 该列表相当长 100 000 个元素 并且使用单个线程评估每个元素的条件总共需要大约 30 秒 有没有办法干净地并行化这个问题 我浏览了所有tbb模式 但找不到任何合适的 UP
  • Spark Streaming 中是否需要检查点

    我注意到 Spark 流示例也有检查点代码 我的问题是检查点有多重要 如果是为了容错 那么在此类流应用程序中发生故障的频率是多少 这一切都取决于您的用例 假设您正在运行一个流作业 它仅从 Kafka 读取数据并计算记录数 如果您的应用程序在
  • 将 Spark 添加到 Oozie 共享库

    默认情况下 Oozie 共享 lib 目录提供 Hive Pig 和 Map Reduce 的库 如果我想在 Oozie 上运行 Spark 作业 最好将 Spark lib jar 添加到 Oozie 的共享库 而不是将它们复制到应用程序
  • AsyncTask的并行执行

    An 异步任务单击时执行 List
  • 我可以在 Java 8 中使用 Clojure 函数作为 Lambda 函数吗?

    我在 Clojure 中使用了许多库来生成符合 Clojure lang IFN https github com clojure clojure blob master src jvm clojure lang IFn java 界面 它
  • Scala 中的行聚合

    我正在寻找一种方法在 Scala 的数据框中获取一个新列来计算min max中的值col1 col2 col10对于每一行 我知道我可以使用 UDF 来做到这一点 但也许有一种更简单的方法 Thanks Porting 这个Python答案
  • 使用列的长度过滤 DataFrame

    我想过滤一个DataFrame使用与列长度相关的条件 这个问题可能很简单 但我在SO中没有找到任何相关问题 更具体地说 我有一个DataFrame只有一个Column哪一个ArrayType StringType 我想过滤DataFrame
  • Apache Spark 何时发生混洗?

    我正在优化 Spark 中的参数 并且想确切地了解 Spark 是如何对数据进行洗牌的 准确地说 我有一个简单的字数统计程序 并且想知道spark shuffle file buffer kb如何影响运行时间 现在 当我将此参数设置得非常高
  • Parallel.ForEach - 优雅取消

    关于等待任务完成和线程同步的主题 我目前有一个迭代 我已将其包含在 Parallel ForEach 中 在下面的示例中 我在评论中提出了一些关于如何最好地处理循环的优雅终止的问题 NET 4 0 private void myFuncti
  • 将 Apache Zeppelin 连接到 Hive

    我尝试将我的 apache zeppelin 与我的 hive 元存储连接起来 我使用 zeppelin 0 7 3 所以没有 hive 解释器 只有 jdbc 我已将 hive site xml 复制到 zeppelin conf 文件夹
  • 具有多个参数和返回值的两个并行函数

    我有两个独立的功能 每一个都需要相当长的时间来执行 def function1 arg do some stuff here return result1 def function2 arg1 arg2 arg3 do some stuff
  • 如何在 Spark 数据帧 groupBy 中执行 count(*)

    我的目的是做相当于基本sql的事情 select shipgrp shipstatus count cnt from shipstatus group by shipgrp shipstatus 我见过的 Spark 数据帧的示例包括其他列
  • 使用多处理和 PySftp 并行下载

    我正在尝试创建一个代码来使用 pysftp 和多处理库下载相同类型的 N 个文件 我做了一个基本的Python训练 得到了一些代码并将它们组合成一个 但我无法让它工作 如果有人帮助我 我将不胜感激 该错误发生在 vFtp close 命令之
  • 使用 pyspark awsglue 时显示 DataFrame

    如何使用 awsglue 的 job etl 显示 DataFrame 我尝试了下面的代码 但没有显示任何内容 df show code datasource0 glueContext create dynamic frame from c
  • Jack(Java Android 编译器套件)将如何影响 Scala 开发人员

    现在随着公告Jack https source android com source jack html谷歌阐明了 Java 与 Android 相关的可预见的未来 但这对 Scala 和其他基于 JVM 的语言开发人员有何影响 尤其 Sc

随机推荐

  • Swift 一次删除多个对象 Parse 服务器

    我向服务器查询如下 let query PFQuery className posts query whereKey uuid equalTo Ncell uuidLbl text query findObjectsInBackground
  • 从远程 SQLite 数据库复制表?

    有没有办法将数据从一个远程 SQLite 数据库复制到另一个 我在两台服务器上完成了文件复制 但是 一些更改会记录在每个服务器本地的 SQLite 数据库中 为了使文件复制正常工作 我需要复制一个表的内容并将其输入到另一系统上的表中 我知道
  • 如何保护database.yml?

    在 Ruby on Rails 应用程序中 database yml 是一个存储数据库凭据的纯文本文件 当我部署 Rails 应用程序时 我的 Capistrano 中有一个部署后回调 在应用程序的 config 目录中创建到 databa
  • 如何在Ubuntu中生成核心转储文件[重复]

    这个问题在这里已经有答案了 我想知道如何在 Ubuntu 中生成核心转储文件 我使用的是 Ubuntu 8 04 1 和 gcc 编译器 4 2 3 我编写了一个简单的 C 程序来生成核心转储 我已经编译了该程序 如 gcc g badpo
  • 从 StructureMap 获取的 HttpContext 上的空用户

    好吧 我之前的问题 设置有太多变量 所以我将其精简为最基本的组件 给出使用 StructureMap3 的以下代码 IoC setup For
  • 回发或回调参数无效。为什么?

    所以我得到了例外 回发或回调参数无效 使用启用事件验证 在配置或 在 页 出于安全目的 这 功能验证参数 回发或回调事件发起 从服务器控制 最初渲染它们 如果数据 是有效且预期的 使用 ClientScriptManager Registe
  • libreoffice 大量文本颜色更改

    有没有办法更改 LibreOffice 或 Openoffice 中文本中所有出现的特定颜色 是 光标位于find box hit more options 在里面Search Replace对话 点击 Format select Font
  • 如何在Windows 7中设置Python路径[重复]

    这个问题在这里已经有答案了 我尝试在 Windows 7 中设置 python 的路径 但我不能这样做 我去 My Computer gt Properties gt Advanced gt Environment Variables 但我
  • 使用空格而不是制表符进行缩进的客观原因是什么?

    根据 PSR 2 标准使用空格而不是制表符来缩进文件是否有客观原因 有人可以提供 facts 参考 具体的专业知识 PSR 2 标准基于哪个 PSR 2 标准的作者考虑的不仅仅是 外观和感觉 不仅仅是基于意见的东西 而且很多人很难理解为什么
  • 将 Excel 导入 Rails 应用程序

    我正在创建一个供个人使用的小型 Rails 应用程序 并且希望能够上传 Excel 文件以便稍后进行验证并添加到数据库中 我之前曾对 csv 文件进行过此操作 但此后这已变得不切实际 有谁知道使用 roo 或电子表格 gem 上传文件 向用
  • 带颜色编码的 vb.net/C# 代码编辑器[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有任何带有颜色编码的 winforms 源代码编辑器控件 首选 开源 我好像记得以前遇到过类似的事情
  • PHP套接字服务器,检查客户端是否还活着

    我有一个 php 服务器正在监听 1 个 c 客户端 当连接建立后 它会一直保持活动状态 直到客户端发送 退出 命令来终止 PHP 服务器 但是 当 C 客户端在没有 退出 命令 即 单击 Windows 窗体中的关闭 x 按钮 的情况下断
  • 要包含或包含自动生成的依赖项?

    我喜欢用g MM自动构建我的依赖项的功能 我这样做的方法如下 include ALLOBJ o d d cxx echo making dependencies for lt g MM CXXFLAGS lt o sed i s o g 基
  • 分配时出现 JPEG 错误 #42

    为什么我不能直接将 MemoryStream 分配给图片 下面我发布了两种将 MemoryStream 分配给 TImage 的方法 方法 1 不起作用 方法 2 起作用 为什么 谢谢 山姆 方法 1 此方法返回 JPEG 错误 42 Va
  • jax-ws webservice 的端点始终是 localhost

    我真的需要你的帮助 我读到 jax ws web 服务的 wsdl 将为每个请求动态生成 这样 soap 端点等地址将被调整为请求 url 就我而言 无论是内部请求还是外部请求 地址始终引用 localhost 8080 某人知道我该如何处
  • C++ Linux 最快的时间测量方法(比 std::chrono 更快)?包含基准

    include
  • 为大型 URI 配置 Nginx

    我有一个很大的 URI 我正在尝试配置 Nginx 来接受它 URI参数长度为52000个字符 大小为52kb 我尝试过在没有 Nginx 的情况下访问 URI 效果很好 但是当我使用 Nginx 时 它给了我一个错误 414 请求 URI
  • 如何生成 3 列列表?

    我必须生成一个 3 列的项目列表 类似于此页面上不同群体 主要是银行和金融机构 可以看到的内容 http funds ft com FundDirectory aspx http funds ft com FundDirectory asp
  • 如何将 RDF 文件导入 Apache Solr

    我是 Apache Solr 的新手 我想将 rdf 文件导入 solr 进行索引 我用 google 搜索了它 但没有找到任何有用的东西 请给我一些指示 Solr 接受 JSON 文档 您可以将 RDF 文档转换为JSON LD http
  • 在 Spark mapPartitions 中使用 Java 8 并行流

    我试图了解 Spark 并行性中 Java 8 并行流的行为 当我运行下面的代码时 我期望输出大小为listOfThings与输入大小相同 但事实并非如此 我的输出中有时会缺少一些项目 这种行为并不一致 如果我只是遍历迭代器而不是使用par