如何在 apache Spark 作业中执行阻塞 IO?

2024-06-18

如果当我遍历 RDD 时,我需要通过调用外部(阻塞)服务来计算数据集中的值怎么办?您认为如何才能实现这一目标?

值:Future[RDD[Double]] = Future sequence tasks

我尝试创建一个 Futures 列表,但由于 RDD id 不可遍历,Future.sequence 不适合。

我只是想知道,如果有人遇到这样的问题,你是如何解决的? 我想要实现的是在单个工作节点上获得并行性,这样我就可以调用该外部服务3000次/次second.

可能还有另一种解决方案,更适合 Spark,例如在单个主机上拥有多个工作节点。

有趣的是,你如何应对这样的挑战?谢谢。


这是我自己的问题的答案:

val buckets = sc.textFile(logFile, 100)
val tasks: RDD[Future[Object]] = buckets map { item =>
  future {
    // call native code
  }
}

val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] =>
  val searchFuture: Future[Iterator[Object]] = Future sequence f
  Await result (searchFuture, JOB_TIMEOUT)
}

这里的想法是,我们获得分区的集合,其中每个分区都被发送到特定的工作人员,并且是最小的工作。每一项工作都包含数据,可以通过调用本机代码并发送该数据来处理这些数据。

“values”集合包含从本机代码返回的数据,并且该工作是在整个集群中完成的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 apache Spark 作业中执行阻塞 IO? 的相关文章

  • K均值||用于 Spark 上的情感分析

    我正在尝试编写基于Spark的情感分析程序 为此 我使用了 word2vec 和 KMeans 聚类 从 word2Vec 我在 100 维空间中得到了 20k 个单词 向量集合 现在我正在尝试对这个向量空间进行聚类 当我使用默认并行实现运
  • 将java线程分布在多个服务器上?

    我对 java 很陌生 并且非常喜欢学习它 我编写了一个运行良好的程序 但当我添加更多数据来处理时 它会花费一些时间 我把它做成了线程化 它确实加快了很多速度 但现在我正在考虑尝试加快它的速度 显然 它需要处理的数据越多 所需的时间就越长
  • 缓存隐式解析

    为了减少项目的编译时间 我缓存了通过隐式查找解析的某些类型类 但这看起来有点麻烦 因为直接的实现不起作用 scala gt implicit val x String implicitly String x String null 隐式查找
  • Scala 将字符串转换为映射

    转换这个最快的方法是什么 a ab b cd c cd d de e ef f fg 进入 scala 中的可变映射 我从 500MB 文件中读取了这个输入字符串 这就是我关心速度的原因 如果您的 JSON 像您的示例一样简单 即一系列键
  • 使用 pySpark 在 Azure Databricks 中使用来自 EventHub 的事件

    我可以看到 Spark 连接器和使用 Azure Databricks 中的 Scala 从事件中心消费事件的指南 但是 我们如何使用 pySpark 从 azure databricks 消费事件中心中的事件 任何建议 文档详细信息都会有
  • Python:对于优化问题,使用多处理比循环慢得多。我究竟做错了什么?

    必须保证在发布此内容之前我已经阅读了有关该主题的许多帖子 我知道多重处理需要固定成本 但据我所知 这似乎不是这里的问题 我基本上有许多单独的优化问题 并且想要并行解决它们 下面的代码是一个简单的例子 import psutil import
  • 正确地将 for 循环转换为并行循环

    我这里有这个数据集 例如 学生在几年内多次参加考试 要么通过 要么失败 我有兴趣研究上一次测试对下一次测试的影响 id sample int 10000 100000 replace TRUE res c 1 0 results sampl
  • 工作人员未正确返回的结果 - 雪 - 调试

    我正在使用snow在 R 中封装以在 a 上执行函数SOCK具有在 Linux 操作系统上运行的多台计算机 3 的集群 我尝试用两者运行代码parLapply and clusterApply 如果工作线程级别出现任何错误 工作节点的结果将
  • 当恰好有一个选项非空时执行某项操作

    如果两个选项之一非空 我想计算一些东西 显然这可以通过模式匹配来完成 但是有更好的方法吗 o1 o2 match case Some o None gt Some compute o case None Some o gt Some com
  • 如何在 Lift 中反序列化 DateTime

    我在将 org joda time DateTime 字段从 JSON 反序列化到案例类时遇到问题 JSON val ajson parse creationDate 2013 01 02T10 48 41 000 05 00 我还设置了这
  • 从 aws Glue 脚本调用存储过程

    ETL 作业完成后 在 AWS Glue 脚本中调用存储过程的最佳方式是什么 我正在使用 PySpark 从 S3 获取数据并将其存储在临时表中 在这个过程之后 需要调用一个存储过程 该存储过程将数据从临时表加载到相应的 MDS 表中 如果
  • 如何跟踪通过elastic4s客户端发送到Elasticsearch的json请求?

    假设我使用这样的代码 ElasticClient client client execute search in places gt cities query paris start 5 limit 10 如何查看发送到 Elasticse
  • 如何抑制 EMR 上运行的 Spark-sql 的 INFO 消息?

    我正在 EMR 上运行 Spark 如中所述在 Amazon Elastic MapReduce 上运行 Spark 和 Spark SQL https aws amazon com articles 4926593393724923 本教
  • 登录模块控制标志在 JAAS 配置中不可用 - Scala Kafka

    尝试使用 kerberos 身份验证连接到 Kafka 时遇到问题 使用 scala 和我的jaas config看起来像这样 KafkaClient com sun security auth module Krb5LoginModule
  • 在使用 Phoenix 4.5 的 CDH 5.4 上运行 Spark 作业时未找到 PhoenixOutputFormat

    我通过重新编译源代码设法在 Cloudera CDH 5 4 上配置 Phoenix 4 5 sqlline py效果很好 但火花有问题 spark submit class my JobRunner master yarn deploy
  • 如何区分spark中的操作是转换还是动作?

    最近在学习spark 对transformation和action操作很困惑 我阅读了spark文档和一些关于spark的书籍 我知道action会导致spark作业在集群中执行 而transformation则不会 但是spark的api
  • Scala Eclipse 自动完成功能损坏?

    我正在尝试让自动完成功能在 Eclipse 中用于 Scala 开发 我试图从 Scala 类引用 java 类 但自动完成功能从未找到它 例如 以这个 scala 类为例 object Main def main args Array S
  • 仅使用 Spark ML Pipelines 进行转换

    我正在开发一个项目 其中可配置的管道和 Spark DataFrame 更改的沿袭跟踪都是必不可少的 该管道的端点通常只是修改后的 DataFrame 将其视为 ETL 任务 对我来说最有意义的是利用现有的 Spark ML Pipelin
  • 在 Pandas UDF PySpark 中传递多列

    我想计算 PySpark DataFrame 两列之间的 Jaro Winkler 距离 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得 pyjarowinkler 的工作原理如下 from pyjar
  • scala 中“迭代 Seq 或如果为空”的更好版本?

    是否有更短 更好的方法来执行以下操作 mySeq map elmt gt do stuff if mySeq isEmpty some other stuff Ps 我正在使用 PlayFramework 这意味着在模板中使用 所以如果我错

