Table of Contents
- 1. SQL That Triggers the Exception
- 2. Handling the Exception
- 3. Deploying the Hive Jars That Hive on Spark Depends On
Following up on the previous article, I found yet another place where this error is raised. This article handles that case and then verifies whether the fix also works in a Hive on Spark environment.
1. SQL That Triggers the Exception
Construct the test data:
(1) Create the tables and insert data
create table t1(id float,content string) stored as parquet;
insert into t1 values(1.1,'content1'),(2.2,'content2');
create table error_type(id int,content string) stored as parquet;
(2) Copy the data file into a table whose column type is incompatible
hdfs dfs -cp /user/hive/warehouse/testdb.db/t1/000000_0 /user/hive/warehouse/testdb.db/error_type/
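Why the copied file is incompatible can be confirmed by reading the schema embedded in the Parquet file itself: the footer written by t1 still records id as a float, while error_type declares it as int. Below is a minimal sketch for dumping that footer schema (my illustration, not from the original article), assuming parquet-hadoop and hadoop-common are on the classpath:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

public class ShowParquetSchema {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The file copied into error_type still carries t1's schema in its footer
    Path file = new Path("/user/hive/warehouse/testdb.db/error_type/000000_0");
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, file);
    MessageType schema = footer.getFileMetaData().getSchema();
    // Expect id to show up as a float here, not the int declared by error_type
    System.out.println(schema);
  }
}
(The parquet-tools schema command should print the same information.)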
After the two steps above, execute the following SQL:
select * from error_type where content='content1';
The query fails, with the following error log:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row [Error getting row data with exception java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36)
at org.apache.hadoop.hive.serde2.SerDeUtils.buildJSONString(SerDeUtils.java:227)
at org.apache.hadoop.hive.serde2.SerDeUtils.buildJSONString(SerDeUtils.java:364)
at org.apache.hadoop.hive.serde2.SerDeUtils.getJSONString(SerDeUtils.java:200)
at org.apache.hadoop.hive.serde2.SerDeUtils.getJSONString(SerDeUtils.java:186)
at org.apache.hadoop.hive.ql.exec.MapOperator.toErrorMessage(MapOperator.java:520)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:489)
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133)
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
]
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:494) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) ~[scala-library-2.11.12.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
... 3 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:251) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:55) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:732) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.FilterOperator.process(FilterOperator.java:126) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:146) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:484) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) ~[scala-library-2.11.12.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
... 3 more
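The innermost frame shows the root cause: the reader hands back a FloatWritable for the id column (the file was written by t1, where id is float), while error_type declares id as int, so WritableIntObjectInspector casts the value to IntWritable and fails. A minimal standalone sketch of just the failing cast (my illustration, not from the article), assuming hadoop-common is on the classpath:
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class CastDemo {
  public static void main(String[] args) {
    // What the reader actually returns for column id
    Writable fromFile = new FloatWritable(1.1f);
    // What WritableIntObjectInspector.get() effectively does, because the
    // table schema says the column is an int:
    int value = ((IntWritable) fromFile).get(); // throws ClassCastException
    System.out.println(value);
  }
}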
2. Handling the Exception
In the stack trace, the function org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231) contains the logic that serializes each field:
@Override
public Writable doSerialize(Object obj, ObjectInspector objInspector)
    throws SerDeException {
  if (objInspector.getCategory() != Category.STRUCT) {
    throw new SerDeException(getClass().toString()
        + " can only serialize struct types, but we got: "
        + objInspector.getTypeName());
  }

  StructObjectInspector soi = (StructObjectInspector) objInspector;
  List<? extends StructField> fields = soi.getAllStructFieldRefs();
  List<Object> list = soi.getStructFieldsDataAsList(obj);
  List<? extends StructField> declaredFields = (serdeParams.getRowTypeInfo() != null && ((StructTypeInfo) serdeParams.getRowTypeInfo())
      .getAllStructFieldNames().size() > 0) ? ((StructObjectInspector) getObjectInspector())
      .getAllStructFieldRefs()
      : null;

  serializeStream.reset();
  serializedSize = 0;

  // Serialize each field in turn, separated by the first-level separator
  for (int i = 0; i < fields.size(); i++) {
    if (i > 0) {
      serializeStream.write(serdeParams.getSeparators()[0]);
    }
    ObjectInspector foi = fields.get(i).getFieldObjectInspector();
    Object f = (list == null ? null : list.get(i));
    if (declaredFields != null && i >= declaredFields.size()) {
      throw new SerDeException("Error: expecting " + declaredFields.size()
          + " but asking for field " + i + "\n" + "data=" + obj + "\n"
          + "tableType=" + serdeParams.getRowTypeInfo().toString() + "\n"
          + "dataType="
          + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector));
    }
    serializeField(serializeStream, f, foi, serdeParams);
  }

  serializeCache
      .set(serializeStream.getData(), 0, serializeStream.getLength());
  serializedSize = serializeStream.getLength();
  lastOperationSerialize = true;
  lastOperationDeserialize = false;
  return serializeCache;
}
The last line inside the for loop, serializeField(serializeStream, f, foi, serdeParams);, is the call that appears in the stack trace as org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247):
protected void serializeField(ByteStream.Output out, Object obj, ObjectInspector objInspector,
    LazySerDeParameters serdeParams) throws SerDeException {
  try {
    serialize(out, obj, objInspector, serdeParams.getSeparators(), 1, serdeParams.getNullSequence(),
        serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams.getNeedsEscape());
  } catch (IOException e) {
    throw new SerDeException(e);
  }
}
The serialize function that serializeField invokes is org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292) from the stack trace:
public static void serialize(ByteStream.Output out, Object obj,
    ObjectInspector objInspector, byte[] separators, int level,
    Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape)
    throws IOException, SerDeException {
  if (obj == null) {
    out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
    return;
  }

  char separator;
  List<?> list;
  switch (objInspector.getCategory()) {
  case PRIMITIVE:
    LazyUtils.writePrimitiveUTF8(out, obj,
        (PrimitiveObjectInspector) objInspector, escaped, escapeChar,
        needsEscape);
    return;
  case LIST:
    // ......
    return;
  case MAP:
    // ......
    return;
  case STRUCT:
    // ......
    return;
  case UNION:
    // ......
    return;
  default:
    break;
  }

  throw new RuntimeException("Unknown category type: "
      + objInspector.getCategory());
}
This function dispatches to type-specific routines to serialize each category of value, and it is on the PRIMITIVE path that an incompatible type throws the ClassCastException. Note that when the current field value is null, the function simply writes the null sequence and returns:
if (obj == null) {
  out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
  return;
}
So, as before, we can catch the ClassCastException in the per-field logic of LazySimpleSerDe.doSerialize and, following the null-value logic in serialize, write the null sequence instead. That is, change this line in LazySimpleSerDe.doSerialize:
serializeField(serializeStream, f, foi, serdeParams);
to:
try {
  serializeField(serializeStream, f, foi, serdeParams);
} catch (ClassCastException | UnsupportedOperationException e) {
  // Mirror the null-value branch of serialize(): write the null sequence
  // instead of failing the whole query on a type-incompatible field
  serializeStream.write(serdeParams.getNullSequence().getBytes(), 0, serdeParams.getNullSequence().getLength());
}
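To sanity-check the patch outside the cluster, one can drive LazySimpleSerDe directly with a struct ObjectInspector that declares int while the row actually holds a FloatWritable: the unpatched jar throws the ClassCastException above, while the patched jar writes the null sequence (\N by default). A minimal sketch under those assumptions (my illustration, not from the article; hive-exec and hadoop-common on the classpath):
import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;

public class PatchedSerDeTest {
  public static void main(String[] args) throws Exception {
    // Declare the table schema: id int, content string (same as error_type)
    Properties props = new Properties();
    props.setProperty(serdeConstants.LIST_COLUMNS, "id,content");
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,string");
    LazySimpleSerDe serde = new LazySimpleSerDe();
    serde.initialize(new Configuration(), props);

    // An ObjectInspector matching the declared schema...
    StructObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("id", "content"),
        Arrays.asList(
            PrimitiveObjectInspectorFactory.writableIntObjectInspector,
            PrimitiveObjectInspectorFactory.writableStringObjectInspector));

    // ...but a row whose first field is really a FloatWritable, as read from the file
    Object row = Arrays.asList(new FloatWritable(1.1f), new Text("content1"));

    // Unpatched: ClassCastException. Patched: "\N", separator, then "content1"
    System.out.println(serde.serialize(row, oi));
  }
}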
3. Deploying the Hive Jars That Hive on Spark Depends On
After the code change above, deploying the hive*.jar files with the copy_jars.sh script from the previous article is enough for Hive's default MR execution engine to run the SQL that failed at the beginning of this article. However, when Hive uses Spark as the execution engine (in beeline this can be set with set hive.execution.engine=spark;), the same error still occurs, which suggests that Spark keeps its own copy of the Hive dependency jars in another location.
In the log above, some stack frames carry the jar name hive-exec-2.1.1-cdh6.3.0.jar. Searching for this jar on a host where CDH is deployed:
[root@dev-master2 tmp]
/opt/cloudera/cm/cloudera-navigator-server/libs/cdh6/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/cm/cloudera-scm-telepub/libs/cdh6/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hive/lib/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/oozie/embedded-oozie-server/webapp/WEB-INF/lib/hive-exec-2.1.1-cdh6.3.0.jar
It looks like /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/hive-exec-2.1.1-cdh6.3.0.jar is where Spark keeps its copy of the Hive dependency, and that directory contains only this one jar:
[root@dev-master2 tmp]
total 35M
-rw-r--r--. 1 root root 35M Jul 19 2019 hive-exec-2.1.1-cdh6.3.0.jar
So add one line to copy_jars.sh:
cp $current_dir/hive-exec-2.1.1-cdh6.3.0.jar /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/
After redeploying, testing shows that Hive on Spark can now query the table with the incompatible column type, with the mismatched column returned as NULL.