班上HBaseConfiguration
表示与 HBase 服务器的连接池。显然,它无法被序列化并发送到工作节点。自从HTable
使用这个池与HBase服务器通信,它也不能被序列化。
基本上,有三种方法可以处理这个问题:
在每个工作节点上打开一个连接。
注意使用foreachPartition
method:
val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
val hbaseConf = HBaseConfiguration.create()
<... configure HBase ...>
val myTable = new HTable(hbaseConf, tableName)
iter.foreach { a =>
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
}
}
请注意,每个工作节点必须能够访问 HBase 服务器,并且必须预先安装或通过以下方式提供所需的 jar 文件:ADD_JARS
.
另请注意,由于如果为每个分区打开连接池,因此最好将分区数量大致减少到工作节点数量(使用coalesce
功能)。也可以共享一个HTable
每个工作节点上都有一个实例,但这并不是那么简单。
将所有数据序列化到一个盒子并写入HBase
可以用一台计算机写入 RDD 中的所有数据,即使数据不适合内存。详细信息在这个答案中解释:Spark:从 RDD 检索大数据到本地机器的最佳实践
当然,它会比分布式写入慢,但它很简单,不会带来痛苦的序列化问题,并且如果数据大小合理,可能是最好的方法。
使用 HadoopOutputFormat
可以为 HBase 创建自定义 HadoopOutputFormat 或使用现有格式。我不确定是否有适合您需求的东西,但谷歌应该在这里提供帮助。
P.S.顺便说一句,map
调用不会崩溃,因为它不会被评估:在调用具有副作用的函数之前,不会评估 RDD。例如,如果您致电theData.map(....).persist
,它也会崩溃。