如何基于 Func 将 IObservable 窗口/缓冲为块

2024-02-13

给定一个类:

class Foo { DateTime Timestamp {get; set;} }

...和IObservable<Foo>,有保证的单调递增 Timestamps,我怎样才能生成一个IObservable<IList<Foo>>根据这些分块到列表中Timestamps?

IE。每个IList<Foo>应该有五秒钟的事件,或者其他什么。我知道我可以使用Buffer with a TimeSpan超载,但我需要从事件本身而不是挂钟中获取时间。 (除非有一种巧妙的方法来提供IScheduler这里使用的是IObservable本身作为源.Now?)

如果我尝试使用Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)像这样重载:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
    x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();

pub.Buffer(windows).Subscribe(x => t.Dump()));  // linqpad
pub.Connect();

...然后IList实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区。

例如。带时间戳[0, 1, 10, 11, 15]你会得到块[[0], [1, 10], [11, 15]]代替[[0, 1], [10, 11], [15]]


这是一个想法。组关键条件是“窗口号”,我使用GroupByUntil。这为您提供了示例中所需的输出(我使用了一个 int 流,就像该示例一样 - 但您可以替换为窗口编号所需的任何内容)。

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));                  
            
        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        .Subscribe(Console.WriteLine);
        
        scheduler.Start();
    }
}

Notes

  1. 我们发布源流是因为我们将订阅多次。
  2. 这是创建组密钥的函数 - 使用它根据您的项目类型生成窗口号。
  3. 这是组终止条件 - 使用它来检查源流中另一个窗口中的项目。请注意,这意味着窗口不会关闭,直到窗口外部的元素到达或源流终止。如果您考虑一下,这是显而易见的 - 您想要的输出需要考虑窗口结束后的下一个元素。请注意,如果您的源与实时有任何关系,您可以将其与Observable.Timer+Select输出术语的空/默认实例以提前终止流。
  4. SelectMany 将组放入列表并展平流。

如果包含 nuget 包 rx-testing,此示例将在 LINQPad 中运行得很好。新建一个测试实例并运行Test() method.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何基于 Func 将 IObservable 窗口/缓冲为块 的相关文章

  • 警告:从指针目标类型中丢弃“const”限定符

    没有const char s意味着 s 是一个指向常量 char 的指针 那么为什么它给我这个警告 我并不是想改变价值观 在第一个函数中警告是return discards const qualifiers from pointer tar
  • 何时对向量进行归一化?

    我正在学习 XNA 并且在几乎所有的教育套件中都可以找到http creators xna com en US http creators xna com en US 我总是看到向量上对 Normalize 的调用 我知道归一化基本上将向量
  • 在 MVC 类上创建主键字段

    我是 MVC 和 C 新手 我只是偶然发现它并发现它很有趣 我遇到了一个不允许我继续的问题 这是我的代码 using System using System Collections Generic using System Linq usi
  • 可选参数“必须是编译时常量”

    我有一个类分为两个部分文件 如下所示 public partial class PersonRepository BaseRepository
  • WPF - 按多列排序时使用自定义比较器

    我有一个 ListView GridView 我想按 2 列排序 因此如果第 1 列中有 2 个以上的项目具有相同的值 它将按第 2 列排序 非常简单 但是在对 A Z 进行排序时 空字符串会出现在顶部 我想把它们移到底部 我制作了一个比较
  • 为什么 LinkedList 通常比 List 慢?

    我开始在我的一些 C 算法中使用一些 LinkedList 而不是列表 希望能够加快速度 然而 我注意到他们只是感觉更慢 像任何优秀的开发人员一样 我认为我应该尽职调查并验证我的感受 所以我决定对一些简单的循环进行基准测试 我认为用一些随机
  • 浏览器收集哪些值作为回发数据?

    当页面被发送回服务器时 浏览器收集每个控件的当前值并将其粘贴到一个字符串中 然后 该回发数据通过 HTTP POST 发送回服务器 Q1 除了控件的 Text 属性和 SelectedIndexchanged 因此除了用户输入数据 之外 控
  • 如何检查 .NET 4.0 中的泛型参数是否是动态的

    我有课ObjectMapper
  • 将语句插入 SQL Server 数据库

    最近几天我试图找到这个错误 但没有成功 我正在尝试在数据库中插入一个新行 一切都很顺利 没有错误 也没有程序崩溃 My INSERT声明如下 INSERT INTO Polozaj Znesek Uporabnik Cas Kupec Po
  • C++ 模板参数类型推断

    我有一个这样的C 模板 template
  • 在 C++ 中处理音频缓冲区时,如何执行从 float -> double -> float 的转换

    我目前正在开发一个应用程序 其中音频样本帧在以下回调中进行处理 void Eav07AudioProcessor processBlock AudioSampleBuffer buffer for int channel 0 channel
  • 使用对象列表构建树

    我有一个带有属性 id 和parent id 的对象列表 我想建造一棵树来连接那些孩子和父母 1 个父对象可以有多个子对象 并且有一个对象将成为所有对象的祖先 实现该功能最快的算法是什么 我使用 C 作为编程语言 但其他语言也可以 像这样的
  • 如何使用 C# 代码使用超链接的 onClick 事件?

    我正在尝试为页面中的超链接添加条件 而不是仅仅使用特定的链接 例如 a href help Tutorial html Tutorial a 我想为不同的用户显示不同的页面 例如 如果用户以管理员身份登录 他们将看到与普通用户不同的链接 我
  • C# - 命名空间内的类型声明

    在命名空间内而不是在类中声明类型的可能用途是什么 For ex namespace Test public delegate void Ispossible 这是有效的并且不会产生任何编译错误 但我无法想象为什么我们会以这种方式声明它而不是
  • 如何使用eclipse构建C++应用程序

    我已经从以下位置下载了 Eclipse Juno for C here http www eclipse org downloads download php file technology epp downloads release ju
  • 停止 TcpListener 的正确方法

    我目前正在使用 TcpListener 来处理传入连接 每个连接都有一个线程用于处理通信 然后关闭该单个连接 代码如下 TcpListener listener new TcpListener IPAddress Any Port Syst
  • 检索 Autofac 容器以解析服务

    在 C WindowForms 应用程序中 我启动一个 OWIN WebApp 它创建另一个类 Erp 的单例实例 public partial class Engine Form const string url http 8080 49
  • 统一;随机物体移动[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我正在制作一款机器人战斗游戏 我希望敌人随机移动 然后有时会向敌人移动 我希望运动包含在其中的代码 else if avoid fal
  • 获取大于某个数字的元素个数

    我正在尝试解决以下问题 数字被插入到容器中 每次插入数字时 我需要知道容器中有多少元素大于或等于当前插入的数字 我相信这两个操作都可以以对数复杂度完成 我的问题 C 库中有标准容器可以解决这个问题吗 我知道std multiset可以在对数
  • C# 中成员访问中的问号是什么意思?

    有人可以向我解释一下以下代码中会员访问中的问号是什么意思吗 它是标准 C 的一部分吗 尝试在 Xamarin Studio 中编译此文件时出现解析错误 this AnalyzerLoadFailed Invoke this new Anal

随机推荐