如何使用反应式扩展通过最大窗口大小来限制事件?

2024-02-29

Scenario:

我正在构建一个 UI 应用程序,每隔几毫秒从后端服务获取通知。一旦收到新的通知,我想尽快更新用户界面。

由于我可以在短时间内收到大量通知,并且我总是只关心最新的事件,因此我使用风门()反应式扩展框架的方法。这使我可以忽略紧接着新通知的通知事件,因此我的 UI 保持响应。

Problem:

假设我将通知事件的事件流限制为 50 毫秒,并且后端每 10 毫秒发送一次通知,Thottle() 方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口。在这里,我需要一些额外的行为来指定诸如超时之类的内容,以便在事件吞吐量如此高的情况下,我可以每秒至少检索一个事件。我如何使用反应式扩展来做到这一点?


正如詹姆斯所说,Observable.Sample将为您提供最新的产生值。但是,它将在计时器上执行此操作,而不是根据油门中第一个事件发生的时间。然而,更重要的是,如果您的采样时间很长(例如十秒),并且您的事件在采样后立即触发,那么您将在近十秒内不会获得该新事件。

如果您需要更严格的功能,则需要实现自己的函数。我冒昧地这样做了。这段代码肯定需要一些清理,但我相信它可以满足您的要求。

public static class ObservableEx
{
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
    {
        return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
    }

    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            var hasValue = false;
            T value = default(T);

            var maxTimeDisposable = new SerialDisposable();
            var dueTimeDisposable = new SerialDisposable();

            Action action = () =>
            {
                if (hasValue)
                {
                    maxTimeDisposable.Disposable = Disposable.Empty;
                    dueTimeDisposable.Disposable = Disposable.Empty;
                    o.OnNext(value);
                    hasValue = false;
                }
            };

            return source.Subscribe(
                x =>
                {
                    if (!hasValue)
                    {
                        maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
                    }

                    hasValue = true;
                    value = x;
                    dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
                },
                o.OnError,
                o.OnCompleted
            );
        });
    }
}

还有一些测试...

[TestClass]
public class ThrottleMaxTests : ReactiveTest
{
    [TestMethod]
    public void CanThrottle()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1)
            );
    }

    [TestMethod]
    public void CanThrottleWithMaximumInterval()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(175, 2),
            OnNext(250, 3),
            OnNext(325, 4),
            OnNext(400, 5)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(350, 4),
            OnNext(500, 5)
            );
    }

    [TestMethod]
    public void CanThrottleWithoutMaximumIntervalInterferance()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(325, 2)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(425, 2)
            );
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用反应式扩展通过最大窗口大小来限制事件? 的相关文章

