I am trying to iterate over a JavaPairRDD, perform some calculations with each key and value, and then add the result for each pair to a list of processed data.
What I have already tried:
- making the variables I use inside the lambda static;
- making the methods static and calling those static methods from the lambda in the foreach loop;
- adding implements Serializable.
Here is my code:
List<String> processedData = new ArrayList<>();
// groupedByWebLabData is a JavaPairRDD<WebLabGroupObject, Iterable<WebLabPurchasesDataObject>>
groupedByWebLabData.foreach(data -> {
JavaRDD<WebLabPurchasesDataObject> oneGroupOfData = convertIterableToJavaRdd(data._2());
double opsForOneGroup = getOpsForGroup(oneGroupOfData);
double unitsForOneGroup = getUnitsForGroup(oneGroupOfData);
String combinedOutputForOneGroup = data._1().getProductGroup() + "," + opsForOneGroup + "," + unitsForOneGroup;
processedData.add(combinedOutputForOneGroup);
});
private JavaRDD<WebLabPurchasesDataObject> convertIterableToJavaRdd(Iterable<WebLabPurchasesDataObject> groupedElements)
{
List<WebLabPurchasesDataObject> list = new ArrayList<>();
groupedElements.forEach(el -> list.add(el));
return this.context.parallelize(list);
}
Here is the exception itself:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:797)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:312)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
at com.amazon.videoads.emr.spark.WebLabDataAnalyzer.processWebLabData(WebLabDataAnalyzer.java:121)
at com.amazon.videoads.emr.spark.WebLabMetricsApplication.main(WebLabMetricsApplication.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@395e9596)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class com.amazon.videoads.emr.spark.WebLabDataAnalyzer$$Lambda$14/1536342848, com.amazon.videoads.emr.spark.WebLabDataAnalyzer$$Lambda$14/1536342848@5acc8c7c)
- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, name: f$14, type: interface org.apache.spark.api.java.function.VoidFunction)
- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 16 more
TL;DR: you are trying to use the JavaSparkContext inside your groupedByWebLabData RDD. You can't do that, because the JavaSparkContext is not serializable.
The stack trace is very helpful here:
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
This means that:
- you are trying to serialize something that cannot be serialized;
- that something is a JavaSparkContext.
This is caused by these two lines:
groupedByWebLabData.foreach(data -> {
JavaRDD<WebLabPurchasesDataObject> oneGroupOfData = convertIterableToJavaRdd(data._2());
because convertIterableToJavaRdd, which is called for every element of the RDD, uses
this.context.parallelize(list)
i.e. it uses the JavaSparkContext. In other words, you are trying to use the JavaSparkContext on the executors (where the data of your groupedByWebLabData RDD lives), and you can't do that, because the JavaSparkContext is not serializable.
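The serialization stack above shows exactly this: the lambda's capturedArgs contain the JavaSparkContext. A minimal sketch of the capture (comments are mine):
groupedByWebLabData.foreach(data -> {
    // Calling an instance method here makes the lambda implicitly capture
    // `this` (the enclosing WebLabDataAnalyzer instance). Spark serializes
    // the lambda plus everything it captures in order to ship it to the
    // executors, and `this` drags the non-serializable JavaSparkContext
    // field along with it.
    convertIterableToJavaRdd(data._2());
});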
What you are doing here could probably be done with a UDF instead, and you can then collect the results (if they are not too big).
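For example, here is a minimal sketch of that idea (assuming getOpsForGroup and getUnitsForGroup can be rewritten as the plain-Java aggregation shown here; getOps and getUnits are hypothetical accessors on WebLabPurchasesDataObject): do the per-group computation with ordinary Java code inside a map, without creating a nested RDD, and collect the strings back to the driver.
List<String> processedData = groupedByWebLabData
        .map(data -> {
            double ops = 0.0;
            double units = 0.0;
            // Aggregate over the grouped values directly; no JavaSparkContext
            // and no nested RDD is needed on the executors.
            for (WebLabPurchasesDataObject purchase : data._2()) {
                ops += purchase.getOps();     // hypothetical accessor
                units += purchase.getUnits(); // hypothetical accessor
            }
            return data._1().getProductGroup() + "," + ops + "," + units;
        })
        .collect(); // only safe if the result fits in driver memory
This also avoids a second problem in the original code: adding to the local processedData list inside foreach mutates a driver-side object from the executors, so on a real cluster the list would stay empty.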