我有一个能发出一些光的通量Date
. This Date
映射到我在某些设备上运行的 1024 个模拟 HTTP 请求Executer
.
我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个请求Date
.
目前运行时,onNext()
被调用多次,然后稳定在某个稳定的速率上。
我怎样才能改变这种行为?
附:如果需要的话,我愿意转向架构。
private void run() throws Exception {
Executor executor = Executors.newFixedThreadPool(2);
Flux<Date> source = Flux.generate(emitter ->
emitter.next(new Date())
);
source
.log()
.limitRate(1)
.doOnNext(date -> System.out.println("on next: " + date))
.map(date -> Flux.range(0, 1024))
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)))
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
HTTP请求模拟:
private static String simulateHttp() {
try {
System.out.println("start http call");
Thread.sleep(3_000);
} catch (Exception e) {}
return "HTML content";
}
编辑:改编自答案的代码:
- 首先,我的代码中有一个错误(另一个
flatMap
需要)
-
其次,我补充说concurrency
的参数1
二者皆是flatMap
(貌似两者都需要)
Executor executor = Executors.newSingleThreadExecutor();
Flux<Date> source = Flux.generate(emitter -> {
System.out.println("emitter called!");
emitter.next(new Date());
});
source
.limitRate(1)
.map(date -> Flux.range(0, 16))
.flatMap(Function.identity(), 1) # concurrency = 1
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
您应该看看这些方法:
- Flux.flatMap(Function, int, int) https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-int-
-
Flux.concatMap(Function, int) https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concatMap-java.util.function.Function-int-.
concatMap
确保在算子内按顺序处理通量上的元素:
内部消息的生成和订阅:该操作员正在等待一个
在生成下一个之前完成内部并订阅
它。
flatMap
让你通过暴露来做同样的事情concurrency
and prefetch
参数可以让您更好地控制此行为:
并发参数允许控制可以有多少个发布者
并行订阅和合并。反过来,该论点表明
向上游发出的第一个 Subscription.request(long) 的大小。这
prefetch 参数允许给定任意预取大小
合并的发布者(换句话说,预取大小意味着
第一个 Subscription.request(long) 到合并的发布者)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)