我使用 vertX 和 RxJava 启动了一个项目,但遇到了一个问题,但没有找到解决方案。
我有一个 Observable,它为传入通信发出 WebSocketFrame,
每个 WebSocketFrame 由有效负载(ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。
我想对此 Observable 进行操作,将其转换为发出 ByteBufferd 的 Observable,其中包含每条消息的所有帧。
我尝试过buffer
方法,但它似乎被设计为通过任意标准(时间或其他可观察的)重新组合项目。
另一种方法似乎使用compose
订阅 WebSocketFrame observable,在非结束帧上添加到缓冲区,并在结束帧上“馈送”ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。
因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的了解来提出实现方案,我将不胜感激。
感谢您的阅读。
我想你必须使用buffer http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable,%20rx.functions.Func1)运算符(也许你可以使用更简单的buffer http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.functions.Func0),但我对此不确定)。也可以看看这另一个问题 https://stackoverflow.com/questions/31314345/rxjava-buffer-window-until-element-suffers-condition涵盖大致相同的主题并且这个 GitHub 页面 https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md进行更多讨论。希望这对您有帮助!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)