使用反应式扩展按组缓冲,嵌套订阅

2024-05-12

我有一个事件源,它生成属于某些组的事件。我想缓冲这些组并将这些组(批量)发送到存储。到目前为止我有这个:

eventSource
    .GroupBy(event => event.GroupingKey)
    .Select(group => new { group.Key, Events = group })
    .Subscribe(group => group.Events
                            .Buffer(TimeSpan.FromSeconds(60), 100)
                            .Subscribe(list => SendToStorage(list)));

因此,有一个对组中事件的嵌套订阅。不知怎的,我认为有更好的方法,但我还没有弄清楚。


这是解决方案:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));

以下是一些可以帮助您“减少”的一般规则:

1) 嵌套订阅通常固定为Select嵌套订阅之前的所有内容,后跟Merge,然后是嵌套订阅。所以应用它,你会得到这个:

eventSource
    .GroupBy(e => e.GroupingKey)
    .Select(group => new { group.Key, Events = group })
    .Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
    .Merge()
    .Subscribe(list => SendToStorage(list));

2)您显然可以组合两个连续的选择(并且由于您没有对匿名对象执行任何操作,因此可以将其删除):

eventSource
    .GroupBy(e => e.GroupingKey)
    .Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Merge()
    .Subscribe(list => SendToStorage(list));

3)最后,一个Select随后是一个Merge可以减少到SelectMany:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用反应式扩展按组缓冲,嵌套订阅 的相关文章

随机推荐