Rx:如何立即响应并限制后续请求

2024-01-04

我想设置一个 Rx 订阅,它可以立即响应事件,然后忽略指定“冷却”期内发生的后续事件。

开箱即用的 Throttle/Buffer 方法仅在超时结束后才会响应,这并不是我所需要的。

这是一些设置场景并使用 Throttle 的代码(这不是我想要的解决方案):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();
         
        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });
 
        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

现在运行的输出是:

第 1 批(无延迟)

在 508ms 处处理 1

第 2 批(1 秒延迟)

第 3 批(1.3 秒延迟)

第 4 批(1.6 秒延迟)

在 2114ms 处处理 4

请注意,批次 2 未被处理(这很好!),因为由于节流的性质,我们在请求之间等待 500 毫秒。批次 3 也没有被处理(这不太好,因为它距离批次 2 发生了超过 500 毫秒),因为它接近批次 4。

我正在寻找的是更像这样的东西:

第 1 批(无延迟)

在 ~0ms 处处理 1

第 2 批(1 秒延迟)

在约 1000 秒内处理 2

第 3 批(1.3 秒延迟)

第 4 批(1.6 秒延迟)

在 ~1600s 处理 4

请注意,在这种情况下不会处理批次 3(这很好!),因为它发生在批次 2 的 500 毫秒内。

EDIT:

这是我使用的“StartNewDelayed”扩展方法的实现:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}

这是我的方法。它与之前的其他产品类似,但它不会遇到过度热心的窗口生产问题。

所需的功能很像Observable.Throttle但在合格事件到达后立即发出,而不是延迟节流或采样周期的持续时间。在排位赛结束后的给定时间内,后续赛事将被抑制。

作为可测试的扩展方法给出:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(ps => 
            ps.Window(() => ps.Delay(sampleDuration,scheduler))
              .SelectMany(x => x.Take(1)));
    }
}

这个想法是使用重载Window使用创建非重叠窗口windowClosingSelector它使用由时移回来的源sampleDuration。因此,每个窗口将:(a)被其中的第一个元素关闭,并且(b)保持打开状态,直到允许新元素为止。然后我们只需从每个窗口中选择第一个元素。

Rx 1.x 版本