随机推荐

  • 使用 geom_tile() 的 R ggplot 热图:如何按年份排序并在 y 轴上显示所有年份?

    在研究了哈德利的书并在这里搜索后 我创建了一个由年和月矩阵组成的热图 颜色根据时间序列变量的百分比变化而变化 热图和我用来获取它的代码如下所示 我还有一些我自己无法解决的问题 1 如何对矩阵进行排序 排序 以便 y 轴中的年份从最早到最晚
  • .Net Standard项目,如何使用System.ServiceModel类

    我已将 PCL 项目转换为 Net Standard 1 5 项目 我已经从 nuget 安装了 System ServiceModel 包 但找不到使用 System ServiceModel 命名空间的方法 任何帮助表示赞赏 安装 Sy
  • linux-x64 二进制文件无法在 linuxmusl-x64 平台上使用错误

    我正在安装Sharp用于使用 package json 的 Nodejs 项目的 docker 映像上的映像压缩包 当我创建容器时 我收到有关 Sharp 包的以下错误 app node modules sharp lib libvips
  • 在 Rails 中呈现路由错误的 404 页面

    我试图在 Rails 中渲染集成的 404 页面作为例外 我尝试了这个 但仍然收到路由错误页面 posts controller rb def destroy if current user username post email post
  • 使用 PropertyEditor (ControlsFX) 的属性表示例

    我一直在寻找使用 ControlsFX 属性表的任何好例子 但除了这个之外找不到任何东西 在此示例中 包含 NameItem 对象的 ObservableList 项被添加到其构造函数中的 PropertySheet 对象中 就像文档所述一
  • Ubuntu 16 LTS - Eclipse 窗口首选项无法正常工作

    我刚刚安装了 Ubuntu 16 04 LTS 我于 3 月 2 日下载了适用于 Linux 64 位的全新 Eclipse 我使用的是最新的 Oracle 热点 JDK 1 8 update 91 版本 在尝试启用 Eclipse 常规首
  • 如何在 Snowflake 中编写等效的 IF ELSE 即席 SQL 查询

    我正在尝试创建一个与以下基于 T SQL 的即席查询等效的 Snowflake T SQL version Declare i int 0 If i 0 PRINT 0 Else Begin PRINT 1 RETURN PRINT 2 E
  • 使用 Hibernate 防止 SQL 注入

    我正在使用 Hibernate 我知道你可以使用 HQL 来防止 SQL 注入 String query1 from Obj where id id String query2 from Obj where id id query1不安全
  • 使用 webkit 转换 Html 到 PDF

    从 Html 生成 PDF 时 webkit 转换不起作用 我需要将 div 旋转 45 度 使用 webkit 变换后 它在屏幕上看起来没问题 但使用 winnovatives Html 到 PDF 转换器时 输出是平坦的而不是旋转的 有
  • 订阅的角度替代方案?

    我使用的是异步订阅 因此计数器变量在更新之前被返回 是否有另一种订阅方法 允许我仅在计数器变量更新为从后端获取的值后才返回计数器变量 makeOffer product string offer number number let form
  • redux 和 React 中减速器的先前状态

    这是我的减速器的样子 export default function catalogReducer state initialState catalogItems action switch action type case types L
  • 将 HTML 转换为 Excel 的最佳方法是什么

    我有一个 HTML 页面 其中包含 Flash 图表 FusionCharts 和 HTML 表格 我需要将这整个事情转换成Excel HTML 表格应显示在 Excel 工作表的单元格中 Flash 图表可以显示为图像 我们可以使用任何开
  • 使用.NET Core 3.0添加swagger时出现异常

    我正在尝试将 swagger 融入ASP NET Core 3 0项目 它在中抛出异常ConfigureServices method 我在用Swashbuckle AspNetCore 4 0 1 我也检查过这个issue https s
  • TortoiseGit - 更改默认合并消息

    系统描述 Windows 7的 git版本2 10 1 windows 1 乌龟Git 2 3 0 0 I want 合并提交消息在不同的情况下有所不同fully自动方式 no manual amend Summary 在windows上
  • C# 控制台应用程序 - cmd.exe 挂起

    我在 Visual Studio 2013 中运行简单的 C 控制台应用程序时遇到问题 我的问题的详细信息 我成功运行控制台应用程序 默认的 按任意键继续 显示在最后 突然 它开始表现出不同的行为 并出现以下症状 一个新的命令窗口 cmd
  • android studio 和 netbeans 中输入扫描器和解析的不同行为

    我正在使用 NetBeans 测试基本代码 和 Android Studio 实际应用程序 读取相同的管道分隔文件 并得到不同的结果 这是有问题的代码 String URL http CalendarUTF8 Dec2016 txt try
  • Facebook C# SDK,用图片创建事件

    我想创建一个事件 但我只是不知道如何更改事件图片 我知道这是一个非常老的问题 但我仍然找不到任何解决方案 我很快就会放弃 请至少告诉我这是来自 Facebook 的错误还是其他任何东西 这是我的代码 Facebook FacebookCli
  • 正则表达式:如何匹配所有大于 954 的数字?

    I tried 0 9 d d 4 但它没有给出正确的结果 I 不会使用正则表达式因为你会陷入丑陋的模式链中 但是 如果仍然必须或想要使用它 您可以使用如下正则表达式 1 9 d 3 9 6 9 d 9 5 9 2 工作演示 https r
  • 如何在node.js中同步读取文件或Stream?

    请不要讲授关于我应该如何异步完成所有事情的讲座 有时我想以简单明显的方式做事 这样我就可以继续其他工作 由于某种原因 以下代码不起作用 它与我在 a 上找到的代码匹配最近的问题 https stackoverflow com questio
  • 如何在 apache Spark 作业中执行阻塞 IO?

    如果当我遍历 RDD 时 我需要通过调用外部 阻塞 服务来计算数据集中的值怎么办 您认为如何才能实现这一目标 值 Future RDD Double Future sequence tasks 我尝试创建一个 Futures 列表 但由于