我有一个 Observable,它发出许多对象,我想使用以下方法对这些对象进行分组:window
or buffer
运营。但是,不是指定count
用于确定窗口中应有多少对象的参数我希望能够使用自定义标准。
例如,假设可观察对象正在发出 a 的实例Message
像下面这样的类。
class Message(
val int size: Int
)
我想根据它们的消息实例来缓冲或窗口size
变量不仅仅是它们的计数。例如,获取总大小最多为 5000 的消息窗口。
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
是否有捷径可寻?
首先我必须承认,我不是 RxJava 专家。
我只是发现你的问题具有挑战性,并试图找到解决方案。
有一个window()
带参数的函数boundaryIndicator
。你必须创建一个Publisher
/ Flowable
如果达到窗口大小,则发出一个项目。
在示例中我创建了一个对象windowManager
用作boundaryIndicator
。在里面onNext
回调我调用windowManager
并给它一个打开新窗口的机会。
val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)