java.lang.IllegalStateException:不支持在启动上下文后添加新的输入、转换和输出操作

2024-04-04

当我尝试在 Spark 的函数调用中创建 dStream 时,出现以下异常。

我的通话方法:

@Override
public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
    Queue<JavaRDD<Object>> queue = new LinkedList<>();
    queue.add(v1);
    JavaDStream<Object> dStream = context.queueStream(queue);
    JavaDStream<Object> newDStream = dStream.map(AbstractProcessor.this);
    final JavaRDD<Object> rdd = context.sparkContext().emptyRDD();
    newDStream.foreachRDD(new SaxFunction<JavaRDD<Object>, Void>() {
        private static final long serialVersionUID = 672054140484217234L;

        @Override
        public Void execute(JavaRDD<Object> object) throws Exception {
            rdd.union(object);
            return null;
        }
    });
    return rdd;
}

例外 :

Caused by: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:42)
    at org.apache.spark.streaming.dstream.QueueInputDStream.<init>(QueueInputDStream.scala:29)
    at org.apache.spark.streaming.StreamingContext.queueStream(StreamingContext.scala:513)
    at org.apache.spark.streaming.StreamingContext.queueStream(StreamingContext.scala:492)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.queueStream(JavaStreamingContext.scala:436)

有什么方法可以创建 dStream 并在运行时对其进行操作,或者可以在上下文启动后更新 DAG? 提前致谢。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java.lang.IllegalStateException:不支持在启动上下文后添加新的输入、转换和输出操作 的相关文章

随机推荐