使用 IObservable 进行批处理

2024-01-11

我的服务器端向我发送批量消息。批次中的消息数量和频率是任意的。有时,我每隔 1 分钟就会收到一条消息,有时一小时内都没有收到消息。 1 到 10 条消息。

我当前的实现使用Observable.Buffer(TimeSpan.FromSeconds(5))将消息分组并发送给订阅者。

有没有一种方法可以配置 Observable 来表示,如果两条消息之间有 x 秒的延迟,则将缓冲的消息发送给订阅者,而不是每 5 秒检查一次。

如何避免不必要的计时器每 5 秒计时一次? (我愿意接受其他优化批处理的建议。)


使用 bufferClosingSelector 工厂方法

decPL 建议使用重载Buffer接受一个bufferClosingSelector- 在打开新缓冲区时调用的工厂函数。它产生一个流,其第一个OnNext() or OnCompleted()信号刷新当前缓冲区。 decPL 代码如下所示:

observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))

这在解决方案方面取得了相当大的进展,但也存在一些问题:

  • 在限制持续时间内持续发布消息的活动期间,服务器不会发送消息。这可能会导致大量且不经常发布的列表。
  • 源有多个订阅;如果天气冷,可能会产生意想不到的副作用。这bufferClosingSelector工厂被称为each缓冲区关闭,因此如果源很冷,它将从初始事件而不是最近的事件中进行限制。

防止无限期节流

我们需要使用额外的机制来限制缓冲区长度并防止无限期的限制。Buffer有一个重载,允许您指定最大长度,但不幸的是您不能将其与关闭选择器结合使用。

让我们调用所需的缓冲区长度限制n。回忆一下第一个OnNext关闭选择器的 足以关闭缓冲区,所以我们需要做的就是Merge带有发送计数流的节流阀OnNext after n来自源头的事件。我们可以用.Take(n).LastAsync()去做这个;采取第一个n事件,但忽略除最后一个之外的所有事件。这是 Rx 中非常有用的模式。

让源头变得“热”

为了解决该问题bufferClosingSelector工厂重新订阅源,我们需要使用通用模式.Publish().RefCount()在源上为我们提供一个仅向订阅者发送最新事件的流。这也是一个非常有用的模式,需要记住。

Solution

这是修改后的代码,其中节流持续时间与计数器合并:

var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;

// single subscription to source
var sourcePub = source.Publish().RefCount();

var output = sourcePub.Buffer(
    () => sourcePub.Throttle(throttleDuration) 
                   .Merge(sourcePub.Take(bufferSize).LastAsync()));

生产就绪代码和测试

这是一个带有测试的生产就绪实现(使用 nuget 包 rx-testing 和 nunit)。请注意调度程序的参数化以支持测试。

public static partial class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
        this IObservable<TSource> source,
        TimeSpan maxInterval,
        int maxBufferSize,
        IScheduler scheduler)
    {
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        if (maxBufferSize <= 0)
            throw new ArgumentOutOfRangeException(
                "maxBufferSize", "maxBufferSize must be positive");

        var publishedSource = source.Publish().RefCount();

        return publishedSource.Buffer(
            () => publishedSource
                .Throttle(maxInterval, scheduler)
                .Merge(publishedSource.Take(maxBufferSize).LastAsync()));
    }
}

