好的,这应该可以。迟到的答案总比没有好。我认为没有比你使用更好的方法了Buffer
运营商。
从本质上讲,问题是一个状态机问题,这意味着您想要一个Scan
解决方案。问题是,您有两个不同的来源可以改变您的状态:新项目和超时。Scan
并不真正适用于两个多个源,因此我们必须以某种方式将这两种事件类型合并为一种。
I did 相似的东西之前与受歧视的工会,这个概念应该在这里起作用。首先是解决方案(使用 Nuget 包System.Collections.Immutable
):
public static class X
{
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism
return source
.Publish(_source => _source
.Union(queue.Delay(bufferTimeSpan))
.ScanUnion(
(list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
(state, item) =>
{ // item handler
var itemSize = sizeSelector(item);
var newSize = state.size + itemSize;
if (newSize > maxSize)
{
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
}
else
return (state.list.Add(item), newSize, null);
},
(state, _) =>
{ // time out handler
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty, 0, state.list);
}
)
.Where(t => t.emitValue != null)
.Select(t => t.emitValue.ToList())
.TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
);
}
}
解释:Union
将两个不同类型的流合并为一个流,其中项目可以是类型 A 或类型 B。ScanUnion
就像Scan
,但提供了两个函数来处理两种不同类型的项目。
The BehaviorSubject
每当新的缓冲区窗口打开时就会被命中,Delay
操作员确保Scan
在定义的时间跨度后获取它。里面的状态Scan
保存当前缓冲项目的列表和“大小”。这emitValue
当缓冲区窗口关闭时使用,并传递值。
这是受歧视联盟帮助程序代码:
public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
}
public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}