我正在使用轮询方法定期获取数据。新数据可能随时到达。我想向我的客户公开一个反应式接口。因此,我想创建一个发布者(Flux?),它会在新数据可用时发布新数据并通知订阅者。我怎么做?我看到的所有 Flux 示例都是针对数据已知/可用的情况。实际上,我想要类似基于队列的 Flux 之类的东西,并且我的轮询线程在发现新数据时可以继续填充队列。
对于简单的事情,您可能想要使用DirectProcessor
。这不是最复杂的通量汇,但它会让您有所了解。
我写了一个简单的例子:
Flux<String> hot = DirectProcessor.create<String>()
hot.onNext("Hello")//not printed
hot.subscribe(it -> System.out.println(it))
hot.onNext("Goodbye")//printed
Thread.sleep(100)
hot.onNext("foo")//printed
DirectProcessor 实现了 Flux,因此您可以像 Flux 一样使用它。
正如您所看到的,在订阅热源之前添加的元素不会被传递到订阅。
查看其他帖子,Flux#create 和 Flux#generate 可能是不错的起点。Flux.create 和 Flux.generate 之间的区别 https://stackoverflow.com/questions/49951060/difference-between-flux-create-and-flux-generate
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)