我是这个领域的初学者,所以我无法理解它......
- HBase 版本:0.98.24-hadoop2
- 火花版本:2.1.0
以下代码尝试将从 Spark Streming-Kafka 生产者接收的数据放入 HBase 中。
-
Kafka输入数据格式是这样的:
线路1,标签1,123
线路1,标签2,134
Spark-streaming进程通过分隔符','分割接收行,然后将数据放入HBase。
但是,我的应用程序在调用 htable.put() 方法时遇到错误。
任何人都可以帮助解释为什么下面的代码会抛出错误吗?
谢谢。
JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 7113426295831342436L;
HTable htable;
public HTable set() throws IOException{
Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort", "2222");
hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
HConnection hconn = HConnectionManager.createConnection(hconfig);
htable = new HTable(hconfig, tableName);
return htable;
};
@Override
public Iterator<String> call(String x) throws IOException {
////////////// Put into HBase /////////////////////
String[] data = x.split(",");
if (null != data && data.length > 2 ){
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
String ts = sdf.format(new Date());
Put put = new Put(Bytes.toBytes(ts));
put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));
/*I've checked data passed like this
{"totalColumns":3,"row":"20170120200927",
"families":{"TAGVALUE":
[{"qualifier":"LINEID","vlen":3,"tag[], "timestamp":9223372036854775807},
{"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"VAL","vlen":6,"tag" [],"timestamp":9223372036854775807}]}}*/
//********************* ERROR *******************//
htable.put(put);
htable.close();
}
return Arrays.asList(COLDELIM.split(x)).iterator();
}
});
错误代码 :
Exception in thread "main" org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154)
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)