从定期异步请求创建 observable

2023-11-24

我想要一种将异步方法转换为可观察方法的通用方法。就我而言,我正在处理使用的方法HttpClient从 API 获取数据。

假设我们有方法Task<string> GetSomeData()需要成为一个单一的Observable<string>其中值是作为以下组合生成的:

  • 重复定期调用GetSomeData()(例如每 x 秒)
  • 手动触发调用GetSomeData()在任何给定时间(例如当用户点击刷新时)。

由于有两种方法可以触发执行GetSomeData()并发可能是一个问题。为了避免这样的要求GetSomeData()是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要使用某种策略来处理重叠的请求。我制作了一个(某种)弹珠图,试图描述问题和想要的结果

marble diagram

我的直觉告诉我有一个简单的方法可以实现这一目标,所以请给我一些见解:)

这是我到目前为止得到的解决方案。不幸的是它并没有解决并发问题。

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

延迟重复的扩展方法:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

包含生成可观察值的方法的服务示例

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to dermine if request was triggered manuall or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

像这样使用(会发生数据竞争):

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

这是我对这个问题的看法:


Update:通过借鉴 Enigmativity 的想法,我能够大大简化我建议的解决方案answer. The Observable.StartAsync方法自动处理取消的混乱业务,并且可以简单地通过使用SemaphoreSlim.

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}

The out Action manualInvocation参数是触发手动调用的机制。

使用示例:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

Output:

19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

使用技术ScanDistinctUntilChanged操作符是为了在上一个异步操作运行时删除元素,借用自this问题。

