要转换一个CompletableFuture<List<T>>
into a Flux<T>
您可以使用Mono#fromFuture https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#fromFuture-java.util.concurrent.CompletableFuture- with Mono#flatMapMany https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#flatMapMany-java.util.function.Function-:
var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
List<T>
在回调中异步接收也可以转换为Flux<T>
不使用CompletableFuture
。
您可以直接使用Mono#create https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#create-java.util.function.Consumer- with Mono#flatMapMany https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#flatMapMany-java.util.function.Function-:
Flux<Long> flux = Mono.<List<Long>>create(sink -> {
Callback<List<Long>> callback = new Callback<List<Long>>() {
@Override
public void onResult(List<Long> list) {
sink.success(list);
}
@Override
public void onError(Exception e) {
sink.error(e);
}
};
client.call("query", callback);
}).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
或者简单地使用Flux#create https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#create-java.util.function.Consumer-一次通过多次发射:
Flux<Long> flux = Flux.create(sink -> {
Callback<List<Long>> callback = new Callback<List<Long>>() {
@Override
public void onResult(List<Long> list) {
list.forEach(sink::next);
}
@Override
public void onError(Exception e) {
sink.error(e);
}
};
client.call("query", callback);
});
flux.subscribe(System.out::println);