I expected the new mapWithState API in Spark 1.6+ to remove timed-out objects almost immediately, but I am seeing a delay.
I am testing the API with the adapted version of JavaStatefulNetworkWordCount (https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java) below:
SparkConf sparkConf = new SparkConf()
    .setAppName("JavaStatefulNetworkWordCount")
    .setMaster("local[*]");
// 1-second batch interval
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint("./tmp");

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
    StateSpec.function((word, one, state) -> {
        if (state.isTimingOut()) {
            // The key is being removed because of the timeout; the state cannot be updated here
            System.out.println("Timing out the word: " + word);
            return new Tuple2<String, Integer>(word, state.get());
        } else {
            // Add the new count to the running total for this word
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
            state.update(sum);
            return output;
        }
    });

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER_2)
        .flatMap(x -> Arrays.asList(SPACE.split(x)))
        .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
        // Idle keys should time out after 5 seconds
        .mapWithState(mappingFunc.timeout(Durations.seconds(5)));

stateDstream.stateSnapshots().print();
I run it together with netcat (nc -l -p <port>).
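When I type a word into the nc window, I see the tuple printed to the console every second. However, the timing-out message is not printed after 5 seconds, as I would expect from the timeout setting. The time it takes for the tuple to expire seems to vary between 5 and 20 seconds.
Am I missing a configuration option, or is the timeout perhaps only processed at the same time as checkpoints?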
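To make the second hypothesis concrete, here is a minimal plain-Java sketch (no Spark involved) of the delay I would expect if idle keys were only evicted on batches where a periodic full scan of the state runs. The class name TimeoutDelaySketch, the method removalTime, and the 10-second scan interval are all assumptions made for illustration; I have not confirmed that Spark behaves this way or that this is the interval it uses.

```java
class TimeoutDelaySketch {

    /**
     * Hypothetical model: a key last updated at lastUpdate (seconds) becomes
     * eligible for removal at lastUpdate + timeout, but is only actually
     * removed at the next scan, and scans happen at multiples of scanInterval.
     */
    static long removalTime(long lastUpdate, long timeout, long scanInterval) {
        long earliest = lastUpdate + timeout;
        // Round up to the next multiple of scanInterval
        long scans = (earliest + scanInterval - 1) / scanInterval;
        return scans * scanInterval;
    }

    public static void main(String[] args) {
        long timeout = 5;        // matches timeout(Durations.seconds(5)) above
        long scanInterval = 10;  // assumed value, not taken from Spark

        for (long lastUpdate = 0; lastUpdate < 10; lastUpdate++) {
            long removed = removalTime(lastUpdate, timeout, scanInterval);
            System.out.println("last update at " + lastUpdate
                + "s, removed at " + removed
                + "s, observed delay " + (removed - lastUpdate) + "s");
        }
    }
}
```

Under these assumptions the observed delay depends on when the word was typed relative to the scan schedule: it is never shorter than the 5-second timeout and can be several seconds longer, which is at least qualitatively similar to the variation I am seeing.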