¹ It seems that the Rx library does not handle this messy business satisfactory though, since it just omits disposing of the CancellationTokenSources it creates.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从定期异步请求创建 observable 的相关文章

  • MVC Core IActionResult 含义

    什么是IActionResult 我尝试查看 MSDN 和其他网站 但需要通用 常见 易于理解的答案 MSDN IActionResult https learn microsoft com en us dotnet api microso
  • C++ 模板中的名称查找

    我有一些 C 代码 如果没有 fpermissive 选项 就无法再编译 这是我无法分享的专有代码 但我认为我已经能够提取一个简单的测试用例来演示该问题 这是 g 的输出 template eg cpp In instantiation o
  • gtest 和 gmock 有什么区别?

    我试图理解的目的google mock Google 的 C 模拟框架 https github com google googletest blob master googlemock README md 我已经与gtest较早 但我还是
  • 泛型与接口的实际优势

    在这种情况下 使用泛型与接口的实际优势是什么 void MyMethod IFoo f void MyMethod
  • 使用静态类型代替变量

    当您的项目不使用命名空间时 有什么方法可以告诉编译器使用静态类型而不是变量吗 例如 我有一个名为 User 的类 它具有各种静态和非静态方法 假设调用了其中一个静态方法GetUser 我想称之为User GetUser 方法来自一个方法 该
  • C++ 非类型参数包扩展

    我正在编写由单一类型参数化的模板函数 并且具有可变数量的相同类型 而不是不同类型 的参数 它应该检查第一个值是否在其余值中 我想这样写 include
  • 有没有办法使 C90 标准中的枚举无符号? (符合 MISRA-C 2004 标准)

    我正在尝试找到一种使枚举 无符号 的方法 enum x1 0 x2 x3 uint8 t x2 lt PC LINT MISRA C 2004 will complain about mixing signed and unsigned h
  • 使用成员作为实现者来实现接口

    我有实现 IA 的 A 类 现在我需要创建也应该实现 IA 的类 B B 类有 A 类的实例作为成员 有什么方法可以定义A的实例实现B类中的IA吗 interfase IA void method1 void method2 void me
  • 枚举器上的 [[maybe_unused]]

    查看规格 maybe unused http en cppreference com w cpp language attributes 它指出 出现在类 typedef 变量 非静态数据成员 函数 枚举或枚举器的声明中 如果编译器对未使用
  • 在 ncurses 中使用退格键

    我设置了一个简单的 ncurses 程序 它使用 getch 一次读取一个字符并将它们复制到缓冲区中 我遇到的问题是检测到按下退格键 这是相关代码 while buffer i c getch EOF i if c n break else
  • C# 的空条件委托调用线程安全吗? [复制]

    这个问题在这里已经有答案了 这就是我一直以来编写事件引发者的方式 例如属性更改 public event PropertyChangedEventHandler PropertyChanged private void RaisePrope
  • 如何使用 libpq 获取双精度值?

    The examples http www postgresql org docs 9 3 interactive libpq example htmllibpq 文档中展示了如何通过将整数值转换为主机字节序表示来获取整数值 我很好奇必须做
  • 如何解释“错误C2018:未知字符'0x40'?[关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 在编译一些代码时 我收到以下信息 错误 C2018 未知字符 0x40 我想知道如何解决这样的问题 这是我要开始的地方
  • C 中什么函数可以替换字符串中的子字符串?

    给定一个 char 字符串 我想查找所有出现的子字符串并将其替换为备用字符串 我没有看到任何简单的函数可以实现这一点
  • 需要使用 openssl 加密和解密文件的示例 C 代码

    我正在用 Linux C 编写代码 我需要使用以下命令来加密和解密文件 openssl 目前 我使用系统命令 des3 e nosalt k 0123456789012345 in inp file out out file 进行加密 使用
  • win32 API 和 .NET 框架之间的选择

    我必须开发一个适用于 Windows 的应用程序 该应用程序将能够通过网络摄像头识别手势来控制鼠标 我将使用 vc 2008 进行开发 但我很困惑是使用 NET 框架还是核心 win32 API 性能对于我的应用程序非常重要 根据 Ivor
  • double 类型的静态类成员的常量表达式初始值设定项

    在 C 11 和 C 14 中 为什么我需要constexpr在下面的代码片段中 class Foo static constexpr double X 0 75 而这会产生编译器错误 class Foo static const doub
  • 如何将 Metro 应用部署到桌面?

    我正在尝试将我的 C 应用程序部署到我的 Windows 8 Metro 桌面 我可以在 bin 文件夹中看到部署的文件 但是当我尝试打开它们时 出现以下错误 该应用程序只能在 AppContainer 的上下文中运行 我检查了属性上下文菜
  • 致命错误 C1001:编译器中发生内部错误(编译器文件“msc1.cpp”,第 1325 行)

    当我编译代码时 错误指向以下类 该错误在两行上突出显示 如下所示 tm validFrom tm validUntil struct t SslCertData final struct t Contact TCHAR Organizati
  • gjs 如何使用 g_data_input_stream_read_line_async 在 Gnome Shell 扩展中读取套接字流

    我正在尝试编写一个 Gnome Shell 扩展 通过 Socket 服务器与 Arduino 进行通信 服务器和 Arduino 运行良好 但我陷入了监听传入服务器消息的扩展代码 因为我需要一种非阻塞方法 所以使用异步读取行 https

随机推荐

  • Java int 到 byte 的隐式转换

    我即将开始处理需要读取字节和创建字符串的事情 正在读取的字节表示 UTF 16 字符串 因此 为了测试一下 我想将 UTF 16 编码的简单字节数组转换为字符串 数组中的前 2 个字节必须表示字节顺序 因此必须是 0xff 0xfe 或 0
  • 告诉 gcc 专门展开循环

    我如何告诉 GCC 展开特定循环 我使用了 CUDA SDK 可以使用以下命令手动展开循环 pragma unroll gcc 有类似的功能吗 我用谷歌搜索了一下但找不到任何东西 GCC 8 获得了一个新的编译指示 允许您控制循环展开的方式
  • .gitattributes 中的 `* text=auto` 和 `* text eol=lf` 有什么区别?

    我一遍又一遍地查看文档 gitattributes但我无法找到关于这两者之间有什么区别的明确答案 text auto text eol lf Also is text auto仅适用于 或者它也可以与特定的扩展一起使用 在这种情况下有什么区
  • 绝对定位及其父元素

    我总是听说 当您使用绝对定位时 您想要充当其父级的元素需要有一个position of relative 我试图构建一个 CSS 下拉菜单 当我将其父元素设置为时 我正在努力让下拉菜单项拉伸超出主菜单项的宽度relative 下拉菜单项中的
  • 用户不活动时屏幕变暗

    我有一个应用程序 可以使用 4 个小时 但用户只需每 5 分钟需要进行一次输入或阅读屏幕 让手机进入睡眠状态并锁定屏幕有点烦人 所以我有两个选择 getWindow addFlags WindowManager LayoutParams F
  • svg 到 png 不起作用,怀疑 svg 元素差异

    我无法弄清楚为什么两个不同的 svg 会导致我的 javascript 在一个实例中工作 但在另一个实例中不起作用 我只交换了两个示例中的 svg 元素 一个有效 一个无效 这是两个 jsFiddles 中的代码 我从中得到的工作示例her
  • 在 Groovy/Java 中比较两个 XML 字符串/文件

    我正在编写单元测试来检查一些 XML 构建器 现在我遇到了预期结果和实际结果之间的语法差异问题 尽管它们的语义相同 Example 预期结果 您可以使用 GroovyXMLUnit像这样的实用程序 XMLUnit setIgnoreWhit
  • 如何根据选定的注释移动 MKMapView

    我有一个 MKMapView 它填充了我的整个视图 但是当选择一个图钉时 我会在地图顶部向上滑动另一个视图 我想移动地图 以便图钉出现在地图可见区域的中心 很难解释 但希望它是有道理的 提前致谢 您可以尝试从visibleMapRect对于
  • 将 jquery 函数包装在闭包中有什么好处?

    嗨 我一直忙于将我的 JQuery 知识提升到一个新的水平 到目前为止 我认为我已经理解了所有内容 但是当我冒险学习更高级的教程时 我注意到有几个实例 其中 JQuery 例程被包装在一个闭包 见下文 然而 让我困惑的是它传递一个 并返回
  • SQL 将结果连接到 codeigniter 中的对象中

    好的 一些背景知识 刚刚进入 codeigniter 不喜欢 sql 和服务器端脚本 我知道什么是连接 我第一次拥有多对多数据库 这是因为连接通常会产生以下示例 但我想解析它 而不必构建代码来忽略重复 这是一个 3 表连接示例 当我加入更多
  • 合并同一项目的两个git存储库

    我目前是唯一一位从事我从前任接手的项目的开发人员 当我接手这个项目时 它还没有受到源代码控制 因此 我创建了一个新的 git 存储库 对状态进行了初始提交 并从此开始对其进行处理 但最近我在备份中发现了同一个项目的一个古老版本 它实际上是一
  • 通过Windows C++让鼠标通过

    我正在开发一个 Win32 C 应用程序 我想忽略鼠标事件并让其传递到我的窗口下方的窗口 基本上我下面的窗口将处理鼠标事件 我不想使用 SendMessage 将鼠标消息发送到我下面的窗口或使用 SetCapture 有没有一种方法可以基本
  • 在 Dask 中排序

    我想找到替代方案pandas dataframe sort value在 dask 中运行 我走过来了设置索引 但它会按单个列排序 如何对 Dask 数据框的多列进行排序 目前为止Dask似乎还不支持多列排序 但是 创建一个新列来连接已排序
  • 找不到文件:mainwindow.obj

    我创建了一个 GUI 应用程序 gt QMainWindow 我在菜单 插槽中添加了 1 项 我创建了一个新项目 gt QDialog 我使用插槽方法尝试显示创建的对话框 但出现以下错误 mainwindow obj 1 错误 LNK201
  • 有没有办法了解“平台”访问网页的硬件资源?

    我希望能够从网页中了解浏览器的硬件资源 或者至少是一个粗略的估计 即使您检测到现代技术的存在 例如csstransforms3d csstransitions requestAnimationFrame 在浏览器中通过类似的工具Modern
  • Python,多线程太慢,多进程

    我是多处理新手 我了解一些有关线程的知识 但我需要提高计算速度 希望通过多重处理 示例说明 将字符串发送到线程 更改字符串 基准测试 将结果发回打印 from threading import Thread class Alter Thre
  • 将多维Json数组解析为Python

    我第一次尝试解析 JSON 并处理多维数组 这让我不知所措 secret Hidden minutes 20 link http www 1 com bookmark collection free link name free link
  • 在 Guice 中绑定,无需

    我有个问题 通常在Guice中我使用bind class to 另一个类实现 但是我在代码源中发现他们仅使用了bind class 没有 to another class Implementation 部分 这是什么意思 bind clas
  • 如何用关系代数求 MAX?

    使用数据库时 如何使用关系代数求 MAX 假设您有一个关系 A 具有单个属性 a 减少一个更复杂的关系是关系代数中的一个简单任务 我确信您已经做到了这一点 所以现在您想要找到最大值A 中的值 一种方法是找到 A 与其自身的叉积 请务必重命名
  • 从定期异步请求创建 observable

    我想要一种将异步方法转换为可观察方法的通用方法 就我而言 我正在处理使用的方法HttpClient从 API 获取数据 假设我们有方法Task