随机推荐

  • 如何使用 Windows API 捕获屏幕和鼠标指针?

    我使用下面的代码以位图形式捕获屏幕 屏幕已被捕获 但我无法将鼠标指针放在屏幕上 你能建议一些替代方法来捕获鼠标吗 private Bitmap CaptureScreen Size size is how big an area to ca
  • SQL 增加一个数字

    Problem 我想根据表格增加一个数字 例如 如果一个表包含 排 1 1 2 3 4 4 4 5 mytable 列应在此基础上增加 取上一列中的 max row 1 所以结果应该是这样的 6 6 7 8 9 9 9 10 这是到目前为止
  • 如何获取 PFX 密钥的容器名称?

    不久前 我使用如下命令将 PFX 密钥安装到容器中 sn i mykey pfx VS XXX 但两个月后我忘记了容器名称 VS XXX 所以我的问题是 如何取回比名字 我知道密钥名称 我有这个密钥 我知道密钥密码短语 下载Keypal h
  • 将材料设计与 VueJS 结合使用

    我正在使用 VueJs 构建一个 Web 应用程序 并且需要一个 css 框架来设计一些东西 而不是从头开始 我找到了material design lite www getmdl io 但我无法让它与vue router一起正常工作 我的
  • 跨多个范围的多个条件格式规则?

    我需要执行以下操作 如果单元格 E 包含大于 30 的数字且单元格 L 包含大于 100 的数字 则突出显示一行 此规则需要应用于所有行 你能帮忙吗 你应该使用条件格式 http office microsoft com en us exc
  • Electron:打开默认电子邮件客户端

    我正在使用选举框架来构建 exe 和 dmg 文件 在应用程序中 我有一个按钮 单击该按钮必须打开系统上安装的默认电子邮件应用程序才能发送电子邮件 以下是打开电子邮件客户端的代码 shell openExternal mailto emai
  • 本地硬重置后无法推送到原点

    我最近对本地 git 存储库进行了硬重置 换句话说 我将其重置为较早的时间点 现在当我尝试向上推时origin它告诉我不能 因为origin包含比我的存储库晚的工作 这是有道理的 但我不关心源在我的本地存储库之后所做的工作 如果我先pull
  • @protocol 与类簇

    那些主要是什么pro and contra for protocol and 类簇 http developer apple com library mac documentation Cocoa Conceptual CocoaFunda
  • 将 Jenkins 与 Gitlab 集成

    我需要在 Jenkins 中设置构建配置 以便每当触发构建时 我都会从 Gitlab 获取最新的脚本并将它们复制到目标系统并在目标上运行该脚本 我找不到任何将 Gitlab 集成到 Jenkins 的相关信息 有我可以使用的特定插件吗 我使
  • 检查哪些参数(组合)为空

    假设我有 4 个变量 String a String b String c String d 我想检查单个变量或变量组合是否不为空并采取相应的行动 例如 一种方法是使用 if else 这种方式 if a null b null c nul
  • VM 上的 Azure SQL Server 可以作为生产数据库吗?

    我试图找出 VM 上的 Azure SQL Server 和 Azure SQL Server 数据库之间的区别 我知道一个是 IaaS 另一个是 PaaS 服务 但有一件事我仍然不明白 哪一个可以用于开发 测试 哪一个可以用于生产 或者说
  • 在 python 中读取 csv 文件并将每个行项目作为脚本中的值进行迭代?

    编辑是因为我似乎太模糊或没有进行足够的研究 我很抱歉 这里是新手 我正在尝试读取 csv 文件并将每个新行分配为一个值 以迭代写入 API 的脚本 我的 csv 中没有标题数据 我将添加正则表达式搜索 然后使用正则表达式表达式后面的数据 并
  • Twitter Bootstrap - 模态格式的表单 - ASP.NET

    我有一个模式形式的表单 如果主窗口不太宽 标签位于字段上方 则该表单可以完美格式化 但是 如果浏览器最大化或足够大 它会更改模式内内容的布局 以便标签现在位于字段的左侧 这意味着每行一个单词 看起来非常混乱 知道我如何克服这种行为 以便类
  • 设置重复本地通知的日期

    我想设置从日期开始重复的本地通知 例如 开始日期 2018 年 6 月 25 日 今天日期 2018 年 6 月 21 日 我被困在这里了 下面的代码可以工作 但它从今天开始而不是从 2018 年 6 月 25 日开始触发本地通知 请看一下
  • 在 flutter 中更新我的应用程序时,您的 Android App Bundle 使用错误的密钥进行签名错误

    我只是不太明白为什么这种情况发生在我身上 我遵循了所有步骤https flutter dev docs deployment android https flutter dev docs deployment android上传我的第一个版
  • 在 Woocommerce 商店页面上获取产品变体图像

    我想在商店页面上显示产品变体图像 每个变体的特定图像 我使用下面的代码成功地获取了变体的名称 放入 content product php 中 不幸的是 里面什么也没有 colouvalues数组 它是变体图像 url 或与图像相关的任何内
  • CSS 三角形 + “之后”实施

    我尝试用 CSS 创建一个三角形 它看起来不错 但是我现在在一个盒子之后实现它时遇到了问题 看看我的例子 你就会明白我的意思 https jsfiddle net TTVuS https jsfiddle net TTVuS 好像是后面的三
  • 计算一个字符串在另一个字符串中出现的次数 (Perl)

    计算某个字符串在较大字符串中出现的次数的最快方法是什么 我最好的猜测是将该字符串的所有实例替换为空 计算长度差并除以子字符串的长度 但这似乎相当慢 而且我需要分析大量数据 您可以捕获字符串 然后对它们进行计数 可以通过将列表上下文应用于捕获
  • 为什么我在使用 Rspec 和 Capybara 编写测试用例时无法获取 current_user

    我必须为我的一个功能列表页面编写集成测试用例 并且该功能索引方法具有如下代码 def index food categories current user food categories end 现在 当我尝试为此编写测试用例时 它会抛出错
  • 如何使用反应式扩展通过最大窗口大小来限制事件?

    Scenario 我正在构建一个 UI 应用程序 每隔几毫秒从后端服务获取通知 一旦收到新的通知 我想尽快更新用户界面 由于我可以在短时间内收到大量通知 并且我总是只关心最新的事件 因此我使用风门 反应式扩展框架的方法 这使我可以忽略紧接着