public class BufferNearEventsTests : ReactiveTest
{
    [Test]
    public void CloseEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int> expectedBuffer = new [] {1, 2, 3};
        var expectedTime = maxInterval.Ticks + 300;

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
    }

    [Test]
    public void FarEventsAreUnbuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(1000, 1),
            OnNext(2000, 2),
            OnNext(3000, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1},
            new[] {2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            maxInterval.Ticks + 1000,
            maxInterval.Ticks + 2000,
            maxInterval.Ticks + 3000
        };  

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
            OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
    }

    [Test]
    public void UpToMaxEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 2;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1,2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            200, /* Buffer cap reached */
            maxInterval.Ticks + 300
        };

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
    }

    private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
    {
        CollectionAssert.AreEquivalent(expected, actual);
        return true;
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 IObservable 进行批处理 的相关文章

  • 如何在 Visual Studio 2010 中增强 XAML 设计器?

    当我使用 XAML 设计器时 进入设计器和退出设计器是如此困难和缓慢 当我这样做时 Visual Studio 卡了一段时间 有什么方法可以增强 XAML 设计器和编辑器吗 Ant 保存 XAML 文件时非常慢 这通常意味着您可能有复杂的
  • 为 Visual Studio 2013 编译 Tesseract

    我正在尝试使用tesseract在 Visual Studio 2013 中 我在链接器 gt 输入 不是 libtesseract302 static lib 中使用 libtesseract302 lib 一切都正常 并且已编译并运行
  • 如何将 #ifdef DEBUG 添加到 Xcode?

    我的项目中有一些代码永远不应该在发布版本中使用 但在测试时很有用 我想做这样的事情 ifdef DEBUG Run my debugging only code endif 在 Xcode 4 中哪里添加 DEBUG 设置 我尝试将其放入
  • 单元测试一起运行时失败,单独运行时通过

    所以我的单元测试遇到了一些问题 我不能只是将它们复制并粘贴到这里 但我会尽力而为 问题似乎是 如果我一项一项地运行测试 一切都会按预期进行 但如果我告诉它一起运行测试 则 1 5 将通过 TestMethod public void Obj
  • 读取文件特定行号的有效方法。 (奖励:Python 手册印刷错误)

    我有一个 100 GB 的文本文件 它是来自数据库的 BCP 转储 当我尝试导入它时BULK INSERT 我在第 219506324 行上收到一个神秘错误 在解决此问题之前 我想看看这一行 但可惜的是我最喜欢的方法 import line
  • 如何从 .resx 文件条目获取注释

    资源文件中的字符串有名称 值和注释 The ResXResourceReader类让我可以访问名称和值 有办法看评论吗 你应该能够得到Comment via ResXDataNode class http msdn microsoft co
  • 生成(非常)大的非重复整数序列而不进行预洗牌

    背景 我编写了一个简单的媒体客户端 服务器 我想生成一个不明显的时间值 随从客户端到服务器的每个命令一起发送 时间戳中将包含相当多的数据 纳秒分辨率 即使它不是真正准确 因为现代操作系统中计时器采样的限制 等 我想做的 在 Linux 上
  • 如何在 C# 中定义文本框数组?

    您好 当我在 Windows 申请表上创建文本框时 我无法将其命名为 box 0 box 1 等 我这样做的目的是因为我想循环使用它们 其实我发现TextBox array firstTextBox secondTextBox 也有效
  • 单击 form2 上的按钮触发 form 1 中的方法

    我对 Windows 窗体很陌生 我想知道是否可以通过单击表单 2 中的按钮来触发表单 1 中的方法 我的表格 1 有一个组合框 我的 Form 2 有一个 保存 按钮 我想要实现的是 当用户单击表单 2 中的 保存 时 我需要检查表单 1
  • 将 Excel 导入到 Datagridview

    我使用此代码打开 Excel 文件并将其保存在 DataGridView 中 string name Items string constr Provider Microsoft Jet OLEDB 4 0 Data Source Dial
  • 如何将整数转换为 void 指针?

    在 C 中使用线程时 我面临警告 警告 从不同大小的整数转换为指针 代码如下 include
  • 如何使用 watin 中的 FileUploadDialogHandler 访问文件上传对话框

    我正在使用 IE8 和 watin 并尝试通过我的网页测试上传文件 我不能简单地使用 set 方法设置上传文件 例如 ie FileUpload Find ById someId Set C Desktop image jpg 因为上传文本
  • 如何使用 Mongodb C# 驱动程序连接多个集合

    我需要将 3 个集合与多个集合合并在一起 lookup我在 C 驱动程序中尝试过 它允许我 lookup用户采集但无法执行秒 lookup用于设置集合 有人可以帮忙吗 db Transactions aggregate lookup fro
  • 如何编写一个同时需要请求和响应Dtos的ServiceStack插件

    我需要提供本地化数据服务 所有本地化的响应 Dto 都共享相同的属性 IE 我定义了一个接口 ILocalizedDto 来标记那些 Dto 在请求端 有一个ILocalizedRequest对于需要本地化的请求 Using IPlugin
  • std::async 与重载函数

    可能的重复 std bind 重载解析 https stackoverflow com questions 4159487 stdbind overload resolution 考虑以下 C 示例 class A public int f
  • 如何从main方法调用业务对象类?

    我已将代码分为业务对象 访问层 如下所示 void Main Business object public class ExpenseBO public void MakeExpense ExpensePayload payload var
  • Process.Start() 方法在什么情况下返回 false?

    From MSDN https msdn microsoft com en us library e8zac0ca v vs 110 aspx 返回值 true 表示有新的进程资源 开始了 如果由 FileName 成员指定的进程资源 St
  • 如何在 C# 中调整图像大小同时保持高质量?

    我从这里找到了一篇关于图像处理的文章 http www switchonthecode com tutorials csharp tutorial image editing saving cropping and resizing htt
  • 防止在工厂方法之外实例化对象

    假设我有一个带有工厂方法的类 class A public static A newA Some code logging return new A 是否可以使用 a 来阻止此类对象的实例化new 那么工厂方法是创建对象实例的唯一方法吗 当
  • 如何正确使用 std::condition_variable?

    我很困惑conditions variables以及如何 安全 使用它们 在我的应用程序中 我有一个创建 gui 线程的类 但是当 gui 是由 gui 线程构造时 主线程需要等待 情况与下面的函数相同 主线程创建互斥体 锁和conditi

随机推荐