我正在尝试使用 HBase 作为 Spark 的数据源。因此,第一步是从 HBase 表创建 RDD。由于 Spark 使用 hadoop 输入格式,我可以找到一种通过创建 rdd 来使用所有行的方法http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase但是我们如何为范围扫描创建 RDD 呢?
欢迎所有建议。
以下是在 Spark 中使用 Scan 的示例:
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
def convertScanToString(scan: Scan): String = {
val out: ByteArrayOutputStream = new ByteArrayOutputStream
val dos: DataOutputStream = new DataOutputStream(out)
scan.write(dos)
Base64.encodeBytes(out.toByteArray)
}
val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count
您需要将相关库添加到 Spark 类路径并确保它们与您的 Spark 兼容。温馨提示:可以使用hbase classpath
找到他们。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)