丰富 SparkContext 而不会引发序列化问题

2023-12-21

我正在尝试使用 Spark 处理来自 HBase 表的数据。这篇博文 http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase给出了如何使用的示例NewHadoopAPI从任何 Hadoop 读取数据InputFormat.

我做了什么

由于我需要多次执行此操作,因此我尝试使用隐式来丰富SparkContext,这样我就可以从 HBase 中给定的一组列中获取 RDD。我编写了以下帮助程序:

trait HBaseReadSupport {
  implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

  implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
  def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
    data map { case (cf, columns) =>
      val content = columns map { column =>
        val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)

        column -> interpret(CellUtil.cloneValue(cell))
      } toMap

      cf -> content
    }

  def makeConf(table: String) = {
    val conf = HBaseConfiguration.create()

    conf.setBoolean("hbase.cluster.distributed", true)
    conf.setInt("hbase.client.scanner.caching", 10000)
    conf.set(TableInputFormat.INPUT_TABLE, table)

    conf
  }

  def hbase[A](table: String, data: Map[String, List[String]])
    (interpret: Array[Byte] => A) =

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
        Bytes.toString(key.get) -> extract(data, row, interpret)
      }

}

它可以像这样使用

val rdd = sc.hbase[String](table, Map(
  "cf" -> List("col1", "col2")
))

在这种情况下,我们得到的 RDD 为(String, Map[String, Map[String, String]]),其中第一个组件是行键,第二个组件是一个映射,其键是列族,值是键是列、内容是单元格值的映射。

失败的地方

不幸的是,我的工作似乎得到了参考sc,它本身在设计上是不可序列化的。当我运行工作时我得到的是

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

我可以删除辅助类并在工作中使用相同的内联逻辑,一切都运行良好。但我想要得到一些可以重用的东西,而不是一遍又一遍地编写相同的样板。

顺便说一句,这个问题并不特定于隐式,即使使用函数sc表现出同样的问题。

为了进行比较,以下读取 TSV 文件的帮助程序(我知道它已损坏,因为它不支持引用等,没关系)似乎工作正常:

trait TsvReadSupport {
  implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
  def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
    val contents = line.split(separator).toList

    (fields, contents).zipped.toMap
  }
}

如何封装从 HBase 读取行的逻辑,而不会无意中捕获 SparkContext?


只需添加@transient注释到sc多变的:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
  ...
}

并确保sc未在以下范围内使用extract函数,因为它对工人不可用。

如果需要从分布式计算中访问 Spark 上下文,rdd.context可能会使用函数:

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
  case (k, v) => 
    val ctx = rdd.context
    ....
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

