我一直在尝试使用 Apache Spark 来解决一些查询,例如 top-k、skyline 等。
我做了一个包装纸,其中包含SparkConf
and JavaSparkContext
named SparkContext
。这个类也实现了可序列化,但是因为SparkConf
and JavaSparkContext
不可序列化,那么该类也不可序列化。
我有一个解决 topK 查询的类,名为TopK
,该类实现了可序列化,但该类还有一个SparkContext
不可序列化的成员变量(由于上述原因)。因此,每当我尝试执行时,我都会遇到异常TopK
方法从内部.reduce()
RDD 中的函数。
我找到的解决方案是SparkContext
短暂的。
我的问题是:我应该保留SparkContext
变量是瞬态的还是我犯了一个大错误?
SparkContext
class:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
public class SparkContext implements Serializable {
private final SparkConf sparConf; // this is not serializable
private final JavaSparkContext sparkContext; // this is not either
protected SparkContext(String appName, String master) {
this.sparConf = new SparkConf();
this.sparConf.setAppName(appName);
this.sparConf.setMaster(master);
this.sparkContext = new JavaSparkContext(sparConf);
}
protected JavaRDD<String> textFile(String path) {
return sparkContext.textFile(path);
}
}
TopK
class:
public class TopK implements QueryCalculator, Serializable {
private final transient SparkContext sparkContext;
.
.
.
}
抛出的例子Task not serializable
例外。 getBiggestPointByXDimension
甚至不会被输入,因为为了让它在包含它的类的reduce函数中执行(TopK
) 必须是可序列化的。
private Point findMedianPoint(JavaRDD<Point> points) {
Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b));
.
.
.
}
private Point getBiggestPointByXDimension(Point first, Point second) {
return first.getX() > second.getX() ? first : second;
}