Spark中如何获取map任务的ID?

2024-06-19

Spark中有没有办法获取map任务的ID?例如,如果每个映射任务都调用用户定义的函数,我可以从该用户定义的函数中获取该映射任务的 ID 吗?


我不确定您所说的地图任务 ID 是什么意思,但您可以使用以下方式访问任务信息TaskContext:

import org.apache.spark.TaskContext

sc.parallelize(Seq[Int](), 4).mapPartitions(_ => {
    val ctx = TaskContext.get
    val stageId = ctx.stageId
    val partId = ctx.partitionId
    val hostname = java.net.InetAddress.getLocalHost().getHostName()
    Iterator(s"Stage: $stageId, Partition: $partId, Host: $hostname")
}).collect.foreach(println)

Spark 2.2.0 中的 PySpark 添加了类似的功能(SPARK-18576 https://issues.apache.org/jira/browse/SPARK-18576):

from pyspark import TaskContext
import socket

def task_info(*_):
    ctx = TaskContext()
    return ["Stage: {0}, Partition: {1}, Host: {2}".format(
        ctx.stageId(), ctx.partitionId(), socket.gethostname())]

for x in sc.parallelize([], 4).mapPartitions(task_info).collect():
    print(x)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark中如何获取map任务的ID? 的相关文章

  • 与 aws-java-sdk 链接时读取 json 文件时 Spark 崩溃

    Let config json是一个小的 json 文件 toto 1 我编写了一个简单的代码来读取 json 文件sc textFile 因为文件可以在S3 本地或HDFS上 所以textFile很方便 import org apache
  • 如何将巨大的pandas数据帧保存到hdfs?

    我正在使用 pandas 和 Spark 数据框 数据帧总是非常大 gt 20 GB 标准 Spark 函数不足以满足这些大小 目前 我将 pandas 数据框转换为 Spark 数据框 如下所示 dataframe spark creat
  • 如何在 HBase 中预分割表

    我将数据存储在具有 5 个区域服务器的 HBase 中 我使用 url 的 md5 哈希作为我的行键 目前 所有数据仅存储在一台区域服务器中 所以我想预先分割区域 以便数据在所有区域服务器上统一传输 我希望通过行键的第一个字符将表分成五个区
  • R和spark:比较不同地理点之间的距离

    我正在处理纽约市出租车数据集 该数据集的列包括日期时间 接送纬度 经度 下车纬度 经度等 现在我想对纬度 经度进行反向地理编码以找到行政区 社区 我有两个数据框 1 第一个数据框包含我想要用最近的纽约社区名称进行分类的所有点 2 第二个数据
  • 从 Monoids 的 HList 类型派生 0 的 HList

    我正在学习 Shapeless 目前我正在尝试创建一个执行以下操作的函数 给定一个类型HList它返回HList of Nones 与Option对应于给定的类型HList type 例如 create String Int HNil re
  • 我可以使用从 Java 实现 java 接口的 scala 类吗?

    我正在学习 Scala 很好奇是否可以 创建一个在 Scala 中实现 Java 接口的对象 将对象编译成类文件并将其打包 使用 Java 中的对象 我想在 scala 中实现一个自定义的 lucene 查询解析器 并能够让其他人从 jav
  • Scala 插入列表中的特定位置

    这是我确实解决的问题 但是作为一个完全命令式的 Scala 菜鸟 我觉得我发现了一些完全不优雅的东西 任何改进的想法表示赞赏 val l1 4 1 2 3 4 Nil original list val insert List 88 99
  • HDFS:使用 HDFS API 附加到 SequenceFile

    我一直在尝试使用 Java API 在 HDFS 上创建和维护序列文件 而不运行 MapReduce 作业作为设置用于未来的 MapReduce 工作 我想将 MapReduce 作业的所有输入数据存储在单个序列文件中 但数据会随着时间的推
  • 如何抑制 EMR 上运行的 Spark-sql 的 INFO 消息?

    我正在 EMR 上运行 Spark 如中所述在 Amazon Elastic MapReduce 上运行 Spark 和 Spark SQL https aws amazon com articles 4926593393724923 本教
  • PHP中通过Hive/Thrift查询数据库不起作用

    我正在尝试通过 PHP 中的 Hive Thrift 查询数据库 但是 我不断收到错误 TSocket timed out reading 4 bytes from XYZ 我正在使用来自的代码 https cwiki apache org
  • Spark:连接两个相同分区的数据帧时防止洗牌/交换

    我有两个数据框df1 and df2我想在一个名为的高基数字段上多次加入这些表visitor id 我只想执行一次初始洗牌 并让所有连接发生 而无需在 Spark 执行器之间洗牌 交换数据 为此 我创建了另一个名为visitor parti
  • zip 样式 @repeat 嵌套形式

    repeat非常有用 然而 我遇到了嵌套表单的障碍 我需要制作一个比赛日程表 它有 2 个属性 日程数据 比赛日期 时间 地点 对手 和提交球队备注 例如 由于冬季风暴 1 月 7 日的比赛已移至1 月 9 日在 夏威夷 表单映射基于 ca
  • 在使用 Phoenix 4.5 的 CDH 5.4 上运行 Spark 作业时未找到 PhoenixOutputFormat

    我通过重新编译源代码设法在 Cloudera CDH 5 4 上配置 Phoenix 4 5 sqlline py效果很好 但火花有问题 spark submit class my JobRunner master yarn deploy
  • Spark UDF 错误 - 不支持 Any 类型的架构

    我正在尝试创建一个 udf 它将列中的负值替换为 0 我的数据框名为 df 包含一列名为 avg x 这是我创建 udf 的代码 val noNegative udf avg acc x Double gt if avg acc x lt
  • Spark sql 每组前 n 个

    我怎样才能获得每组的前n名 比如说前10名或前3名 spark sql http www xaprb com blog 2006 12 07 how to select the firstleastmax row per group in
  • 这个错误是什么意思(SimpleHttpConnectionManager 被错误使用)?

    我正在尝试从 ElasticSearch 中读取数据到 Spark conf es resource sflow sflow es nodes ES01 es query some query rdd sc newAPIHadoopRDD
  • Spark中分布式读取CSV文件

    我正在开发一个 Spark 处理框架 它读取大型 CSV 文件 将它们加载到 RDD 中 执行一些转换 最后保存一些统计数据 相关 CSV 文件平均大小约为 50GB 我正在使用 Spark 2 0 我的问题是 当我使用sparkConte
  • 从单个字符串创建 Spark DataFrame

    我正在尝试采用硬编码字符串并将其转换为 1 行 Spark DataFrame 具有单列类型StringType 这样 String fizz buzz 将得到一个 DataFrame 其 show 方法如下 fizz buzz 迄今为止我
  • mssql 的 UUID 疯狂

    我的数据库条目有一个 UUID 及其值 使用 Microsoft SQL Server Management Studio 提取 CDF86F27 AFF4 2E47 BABB 2F46B079E98B 将其加载到我的 Scala 应用程序
  • JavaFX 控制器如何访问其他服务?

    我将 JavaFX 2 与 Scala 一起使用 我有class Application extends javafx application Application它执行诸如读取应用程序配置等操作 然后它会启动主窗口 该主窗口需要连接到一

随机推荐

  • 如何使用 WKWebView 正确实施身份验证质询?

    我正在构建一个网络浏览器 但在网络方面我真的是新手 我想测试下面的代码示例 但我没有现实生活中的示例可以使用 void webView WKWebView webView didReceiveAuthenticationChallenge
  • 该变量未声明或从未分配警告

    这是基类 public class BaseClass UserControl protected ListView list protected TreeView tree public BaseClass 儿童班 public part
  • router.navigate 不起作用(Angular6,延迟加载)

    我是 Angular 4 的新手 目前使用 v 6 我一直在尝试使用this router navigate 登陆 从登录组件重定向到登陆组件的功能 它无法正常工作 它将显示登录页面一秒钟 然后再次重定向回登录页面 但是 例如 如果我尝试浏
  • 在 for 循环中修改列表元素

    我有一个清单a我想更改其元素a i j 根据一个函数f 我能比天真的方式做得更好吗 for index in range i j a index f a 我所说的更好是指更接近于map f a 或者更快的东西 您可以分配给切片 a i j
  • 用于验证 ip 列表中的 ip 范围的正则表达式

    我有正则表达式用于验证 50 个 ips 逗号分隔的列表 25 0 5 2 0 4 0 9 01 0 9 0 9 3 25 0 5 2 0 4 0 9 01 0 9 0 9 1 50 列表示例 10 10 10 1 127 0 0 1 现在
  • extern 关键字对 C 函数的影响

    在C中 我没有注意到任何影响extern在函数声明之前使用关键字 起初 我认为在定义时extern int f 在单个文件中forces您可以在文件范围之外实现它 然而我发现两者 extern int f int f return 0 an
  • 使用底格里斯河从纬度/经度获取人口普查区

    我有相对较多的坐标 我想获取其人口普查区 除了 FIPS 代码 我知道我可以使用以下命令查找各个纬度 经度对call geolocator latlon 已完成here https stackoverflow com questions 5
  • 查找 Ivy 中隐藏的依赖项

    我使用 Apache Ivy IvyDE 来获取项目的依赖项 它们是
  • UITableViewCell 的 viewDidAppear

    我通常使用viewDidAppear方法在视图完成出现后在视图上执行一些 UI 操作 我在各种情况下使用了此方法 它非常有用 但是 我需要在视图上进行一些 UI 更改UITableViewCell当它完成出现后 SDK中是否有任何可用的方法
  • CSS3 中均匀间隔的导航链接占据 ul 的整个宽度

    我想创建一个水平导航链接列表 其中导航链接均匀分布并占据封闭容器的整个宽度 ul 导航链接可以有不同的宽度 第一个和最后一个链接应与链接的开头和结尾对齐 ul 分别 意味着链接不居中 如下所示 left side right side li
  • UI图像位置

    我使用以下代码在 UIView 中放置一些图像 UIImage image UIGraphicsBeginImageContext CGSizeMake 480 320 int k 0 int posY 0 for int i 0 i lt
  • 您可以为 None 指定类型参数或告诉编译器它是一个 Option[String] 吗?

    我想知道我是否可以在我的代码中写这样的东西 None String 我很惊讶没有人提到它的存在Option empty scala gt Option empty String res0 Option String None 请注意 在许多
  • 无法找到请求的工厂 com.ctc.wstx.stax.WstxInputFactory

    我正在构建 Oracle Agile PLM CustomAction Px 我在Px内部调用了一个web服务来处理一些数据 我部署后 它给出了 类未找到异常 javax xml ws Service 所以我复制了jaxws api 2 1
  • Python中如何知道文件的编码? [复制]

    这个问题在这里已经有答案了 有谁知道如何在Python中获取文件的编码 我知道您可以使用编解码器模块打开具有特定编码的文件 但您必须提前知道它 import codecs f codecs open file txt r utf 8 有没有
  • 使用 shell 脚本将行附加到 /etc/hosts 文件

    我有一个新的 Ubuntu 12 04 VPS 我正在尝试编写一个安装脚本来完成整个 LAMP 安装 我遇到问题的地方是在 etc hosts文件 我当前的主机文件如下所示 127 0 0 1 localhost Venus The fol
  • python 排列有问题

    我在排列方面遇到一些问题 当谈到Python时 我真的是一个大菜鸟 所以任何帮助将不胜感激 假设我在文本文件中有一个范围为 1 6 的列表 例如 它看起来像 1 2 3 4 5 6 我想打开所述 txt 文件并计算这 6 个数字中 N 的所
  • Angular 中有主控制器好吗?

    我不知道这是否是一个好的做法 我在路由配置中定义了一个控制器 但是因为我的HomeCtrl is in ng if他听不到的声明loginSuccess所以我做了MainCtrl它监听loginSuccess并做出适当的反应 这段代码工作得
  • 如何将 DataFrame 作为输入传递给 Spark UDF?

    我有一个数据框 我想对每一行应用一个函数 该函数依赖于其他数据帧 简化的例子 我有如下三个数据框 df sc parallelize a b 1 c d 3 toDF feat1 feat2 value df other 1 sc para
  • 蓝图初始化,我可以在第一次请求蓝图之前运行函数吗

    是否可以在第一次请求特定对象之前运行一个函数blueprint my blueprint before first request def init my blueprint print yes 目前这会产生以下错误 AttributeEr
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext