通过 Spark 写入 HBase:任务不可序列化

2023-11-27

我正在尝试使用 Spark 1.0 在 HBase (0.96.0-hadoop2) 中写入一些简单的数据,但我不断遇到序列化问题。这是相关代码:

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put

object PutRawDataIntoHbase{
  def main(args: Array[String]): Unit = {
    var propFileName = "hbaseConfig.properties"
    if(args.size > 0){
      propFileName = args(0)
    }

    /** Load properties here **/
   val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
     .map(l => l.split("\t"))
     .map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))

   val tableName = prop.getProperty("hbase.table.name")
   val hbaseConf = HBaseConfiguration.create()
   hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
   hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
   val myTable = new HTable(hbaseConf, tableName)
   theData.foreach(a=>{
     var p = new Put(Bytes.toBytes(a(0)))
     p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
      myTable.put(p)
    })
  }
}

运行代码的结果是:

Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable

用 map 替换 foreach 不会崩溃,但我也不写。 任何帮助将不胜感激。


班上HBaseConfiguration表示与 HBase 服务器的连接池。显然,它无法被序列化并发送到工作节点。自从HTable使用这个池与HBase服务器通信,它也不能被序列化。

基本上,有三种方法可以处理这个问题:

在每个工作节点上打开一个连接。

注意使用foreachPartition method:

val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
  val hbaseConf = HBaseConfiguration.create()
  <... configure HBase ...>
  val myTable = new HTable(hbaseConf, tableName)
  iter.foreach { a =>
   var p = new Put(Bytes.toBytes(a(0)))
   p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
}

请注意,每个工作节点必须能够访问 HBase 服务器,并且必须预先安装或通过以下方式提供所需的 jar 文件:ADD_JARS.

另请注意,由于如果为每个分区打开连接池,因此最好将分区数量大致减少到工作节点数量(使用coalesce功能)。也可以共享一个HTable每个工作节点上都有一个实例,但这并不是那么简单。

将所有数据序列化到一个盒子并写入HBase

可以用一台计算机写入 RDD 中的所有数据,即使数据不适合内存。详细信息在这个答案中解释:Spark:从 RDD 检索大数据到本地机器的最佳实践

当然,它会比分布式写入慢,但它很简单,不会带来痛苦的序列化问题,并且如果数据大小合理,可能是最好的方法。

使用 HadoopOutputFormat

可以为 HBase 创建自定义 HadoopOutputFormat 或使用现有格式。我不确定是否有适合您需求的东西,但谷歌应该在这里提供帮助。

P.S.顺便说一句,map调用不会崩溃,因为它不会被评估:在调用具有副作用的函数之前,不会评估 RDD。例如,如果您致电theData.map(....).persist,它也会崩溃。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

通过 Spark 写入 HBase:任务不可序列化 的相关文章

随机推荐

  • 在 emacs python shell 中重新加载更改的 python 文件

    在 emacs Python shell 我正在运行 2 Python 中 我正在导入一个正在使用的 py 文件并测试代码 但是 如果我更改代码 我不知道如何再次导入它 从我到目前为止的阅读看来 reload modulename 应该有效
  • 在 AsyncTask 中获取上下文

    我正在尝试获取名为 Opciones 的类的 AsyncTask 中的上下文 该类是唯一调用该任务的类 但我不知道该怎么做 我看到了一些如下代码 protected void onPostExecute Long result Toast
  • 如何指定 FCM 消息的优先级?

    我正在使用 React native firebase 来管理 React native 应用程序上的通知 当应用程序关闭时 我尝试在后台处理 FCM 消息而不显示通知 我正在使用 React native firebase 来管理 Rea
  • 按编号搜索并使用 ABAddressBook 获取图像

    我希望使用号码作为键通过我的应用程序在 iphone 地址簿中进行搜索 然后检索与该联系人关联的图像并将其显示在 UIImageView 上 我尝试使用 ABAddressBook 框架 但无法继续 任何人都可以建议我解决方案或我可以遵循的
  • 绝对导入会导致 ModuleNotFoundError

    Python 3 6 我已经编写了一些组件 并且正在尝试将其中一个组件导入另一个组件中 下面是我的项目结构 components init py extract python3 init py extract py transform py
  • Laravel 5.2 中 auth()->user() 为 null

    我刚刚将 Composer 更新到 Laravel 5 2 无法查看受密码保护的页面 基本上下面的代码行不起作用 auth gt user 有人可以建议为什么这不起作用吗 确保任何需要会话 Auth 使用的 的路由都位于 web 中间件组
  • 使用应用内购买来解锁功能与使用 iPhone 的免费和付费应用版本

    我有一个应用程序 我打算将其作为具有部分全部功能的免费 精简版 版本和具有高级功能的付费完整版本发布 现在 通过在应用程序内购买免费应用程序 我正在考虑走这条路 能够根据需要解锁功能 我不是在谈论过期的试用版 基本上 我希望人们能够试用该应
  • 创建未知类型的数组

    我有一个对象 我必须验证该问题的值 对象的一些属性是自定义对象的数组 这样它将涉及到对数组的各个元素进行一些深入研究 为每个元素执行 getter 例如 AttribGrp x Object getAttribGrp x i getSome
  • 在.net中加载dll而不锁定它

    我正在执行一项任务 其中我必须加载 dll 并从中获取一些信息 例如类名等 但是当我将该 dll 加载到我的代码中时 它被锁定并且无法从源代码构建 直到我关闭加载程序 我尝试了某些解决方案 但没有一个适合我 Shadowcopy 在这种情况
  • 如何在 AngularJS 中动态更改 CSS 属性

    现在我有一个背景图像 URL 硬编码到 CSS 中 我想使用 AngularJS 中的逻辑动态选择背景图像 这是我目前拥有的 HTML div class offer detail image div div CSS offer detai
  • IPython Notebook 错误:加载笔记本时出错

    在过去的几分钟内 我无法访问任何 ipynb以前用Python3 4创建的文件 我进入包含这些文件的子目录 输入 ipython3 notebook 当我打开 新 笔记本时 出现以下错误 Unexpected error while sav
  • 服务层中的授权和用户信息(.NET 应用程序)

    我目前正在 NET 环境 n 层 中使用企业应用程序 我想知道在我的 BusinessLayer BL 中管理身份验证 授权 数据过滤的最佳方法 我们将从多个接口 ASP NET 应用程序和 Web 服务 使用该 BL 我认为我的 Serv
  • 如何使用 MouseListener 查找网格中的特定单元格

    我正在尝试创建一个由单元格组成的 10 x 10 网格的 Java 游戏 网格看起来像这样 public class Grid extends JPanel implements MouseListener public static fi
  • PHP:在mysql中的时间戳值内分割日期和时间

    我在数据库表中有一个名为 时间戳 的字段 它以以下格式存储值 YYYY MM DD HH MM SS 我想分开 然后获取变量中的日期 YYYY MM DD 以及另一个变量中的时间 HH MM SS 例子 timestamp 2012 10
  • 跨内存管理器边界传递 Delphi const 字符串参数是否安全?

    主题 我想使用字符串而不是 PChar 因为这样可以省去很多转换 但如果我这样做 procedure SomeExternalProc s string external SOMEDLL DLL 然后使用非共享内存管理器在其他一些项目中实现
  • 如何在 Github actions 中设置 Dockerfile ARG

    我有一个适用于我的 Node js 服务之一的 Dockerfile 我尝试使用 Github 操作将其推送到我的 Digitalocean 注册表 我的 Node js 服务需要一个由我自己在 npm js 注册表上托管的私有包 在我的
  • 在 javascript 或 bootstrap 中创建可点击的工具提示

    制作如下图所示的可点击工具提示的最佳方法是什么 我应该使用 bootstrap 还是其他库 Thanks 干得好 Pops popover html true content function return popover content
  • Qt如何捕获指示所有gui元素已准备就绪的事件

    我想知道是否有可能捕获所有时生成的事件 Qt 对象已初始化并准备就绪 看来有些事情不能在window的构造函数中完成 它们在插槽实现中工作得很好 例如 当我想访问应用程序的根窗口时 我会这样做 in h MainWindow rootWin
  • Jquery:ajax post 和编码

    我无法理解为什么我无法从服务器答案中获得正确的 ISO 8859 1 字符集 由于这是一项针对遗留代码的工作 我几乎无法更改页面上的字符集编码 我使用 JQuery 调用 post server side code t ctext i io
  • 通过 Spark 写入 HBase:任务不可序列化

    我正在尝试使用 Spark 1 0 在 HBase 0 96 0 hadoop2 中写入一些简单的数据 但我不断遇到序列化问题 这是相关代码 import org apache hadoop hbase client import org