我正在使用 io.projectreactor 3 (reactor-core 3.2.6.RELEASE),我注意到错误处理方面存在一些差异。不幸的是,官方文档没有提供足够的细节来解决我的问题。
我有以下 4 个片段。在某些情况下,异常将被忽略,而在其他情况下,它将进一步抛出。实际产生和消费异常的方式是怎样的呢?
片段1
在这种情况下,异常将被忽略,并且 main() 将完成而不会收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
}).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Output:
DONE
片段2
与上面的示例类似,只是我们不使用 Flux.push 而是使用 Flux.just。 Main() 将收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.just(
1
).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Output:
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
片段3
我们通过调用sink.error来发出异常信号。 Main() 不会收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
sink.error(new RuntimeException("HELLO WORLD"));
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Output:
1
2
DONE
片段4
我们直接抛出异常。 Main() 将收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Output
1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
使用反应核心时处理异常的正确方法是什么?唯一可靠的方法似乎根本不使用错误回调,而是用 try/catch 包围 Flux.subscribe。但在那种情况下我总是收到UnsupportedOperationException
而不是原来的异常,然后我需要使用Exceptions.isErrorCallbackNotImplemented
要检查它是否来自反应式,请提取嵌套异常然后抛出它。
这当然可以做到,但需要在我们订阅 Flux 的每个地方一致地完成。这对我来说看起来不太好。我在这里缺少什么?