The Publish上面使用的扩展方法在 Rx 1.x 中不可用。这是一个替代方案:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        var sourcePub = source.Publish().RefCount();
        return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler))
                        .SelectMany(x => x.Take(1));
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rx:如何立即响应并限制后续请求 的相关文章

  • 是否有与 posix_memalign 对应的 C++ 版本?

    当我打电话时posix memalign http man7 org linux man pages man3 posix memalign 3 html为类型的对象分配对齐的内存Foo在我的 C 代码中 我需要做一个reinterpret
  • 为什么在连接两个字符串时 Python 比 C 更快?

    目前我想比较 Python 和 C 用来处理字符串的速度 我认为 C 应该比 Python 提供更好的性能 然而 我得到了完全相反的结果 这是 C 程序 include
  • 使用 lambda 表达式注册类型

    我想知道如何在 UnityContainer 中实现这样的功能 container RegisterType
  • 如何捕获未发送到 stdout 的命令行文本?

    我在项目中使用 LAME 命令行 mp3 编码器 我希望能够看到某人正在使用什么版本 如果我只执行 LAME exe 而不带参数 我会得到 例如 C LAME gt LAME exe LAME 32 bits version 3 98 2
  • 在c#中执行Redis控制台命令

    我需要从 Redis 控制台获取 客户端列表 输出以在我的 C 应用程序中使用 有没有办法使用 ConnectionMultiplexer 执行该命令 或者是否有内置方法可以查找该信息 CLIENT LIST是 服务器 命令 而不是 数据库
  • ComboBox DataBinding 导致 ArgumentException

    我的几个类对象 class Person public string Name get set public string Sex get set public int Age get set public override string
  • IdentityServer 4 对它的工作原理感到困惑

    我阅读和观看了很多有关 Identity Server 4 的内容 但我仍然对它有点困惑 因为似乎有很多移动部件 我现在明白这是一个单独的项目 它处理用户身份验证 我仍然不明白的是用户如何注册它 谁存储用户名 密码 我打算进行此设置 Rea
  • 如何在C(Linux)中的while循环中准确地睡眠?

    在 C 代码 Linux 操作系统 中 我需要在 while 循环内准确地休眠 比如说 10000 微秒 1000 次 我尝试过usleep nanosleep select pselect和其他一些方法 但没有成功 一旦大约 50 次 它
  • 当一组凭据下的计划任务启动的进程在另一组凭据下运行另一个程序时,Windows 是否有限制

    所以我有一个简单的例子 其中我有应用程序 A 它对用户 X 本地管理员 有一些硬编码的凭据 然后它使用硬编码的绝对路径启动带有这些凭据的应用程序 B A 和 B 以及 dotnet 控制台应用程序 但是它们不与控制台交互 只是将信息写入文件
  • 从同一个类中的另一个构造函数调用构造函数

    我有一个带有两个构造函数的类 C 这是代码片段 public class FooBar public FooBar string s constructor 1 some functionality public FooBar int i
  • 从客户端访问 DomainService 中的自定义对象

    我正在使用域服务从 Silverlight 客户端的数据库中获取数据 在DomainService1 cs中 我添加了以下内容 EnableClientAccess public class Product public int produ
  • unordered_map 中字符串的 C++ 哈希函数

    看起来 C 标准库中没有字符串的哈希函数 这是真的 在任何 c 编译器上使用字符串作为 unordered map 中的键的工作示例是什么 C STL提供模板专业化 http en cppreference com w cpp string
  • File.AppendText 尝试写入错误的位置

    我有一个 C 控制台应用程序 它作为 Windows 任务计划程序中的计划任务运行 此控制台应用程序写入日志文件 该日志文件在调试模式下运行时会创建并写入应用程序文件夹本身内的文件 但是 当它在任务计划程序中运行时 它会抛出一个错误 指出访
  • C++ int 前面加 0 会改变整个值

    我有一个非常奇怪的问题 如果我像这样声明一个 int int time 0110 然后将其显示到控制台返回的值为72 但是当我删除前面的 0 时int time 110 然后控制台显示110正如预期的那样 我想知道两件事 首先 为什么它在
  • 打印大型 WPF 用户控件

    我有一个巨大的数据 我想使用 WPF 打印 我发现WPF提供了一个PrintDialog PrintVisual用于打印派生的任何 WPF 控件的方法Visual class PrintVisual只会打印一页 因此我需要缩放控件以适合页面
  • 实体框架中的“it”是什么

    如果以前有人问过这个问题 请原谅我 但我的任何搜索中都没有出现 它 我有两个数据库表 Person 和 Employee 对每个类型的表进行建模 例如 Employee is a Person 在我的 edmx 设计器中 我定义了一个实体
  • 如何在richtextbox中使用多颜色[重复]

    这个问题在这里已经有答案了 我使用 C windows 窗体 并且有 richtextbox 我想将一些文本设置为红色 一些设置为绿色 一些设置为黑色 怎么办呢 附图片 System Windows Forms RichTextBox有一个
  • GCC 的“-Wl,option”和“-Xlinker option”语法之间有区别吗?

    我一直在查看一些配置文件 并且看到它们都被使用 尽管在不同的体系结构上 如果您在 Linux 机器上使用 GCC 将选项传递给链接器的两种语法之间有区别吗 据我所知 阅读 GCC 手册时 他们的解释几乎相同 From man gcc Xli
  • Objective-C / C 给出枚举默认值

    我在某处读到过关于给枚举默认值的内容 如下所示 typedef enum MarketNavigationTypeNone 0 MarketNavigationTypeHeirachy 1 MarketNavigationTypeMarke
  • OpenCV SIFT 描述符关键点半径

    我正在深入研究OpenCV的SIFT描述符提取的实现 https github com Itseez opencv blob master modules nonfree src sift cpp 我发现了一些令人费解的代码来获取兴趣点邻域

随机推荐

  • 重试 Visual Studio C# 测试方法

    我很好奇是否有任何内置机制可以retry在 Visual Studio 2008 C 单元测试框架中进行测试 举个例子 我有一个 C 单元测试 如下所示 TestMethod public void MyMethod DoSomething
  • 从一个 dagger 2 模块如何访问另一个 dagger 2 模块中提供的 SharedPreferences

    从一个 dagger2 模块提供 SharedPreferences 后 在另一个 dagger2 模块中想要使用它 怎么做 下面的代码似乎不起作用 组件 Singleton Component modules arrayOf DataMa
  • Redis 6 可以利用多核 CPU 的优势吗?

    Since Redis 6支持多线程IO https redislabs com blog diving into redis 6 在超过2个核心的机器上部署Redis有意义吗 它是否能够利用额外的核心 或者 2 个核心仍然是理想的选择 一
  • 计算阿克曼函数的较大值

    我有一些代码 int CalculateAckermann int x int y if x return y if y return CalculateAckermann x 1 else return CalculateAckerman
  • 返回 Fortran 中不同长度的字符串数组

    我想创建一个类型来包含 Fortran 中的字符串数组 而无需显式分配长度 以便我可以从函数返回它 以下是我的类型 type returnArr Character dimension 4 array end type returnArr
  • 由于 JSON 中转义的单引号,jQuery.parseJSON 抛出“无效 JSON”错误

    我正在使用以下方式向我的服务器发出请求jQuery post 我的服务器正在返回 JSON 对象 例如 var value 但是 如果任何值包含单引号 正确转义 如 jQuery 无法解析有效的 JSON 字符串 这是我的意思的一个例子 在
  • Numpy 每行动态切片

    如何在不使用 for 循环的情况下动态地对给定开始和结束索引的每一行进行切片 我可以使用下面列出的循环来完成此操作 但是对于 x shape 0 gt 1 mill 的情况来说 它太慢了 x np arange 0 100 x x resh
  • 自动缩放 ImageIcon 以适应标签大小

    在我的 JFrame 上 我使用以下代码在面板上显示图像 ImageIcon img new ImageIcon res png jLabel setIcon img 我想 自动调整 标签中图片的大小 事实上 有时图像大小只有几个像素 有时
  • 清除命名图的最有效方法?

    我正在使用 Ontotext GraphDB 的一个实例 并且经常想要清除具有大量三元组的命名图 目前 我的技术涉及向图形服务器发出 SPARQL 命令 该命令搜索并匹配指定图形中每个三元组的三元组模式 DELETE GRAPH examp
  • 如何编写一个 clang 插件在编译时向原始代码注入一些代码

    我遇到了一个关于如何编写一个能够更改代码的 clang 插件的问题 我想向程序中注入一些代码 就像这样 在此处输入代码 the original code the filename is user code cpp int f1 retur
  • 现有应用程序的转换以兼容iphone5? [复制]

    这个问题在这里已经有答案了 可能的重复 如何开发或迁移适用于 iPhone 5 屏幕分辨率的应用程序 https stackoverflow com questions 12395200 how to develop or migrate
  • 如何在 activerecord 之外创建 activerecord 样式验证?

    我正在为我公司编写的软件开发一个测试框架 我们的产品是基于网络的 在运行 RESTful 请求后 我想处理结果 我希望能够在每个命令类中进行 activerecord 类型验证 以便在运行后 结果会自动针对所有 验证 进行测试 但是 我不知
  • 本地图像未在发布和 TestFlight 中渲染

    使用react native v0 63 4 并且所有本地图像在开发模式下加载良好
  • WPF - 从流加载字体?

    我有一个包含字体文件 ttf 内容的 MemoryStream 我希望能够从该流创建 FontFamily WPF 对象WITHOUT将流的内容写入磁盘 我知道这对于 System Drawing FontFamily 是可能的 但我不知道
  • Javascript背景颜色随着淡入淡出而闪烁

    我有一个 div 需要有持续闪烁的背景颜色 我想要它做的就是淡入透明的 to red并循环返回 我见过几个examples这样做 但它们都会影响整个内容div而不仅仅是background color 其他例子有闪烁background但这
  • UIWebView canGoBack 和 canGoForward 始终返回 NO

    我正在尝试将数据直接加载到UIWebView webView loadData data MIMEType text html textEncodingName utf 8 baseURL nil 数据是一些包含一些外部链接的html字符串
  • 去掉最后一个逗号

    当我的程序打印出整数 5 到 1 时 我需要帮助去掉最后一个逗号 int i for i 10 i gt 1 i if i 2 0 System out print i 2 它打印出来5 4 3 2 1 我想要打印出来5 4 3 2 1 一
  • System.Array 是否对值类型执行装箱?

    我最近做了一些粗略的性能测量List lt gt vs 对于一系列小型结构 System Array 似乎轻而易举地获胜了 所以我就这么做了 我才刚刚意识到 System Array 包含对象类型 所以用结构填充它肯定会导致装箱发生吗 然而
  • 如何在 Node.js 中使用 JQuery 选择器

    我正在尝试从硬盘驱动器中的 HTML 文件中提取电子邮件信息 如果我在 Firefox 中加载文件并运行 jQuerify bookmarklet 我可以成功使用以下选择器 函数 window jQuery a iEmail each fu
  • Rx:如何立即响应并限制后续请求

    我想设置一个 Rx 订阅 它可以立即响应事件 然后忽略指定 冷却 期内发生的后续事件 开箱即用的 Throttle Buffer 方法仅在超时结束后才会响应 这并不是我所需要的 这是一些设置场景并使用 Throttle 的代码 这不是我想要