按时间或运行总和进行缓冲以进行反应式扩展

2023-12-15

我对反应式扩展相当陌生,想要根据时间缓冲流,或者根据不超过阈值的运行总和(每个项目的大小由 lambda 指定)以先发生者为准,就像现有的Buffer按计数或时间。

目前我已经编写了自己的实现Buffer方法按预期工作,使用IScheduler用于触发超时,然后管理内存中自己的缓冲区,并在累积总和超过阈值时发出它们,但这感觉有点低级,我认为必须有一个更优雅的解决方案来使用现有的反应操作来表达它某种方式,也许使用TBufferClosing超载的Buffer反而。

到目前为止,我想出的最佳解决方案如下,但它的缺点是包含最后一项,导致阈值导致总和大于请求的最大总和:

    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan) 
    {
        var shared = source.Publish().RefCount();
    
        return shared.Buffer(() => Observable.Amb(
            Observable.Timer(timeSpan)
                .Select(_ => Unit.Default),
            shared.Select(sizeSelector)
                .Scan((a, b) => a + b)
                .SkipWhile(accumulated => accumulated < maxSize)
                .Select(_ => Unit.Default))
            );
    }

这是否可以与现有的操作员合作(通过调整我的上面的版本或完全以其他方式),或者我是否被迫保留我的自定义Buffer自己实现处理计时器和缓冲区?


好的,这应该可以。迟到的答案总比没有好。我认为没有比你使用更好的方法了Buffer运营商。

从本质上讲,问题是一个状态机问题,这意味着您想要一个Scan解决方案。问题是,您有两个不同的来源可以改变您的状态:新项目和超时。Scan并不真正适用于两个多个源,因此我们必须以某种方式将这两种事件类型合并为一种。

I did 相似的东西之前与受歧视的工会,这个概念应该在这里起作用。首先是解决方案(使用 Nuget 包System.Collections.Immutable):

public static class X
{
    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism

        return source
            .Publish(_source => _source
                .Union(queue.Delay(bufferTimeSpan))
                .ScanUnion(
                    (list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
                    (state, item) =>
                    { // item handler
                        var itemSize = sizeSelector(item);
                        var newSize = state.size + itemSize;
                        if (newSize > maxSize)
                        {
                            queue.OnNext(Unit.Default);
                            return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
                        }
                        else
                            return (state.list.Add(item), newSize, null);
                    },
                    (state, _) =>
                    { // time out handler
                        queue.OnNext(Unit.Default);
                        return (ImmutableList<TSource>.Empty, 0, state.list);
                    }
                )
                .Where(t => t.emitValue != null)
                .Select(t => t.emitValue.ToList())
                .TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
        );
    }
}

解释:Union将两个不同类型的流合并为一个流,其中项目可以是类型 A 或类型 B。ScanUnion就像Scan,但提供了两个函数来处理两种不同类型的项目。

The BehaviorSubject每当新的缓冲区窗口打开时就会被命中,Delay操作员确保Scan在定义的时间跨度后获取它。里面的状态Scan保存当前缓冲项目的列表和“大小”。这emitValue当缓冲区窗口关闭时使用,并传递值。