丰富 SparkContext 而不会引发序列化问题 的相关文章

  • 使用 Cassandra 进行单元版本控制

    我的应用程序使用 AbstractFactory 作为 DAO 层 因此一旦实现了 HBase DAO 系列 创建 Cassandra DAO 系列并从多个角度查看差异对我来说将是非常好的 不管怎样 试图做到这一点 我看到 Cassandr
  • andThen 类型不匹配的 Scala 链接函数

    我有一堆函数可以清理文本并将它们分成单词 最小的例子 val txt Mary had a little nlamb val stopwords Seq a def clean text String String text replace
  • scala中的反引号有什么用[重复]

    这个问题在这里已经有答案了 我在一本书上找到了以下代码 val list List 5 4 3 2 1 val result 0 list running total next element running total next elem
  • Spark DataFrame 序列化为无效 json

    TL DR 当我倾倒 Spark 时DataFrame作为 json 我总是得到类似的结果 key1 v11 key2 v21 key1 v12 key2 v22 key1 v13 key2 v23 这是无效的 json 我可以手动编辑转储
  • 将 IndexToString 应用于 Spark 中的特征向量

    Context 我有一个数据框 其中所有分类值都已使用 StringIndexer 进行索引 val categoricalColumns df schema collect case StructField name StringType
  • 如何在 scala repl 和 sbt 控制台中关闭/打开 typer 阶段

    是否可以在不退出当前会话的情况下切换阶段 我尝试进入 power 模式 但它仍然不打印类型 在SBT中只需添加以下设置 set scalacOptions in Compile console Xprint typer 在 REPL 中你可
  • 如何在 sbt 控制台中加载 scala 文件? [复制]

    这个问题在这里已经有答案了 可能的重复 将 Scala 文件加载到解释器中以使用函数 https stackoverflow com questions 7383436 load scala file into interpreter to
  • 错误:无法找到或加载主类 org.apache.spark.launcher.Main [重复]

    这个问题在这里已经有答案了 如果有人能帮我解决以下路径问题 我将不胜感激 我非常怀疑这与缺少路径设置有关 但不知道如何修复它 rxie ubuntu Downloads spark echo PATH usr bin java usr lo
  • 根据 pyspark 中的条件从数据框中删除行

    我有一个包含两列的数据框 col1 col2 22 12 2 1 2 1 5 52 1 2 62 9 77 33 3 我想创建一个新的数据框 它只需要行 col1 的值 gt col2 的值 就像注释一样col1 很长类型和col2 有双
  • Spark 2.2 无法将 df 写入 parquet

    我正在构建一个聚类算法 我需要存储模型以供将来加载 我有一个具有以下架构的数据框 val schema new StructType add StructField uniqueId LongType add StructField tim
  • Scala 使用的 Redis 客户端库建议

    我正在计划使用 Scala 中的 Redis 实例进行一些工作 并正在寻找有关使用哪些客户端库的建议 理想情况下 如果存在一个好的库 我希望有一个为 Scala 而不是 Java 设计的库 但如果现在这是更好的方法 那么仅使用 Java 客
  • 纱线上的火花,连接到资源管理器 /0.0.0.0:8032

    我正在我的开发机器 Mac 上编写 Spark 程序 hadoop的版本是2 6 spark的版本是1 6 2 hadoop集群有3个节点 当然都在linux机器上 我在idea IDE中以spark独立模式运行spark程序 它运行成功
  • Kafka 分区键无法正常工作

    我正在努力解决如何正确使用分区键机制的问题 我的逻辑是设置分区号为3 然后创建三个分区键为 0 1 2 然后使用分区键创建三个KeyedMessage 例如 KeyedMessage 主题 0 消息 KeyedMessage 主题 1 消息
  • 如何在 Scala 2.11 中查找封闭源文件的名称

    在编译时 如何在 scala 2 11 中检索当前源文件 编写代码的位置 的名称 这是一种实际有效的方法 val srcFile new Exception getStackTrace head getFileName println sr
  • Scala 和 Python 的通行证

    我想知道 是否有相当于 python 的 pass 表达式 这个想法是编写没有实现的方法签名 并编译它们只是为了对某些库原型的这些签名进行类型检查 我能够使用以下方法模拟这种行为 def pass A A throw new Excepti
  • 如何关闭 Scala 中因方法重载而导致代码无法编译的特定隐式?

    我正忙着尝试自己回答这个问题 Scala Play 2 4 x 通过 anorm MySQL 处理扩展字符到 Java Mail https stackoverflow com questions 31417718 scala play 2
  • 如何在不从 DataFrame 转换并访问它的情况下向数据集添加列?

    我知道使用以下方法将新列添加到 Spark 数据集的方法 withColumn and a UDF 它返回一个 DataFrame 我还知道 我们可以将生成的 DataFrame 转换为 DataSet 我的问题是 如果我们仍然遵循传统的
  • 使用 Akka 1.3 的 actor 时,我需要注意生产者-消费者速率匹配吗?

    使用 Akka 1 3 时 我是否需要担心当生成消息的 Actor 生成消息的速度比使用消息的 Actor 的处理速度快时会发生什么 如果没有任何机制 在长时间运行的进程中 队列大小将增大以消耗所有可用内存 The doc http doc
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • Spark:查找前 n 个值的高性能方法

    我有一个很大的数据集 我想找到具有 n 个最高值的行 id count id1 10 id2 15 id3 5 我能想到的唯一方法是使用row number没有分区就像 val window Window orderBy desc coun

