Spark Streaming mapWithState timeout delayed?

2023-12-25

I expected the new mapWithState API in Spark 1.6+ to remove timed-out objects almost immediately, but there is a delay.

I am testing the API with an adapted version of JavaStatefulNetworkWordCount (https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java), shown below:

SparkConf sparkConf = new SparkConf()
    .setAppName("JavaStatefulNetworkWordCount")
    .setMaster("local[*]");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint("./tmp");

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
    StateSpec.function((word, one, state) -> {
        if (state.isTimingOut())
        {
            System.out.println("Timing out the word: " + word);
            return new Tuple2<String, Integer>(word, state.get());
        }
        else
        {
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
            state.update(sum);
            return output;
        }
    });

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
     StorageLevels.MEMORY_AND_DISK_SER_2)
       .flatMap(x -> Arrays.asList(SPACE.split(x)))
       .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
       .mapWithState(mappingFunc.timeout(Durations.seconds(5)));

stateDstream.stateSnapshots().print();

I run this together with netcat (nc -l -p <port>).

When I type a word into the nc window, I see the tuple printed in the console every second. But the timeout message does not appear 5 seconds later, as I would expect from the timeout setting. The time it takes for the tuple to expire seems to vary between 5 and 20 seconds.

Am I missing some configuration option, or is the timeout perhaps only performed at the same time as checkpoints?


Once an event times out it is NOT deleted right away, but is only marked for deletion by saving it to a "deltaMap":

override def remove(key: K): Unit = {
  val stateInfo = deltaMap(key)
  if (stateInfo != null) {
    stateInfo.markDeleted()
  } else {
    val newInfo = new StateInfo[S](deleted = true)
    deltaMap.update(key, newInfo)
  }
}

Then, timed-out events are collected and sent to the output stream only at checkpoints. That is: events that time out in batch t will appear in the output stream only at the next checkpoint, which by default happens after an average of 5 batch intervals, i.e. in batch t+5:

override def checkpoint(): Unit = {
  super.checkpoint()
  doFullScan = true
}

...

removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled

...

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
...
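
From the outside, this can be seen on the job in the question by printing the mapped stream next to the state snapshots (a small sketch; the extra print() call is not part of the original code and assumes the same stateDstream as above):

// The mapped stream carries whatever the mapping function returns, including the
// tuples produced in the state.isTimingOut() branch, so the "Timing out the word"
// message and the final tuple only show up on checkpoint batches.
stateDstream.print();

// The snapshot stream keeps listing the key until the timeout scan runs at a
// checkpoint, which is exactly the delay observed in the question.
stateDstream.stateSnapshots().print();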

In fact, elements are only physically removed once there are enough of them and the state map gets serialized, which currently also happens only at a checkpoint:

/** Whether the delta chain length is long enough that it should be compacted */
def shouldCompact: Boolean = {
  deltaChainLength >= deltaChainThreshold
}

// Write the data in the parent state map while copying the data into a new parent map for
// compaction (if needed)
val doCompaction = shouldCompact
...

By default, checkpointing takes place every 10 iterations, i.e. every 10 seconds in the example above; since the timeout is 5 seconds, events are therefore expected to expire within 5 to 15 seconds.
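
If that latency matters, one option worth trying (an untested sketch; the 1-second interval is only an illustrative value) is to checkpoint the state stream more often than the default:

// Request a checkpoint of the state stream every batch instead of roughly every
// 10 batches, so the timeout full scan (and with it the output and compaction)
// runs more often. More frequent checkpointing also means more checkpoint I/O,
// and it is worth verifying that the internal state RDDs actually pick up the
// shorter interval.
stateDstream.checkpoint(Durations.seconds(1));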

EDIT: answer corrected and elaborated following @YuvalItzchakov's comment.