这是受歧视联盟帮助程序代码:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
    {
        return source.Scan(initialState, (state, u) => u.IsType1
            ? type1Handler(state, u.Type1Item)
            : type2Handler(state, u.Type2Item)
        );
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

按时间或运行总和进行缓冲以进行反应式扩展 的相关文章

  • 使用 #pragma Once 和 #ifndef 时出现 VS 2010 C++ LNK2005 错误

    1 gt Deck obj error LNK2005 class Card card card 3VCard A already defined in Card obj 1 gt PokerTester obj error LNK2005
  • 我的 std::hash for std::tuples...有什么改进吗? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 有些人可能已经注意到 std hash 不支持元组 所以我添加了一个重载 它看起来比我到目前为止看到的解决方案 更好 有人有进一步减少这段代码的
  • getline 之后返回到文件开头

    所以我已经从文件中读取了所有行 while getline ifile line logic 其中 ifile 是 ifstream line 是字符串 我的问题是我现在想再次使用 getline 并且似乎无法返回到文件的开头 因为运行 c
  • strtok() 使用安全吗[重复]

    这个问题在这里已经有答案了 我读到了很多负面的东西strtok 有人说它已经过时 有人说它不是线程安全的 等等 那么真相是什么 我可以使用吗strtok 它是线程安全的吗 Note 我正在使用 Visual C 您可以使用它 它是标准库的一
  • 头文件中实现的函数的静态与内联

    我想到的方式inline在 C 中用于链接 作用域 我把它放在同一个篮子里extern and static对于全局对象 通常 对于在头文件中实现的函数 我的首选解决方案是将其设为静态 In Foo h static void foo Do
  • 无法将参数从 `const char *` 转换为 `char *`

    鉴于此代码 void group build int size std string ips Build the LL after receiving the member list from bootstrap head new memb
  • 选择initializer_list迭代器定义

    Why std initializer list
  • QSpinBox 输入 NaN 作为有效值

    我正在尝试扩展 QSpinBox 以能够输入 NaN 或 nan 作为有效值 根据文档 我应该使用 textFromValue valueFromText 和 validate 函数来完成此操作 但我无法让它工作 因为它仍然不允许我输入除数
  • FluentAssertions ShouldNotThrow 无法识别异步方法/Func

    我正在尝试检查异步方法是否抛出具体异常 为此 我使用 MSTEST 和 FluentAssertions 2 0 1 我已经检查过这个关于 Codeplex 的讨论 http fluentassertions codeplex com wo
  • 原子存储抛出错误

    我最近升级到了 C 11 兼容编译器 并且尝试将一些代码从 boost 更新到 c 11 标准 我在使用atomic store转换一些代码时遇到了问题 这是一些简单的测试代码 似乎会引发编译器错误 int main std shared
  • 检查两个函数或成员函数指针的签名是否相等

    我编写了一些代码来检查自由函数的签名是否等于成员函数的签名等 它比较提取的返回类型和函数参数 include
  • ASP.NET MVC 动作过滤器

    有谁知道即使在 CATCH 块中 ActionFilterAttribute 类的 OnResultExecuted 方法是否也会执行 ie CookiesActions public ActionResult Login Usuarios
  • 节点*链表中的下一个

    我是数据结构和算法的新手 我遇到了以下代码 typedef struct node int data node next 谁能告诉我为什么我们要声明节点 next next 不能声明为 int next 吗 因为你希望能够做到n gt ne
  • 从 C# 调用时无法识别 Powershell 命令

    这是这个的延续Question https stackoverflow com questions 66280000 powershell object returns null 66280138 noredirect 1 comment1
  • 如何重用具有稍微不同的 ProcessStartInfo 实例的 Process 实例?

    我有以下开始的代码robocopy https technet microsoft com en us library cc733145 aspx as a Process 我还需要进行数据库查询以确定每次需要复制哪些目录robocopy被
  • 展开 std::reference_wrapper 的成本

    Given include
  • C - 获取外部IP地址

    我需要通过 C C 调用获取我的公共 IP 地址 我知道作为替代方案 我可以从 http whatismyip akamai com 等外部链接获取 我写了一个示例来获取外部IP地址 但我的程序没有返回外部 IP 地址 我正在获取内部 IP
  • 为什么 C++ 标准没有将 sizeof(bool) 定义为 1?

    Size of char signed char and unsigned char由 C 标准本身定义为 1 个字节 我想知道为什么它没有定义sizeof bool also C 03 标准 5 3 3 1 说 sizeof char s
  • 强制函数调用的顺序?

    假设我有一个抽象基类 并且我想要一个必须由派生类实现的纯虚方法 但我想确保派生方法以特定顺序调用函数 我可以做什么来强制执行它 I E base class virtual void doABC 0 virtual void A 0 vir
  • 如何通过API退出Win32应用程序?

    我有一个使用 Win32 API 编写的 C Win32 应用程序 我希望强制它在其中一个函数中退出 有没有类似的东西Exit or Destroy or Abort 类似的东西会终止它吗 哎呀呀呀呀呀呀 不要做任何这些事情 exit 和

随机推荐

  • 您的内容必须有一个 id 属性为“android.R.id.list”的 ListView

    我创建了一个像这样的 xml 文件
  • Python 列表字典中的最小值

    抱歉 问题重新发布了 我应该首先编辑这个问题 为模组标记了新的 抱歉 添麻烦了 由于要求发生变化 不得不重新编写问题 我有一本字典 如下所示 d a 4 2 b 3 4 c 4 3 d 4 3 e 4 f 4 g 4 我想获取与字典 d 中
  • 在另一个视图中更新核心数据实体后,SwiftUI 列表视图未更新

    我有一个存储在核心数据中的课程实体 其变量之一是存储课程是否完成 这些课程列在 SwiftUI 列表中 选择后会转到游戏所在的视图 游戏完成后 complete 变量将更新为 true 应该发生的情况是列表视图显示列出的游戏 并在游戏旁边显
  • 无法在centOS中运行sshpass命令[关闭]

    Closed 这个问题不符合堆栈溢出指南 目前不接受答案 正在研究 CentOS 6 3 最终版 系统 并尝试使用 ssh 另一台机器sshpass实用程序喜欢 sshpass p password ssh user host 但它给了我这
  • 通过 Random.Next() 排序

    In 这个问题其中一个建议是通过 Random Next 对列表进行排序 我认为 也许是错误的 他是在建议这个 public static IEnumerable
  • 如果我使用 Mockito,我还需要 Guice 吗?

    我一直在学习依赖注入 例如 Guice 在我看来 主要驱动因素之一 可测试性 已经被 Mocking 例如 Mockito 很好地涵盖了 依赖注入和模拟框架之间的区别 Ninject vs RhinoMock 或 Moq 是对依赖注入和 M
  • 在JSSOR中,如何通过Javascript访问当前标题?

    我想将 JSSOR 幻灯片中的值传递到 DOM 的其他部分 Markup div class slide img src bilder bild2 jpg div class caption p Caption text p div div
  • 在构建 Android 应用程序之前,将 jar 从其他目录拉到 libs 文件夹

    我有一个 Android 项目 它依赖于外部 jar 文件 即A jar 我已经配置了我的安卓build gradle首先构建构建的项目A jar 然后 Android 构建将继续 jar 构建后 将 jar 从其构建文件夹复制到 andr
  • FireFox 警告“未知伪类或伪元素‘隐藏’”不断运行

    我最近发现 Firefox 中出现一个警告 Warning 未知的伪类或伪元素 隐藏 这是页面http eleven23 net eleven23 beta work web lounge22 php 当到达具有 img hidden 的
  • 为什么反汇编后的数据会变成指令?

    我需要一些帮助来理解此时此刻发生的事情 这段代码 发生 jmp Begin 我只知道 com 文件可以是 64kb 因此您希望将所有内容放在一个段中 如果你想放置变量 你需要jmp 但是当我搜索它时 很多指南只是在评论中说 jmp Begi
  • 在 PHP 中检测移动设备的最简单方法

    判断用户是否使用移动设备使用 PHP 浏览我的网站的最简单方法是什么 我遇到过很多可以使用的类 但我希望有一个简单的 if 条件 我有办法做到这一点吗 这是一个来源 检测移动浏览器 下载 PHP 脚本 Code
  • Fortran 中是否有用于将数组初始化为零的内在函数?

    在 Fortran 中是否有办法将数组 向量 矩阵 甚至标量 设置为零 2003 2008 似乎将 Fortran 提升到了一个非常现代的水平 我想知道它们是否包含了一种简单的方法来将数组值设置为零 而无需执行以下操作 do i 1 X d
  • 显然你可以修改 const 值而无需 UB。或者你可以吗?

    开始编辑 用户 user17732522指出调用UB的缺陷来自事实pop back 使根据使用的参考资料无效vector库文档 当发生这种情况时 不需要 constexpr 求值来检测它 因为它不是 C 核心的一部分 然而 user1773
  • Android 的“onActivityResult”机制不起作用

    我在一个简单的应用程序中遇到了一个大而令人沮丧的问题 我有两个活动 A 和 B 活动 A 通过单击按钮打开活动 B 活动 B 有两个 结束 如果用户单击 B Back 如果刚刚完成 并返回到 A 如果用户单击 B OK 按钮 则活动 B 完
  • 为什么 Python 字符串是不可变的?使用它们的最佳实践

    让Python字符串不可变的设计原因是什么 它如何让编程变得更容易 我习惯了可变字符串 比如 C 中的字符串 如果没有可变字符串 我该如何编程呢 有没有最佳实践 当您收到字符串时 您将确保它保持不变 假设您要构造一个Foo如下带有字符串参数
  • 在 Silverlight 中将 XML 动态绑定到 DataGrid

    我一直在尝试在 Silverlight 特别是 Silverlight 4 但 SL3 中的任何解决方案也可以 中将 XML 通过 XElement 动态绑定到 DataGrid 但一直无法做到这一点 我希望动态地执行此操作 即 没有严格的
  • RewriteRule 无法编译正则表达式

    我以前的服务器工作正常 今天我更换了新服务器 并且 RewriteRule 无法在我的 htaccess 上编译正则表达式 如何修复这条线 RewriteRule category 0 9 category php pid 1 catnam
  • 你可以将 UITabBarController 推入 UINavigationController 中吗

    是否无法将 UITabBarController 推入 UINavigationController 中 我在旧版本的 iOS 中读过 但现在情况仍然如此吗 简而言之 是的 你可以 但仅仅因为你可以 并不意味着你应该这样做 这UITabBa
  • Django 中的导航

    我刚刚在 django 中完成了我的第一个小 web 应用程序 我喜欢它 我即将开始将旧的 PHP 生产站点转换为 django 作为其模板的一部分 有一个导航栏 在 PHP 中 我在模板代码中根据当前 URL 检查每个导航选项的 URL
  • 按时间或运行总和进行缓冲以进行反应式扩展

    我对反应式扩展相当陌生 想要根据时间缓冲流 或者根据不超过阈值的运行总和 每个项目的大小由 lambda 指定 以先发生者为准 就像现有的Buffer按计数或时间 目前我已经编写了自己的实现Buffer方法按预期工作 使用ISchedule