随机推荐

  • 使用 .html() 时,jQuery 是否会从字符串中删除一些 html 元素?

    我有一个包含完整 html 页面的 var 包括 head html body 等 当我将该字符串传递到 html 函数时 jQuery 会删除所有这些元素 例如 body html head 等 这是我不想要的 我的数据变量包含 那么我的
  • C# 中简单线程池的代码 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 在 gelf 连接失败时使用 Symfony 2 / Monolog 防止内部服务器错误

    我正在尝试使用 gelf 格式将日志从 symfony 2 应用程序流式传输到graylog 2 服务器 我的独白配置如下 monolog handlers 8 lt gt 8 graylog type gelf publisher hos
  • DWR 的缺点是什么?

    在内网中使用DWR时 会出现性能或安全问题等缺点吗 直接 Web 远程处理是一种使用 Ajax 请求从 js 文件联系服务器的工具 我要注意的一件事是 与 正常 全页 HTTP 交付相比 您的服务器很可能会受到更多 HTTP 请求的攻击 让
  • 为什么在测试 MediaRecorder 示例时出现致命异常?

    我使用 Android Studio 在真实设备中测试示例 E Android SDK samples android 22 media MediaRecorder 出现以下错误 为什么 样本中是否存在一些错误 顺便说一句 我的 Andro
  • 返回按钮和上次活动

    我的应用程序链接了一些活动 如果您按后退按钮 您会返回到旧的活动 然后您会突然退出应用程序 所以如果它是堆栈上的最后一个活动 我需要显示一条消息 例如 你真的想退出吗 我知道如何覆盖后退按钮 但我不知道如何知道历史中有多少活动 Overri
  • 为 SQL Server 中的字段生成唯一哈希

    我正在编写一个会员提供程序 以便与我们现有的会员基础一起使用 我使用 EF4 1 进行所有数据库访问 我遇到的问题之一是 当数据库最初设置时 关系是以编程方式完成的 而不是在数据库中完成的 一种是如果需要在并非所有用户都需要的列上建立关系
  • 如何在WebRTC对等连接中创建数据通道?

    我正在尝试学习如何创建一个RTCPeerConnection这样我就可以使用DataChannelAPI 这是我根据我的理解尝试过的 var client new mozRTCPeerConnection var server new mo
  • PHP、PDO 和异常

    我目前对于 PDO 有点进退两难 我最近转而从我自己的自定义数据库类中使用它 因为我想利用事务 我面临的问题是如何从已经用 PDO 的 try catch 包装的代码块内部抛出异常 这是一个例子 try PDO code Transacti
  • 如何在React hooks中自动停止setInterval?

    我想构建一个循环进度条 计数到 60 然后自动停止 但它无法停止 我想使用 React hooks 和 useEffect 我的代码在这里 https codesandbox io s nostalgic khorana lijdyo fi
  • 如何在 iOS 15 Xcode 13 中将 TabView tabItem 上的填充图标更改为不填充?

    如何将 iOS 15 Xcode 13 中的 TabView tabItem 上的填充图标更改为不填充 现在看来图标是默认填充的 我的代码 import SwiftUI struct Test Home V View var body so
  • Promise.catch() 在 AngularJS 单元测试中没有捕获异常

    我正在 Typescript 中为我的应用程序编写 Jasmine 单元测试 并通过 Resharper 运行它们 如果处理程序抛出异常 它应该执行一个操作 describe Q Service Test gt var q ng IQSer
  • 边缘会话.cookie_lifetime

    我的网站功能在 Edge 上损坏 尤其是登录 有人告诉我这与session cookie lifetime 对于此事的任何帮助 我将不胜感激 请注意 我不是开发人员 没有任何代码知识 我的一些 php ini 文件 如果有帮助的话 sess
  • 查找上个月

    我见过一些使用的方法dateutil模块来执行此操作 但是有没有一种方法可以在不使用内置库的情况下执行此操作 例如 当前月份是七月 我可以使用datetime now 功能 python 返回上个月的最简单方法是什么 这很简单 gt gt
  • 是否可以在函数内使用 Excel 数组中的单个单元格引用?

    我有一些数据想要放入 LOGEST 函数中 e g x values 0 463 0 609 0 887 y values 0 05 0 1 0 2 For this example I have put the data into the
  • asyncio 和 trio 之间的核心区别是什么?

    今天 我发现了一个名为trio http trio readthedocs io en latest index html它说它自己是一个供人类使用的异步 API 这些词有点相似requests As requests确实是一个不错的库 我
  • 如何在队列上设置 ActiveMQ redeliveryPolicy?

    如何在队列上的 ActiveMQ 中设置 redeliveryPolicy 1 在文档中 请参阅 activeMQ 重新传送 http activemq apache org redelivery policy html 说明您应该在 Co
  • 如何使用 Microsoft.Graph 将文件附加到 Sharepoint 中的项目

    Microsoft Graph Sharepoint api 允许更新列表项https developer microsoft com en us graph docs api reference beta api listitem upd
  • 1 和 0 的大字符串到 BitSet

    我有一个非常大的字符串 64 个字符 其中包含 1 和 0 样本 1001111111101010011101101011100101001010111000101111011110001000 我想要的只是将其转换为 BitSet var
  • 丰富 SparkContext 而不会引发序列化问题

    我正在尝试使用 Spark 处理来自 HBase 表的数据 这篇博文 http www vidyasource com blog Programming Scala Java Data Hadoop Analytics 2014 01 25