您可以使用window
and share
可观察源。还有一个小技巧bufferCount(2, 1)
:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
source
.bufferCount(2, 1) // delay emission by one item
.map(arr => arr[0])
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
这打印(因为toArray()
):
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
该解决方案的问题在于订阅的顺序source
。我们需要window
通知者在第一个之前订阅bufferCount
。否则,首先将一个项目进一步推送,然后检查它是否与前一个项目不同.filter(([oldValue, newValue]) ...)
.
这意味着之前需要将发射延迟一window
(这是第一个.bufferCount(2, 1).map(arr => arr[0])
.
或者也许我自己控制订阅顺序更容易publish()
:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
const connectable = source.publish();
connectable
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
connectable.connect();
输出是相同的。