Rx.NET:按顺序组合可观察量

2023-12-12

我有2个IConnectableObservable其中一个正在重播旧的历史消息,另一个正在发出新的当前值:

HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable:    - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...

如何将它们合并到单个可观察量中,以便从两个可观察量中获取完整(正确)的序列,并且在开始从 CurrentObservable 发出值后,删除订阅并在 HistoricObservable 订阅上调用 Dispose。

MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...

我的消息由Guid,因此该解决方案只能使用 Equal 来比较它们,并且不能依赖于每个可观察量发出的顺序之外的任何顺序。

简而言之,我正在寻找填充方法:

public static IObservable<T> MergeObservables<T>(
    IObservable<T> historicObservable,
    IObservable<T> currentObservable)
    where T : IEquatable<T>
{
    throw new NotImplementedException();
}

MergedObservable 应该继续从 HistoricObservable 发出值,而不等待从 CurrentObservable 发出第一个值,如果 CurrentObservable 中的第一个值之前已经发出,那么 MergedObservable 应该跳过 CurrentObservable 中已经发出的任何值,处理对 HistoricObservable 的订阅,然后开始从 CurrentObservable 中获取所有新值。我也不想在 CurrentObservable 发出第一个对象时立即切换,直到到达 HistoricObservable 中的那个点,所以我一直很难尝试使用 TakeWhile/TakeUntil。我在使用 JointLatest 保存状态方面取得了一些小成功,但我认为可能有更好的方法。

测试用例

对于以下测试用例,假设每条消息都由 GUID 表示,如下所示:

A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D

1) 如果历史可观察量永远赶不上当前可观察量,则合并的可观察量永远不应该从当前可观察量发出任何内容

Historic: - A - B - C - D - E - F - G - H|
Current:    - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged:   - A - B - C - D - E - F - G - H|

2)一旦历史可观察量达到当前可观察量发出的第一个值,则合并的可观察量应立即发出当前可观察量先前发出的所有值,并与历史可观察量断开连接。

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

3)解决方案应该能够在历史可观察值之前处理来自当前可观察值的值。

Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current:  - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged:   - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|

4)如果当前可观察的值已经被发出,那么解决方案应该跳过它们,直到发出新值。

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged:   - A - B - C - D - - - - - - E - F - G - H - I - J|

5)对于我的用例,我保证当前可观察量将是历史可观察量的子集,但为了完整性,我想解决方案将继续从历史可观察量中提取,认为第一个元素将在稍后发生

Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current:  - - A - B - C - D - E - F - G - H - I - J|
Merged:   - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|

6)我还保证历史可观察量一旦同步,就不会与当前可观察量不同,但如果由于某种原因他们这样做,合并的可观察量应该已经与它断开连接,并且不会出现任何差异

Historic: - A - B - C - D - E - D - C - B - A|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

为了帮助创建工作解决方案,这里有一些输入数据:

var historic = new Subject<int>();
var current = new Subject<int>();

// query & subscription goes here

historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);

正确的解决方案应该产生从 1 到 10 的数字。


假设您想要不同的结果,无论出现的顺序如何,也许这种方法也有效:

       var replayCurrent = current.Replay();
        replayCurrent.Connect();


        var merged = historic
            .Scan(
                new { history = new List<string>(), firstVal = (string)null },
                (state, val) =>
                { state.history.Add(val); return state; }
                )
            .Merge(
                current.Take(1).Select(v => new { history = (List<string>)null, firstVal = v })

                )
            .Scan(new { history = (List<string>)null, firstVal = (string)null },
                (state, val) =>
                new { history = val.history ?? state.history, firstVal = val.firstVal ?? state.firstVal })
            .TakeWhile(v => 
                (null==v.firstVal || ( null!=v.firstVal && !v.history.Contains(v.firstVal)))
                )
            .Select(v=>v.history.Last())
            .Concat(replayCurrent)
            .Distinct();

        merged.Subscribe(x => Console.WriteLine(x));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rx.NET:按顺序组合可观察量 的相关文章

  • 如何在 Visual Studio 2010 中增强 XAML 设计器?

    当我使用 XAML 设计器时 进入设计器和退出设计器是如此困难和缓慢 当我这样做时 Visual Studio 卡了一段时间 有什么方法可以增强 XAML 设计器和编辑器吗 Ant 保存 XAML 文件时非常慢 这通常意味着您可能有复杂的
  • 如何为 C 分配的 numpy 数组注册析构函数?

    我想在 C C 中为 numpy 数组分配数字 并将它们作为 numpy 数组传递给 python 我可以做的PyArray SimpleNewFromData http docs scipy org doc numpy reference
  • 如何修复此错误“GDI+ 中发生一般错误”?

    从默认名称打开图像并以默认名称保存 覆盖它 我需要从 Image Default jpg 制作图形 将其放在 picturebox1 image 上并在 picurebox1 上绘制一些图形 它有效 这不是我的问题 但我无法保存 pictu
  • 互斥体实现可以互换(独立于线程实现)

    所有互斥体实现最终都会调用相同的基本系统 硬件调用吗 这意味着它们可以互换吗 具体来说 如果我使用 gnu parallel算法 使用openmp 并且我想让他们称之为线程安全的类我可以使用boost mutex用于锁定 或者我必须编写自己
  • 将 System.Windows.Input.KeyEventArgs 键转换为 char

    我需要将事件参数作为char 但是当我尝试转换 Key 枚举时 我得到的字母和符号与传入的字母和符号完全不同 如何正确地将密钥转换为字符 这是我尝试过的 ObserveKeyStroke this new ObervableKeyStrok
  • 用于检查项目文件中的项目变量和引用路径的 api

    我正在研究一个 net application VS2010 与 x 没有 解和变量号这些解决方案中的项目数量 我需要检查项目属性 特定于一定数量的项目 是否同质 并且检查 验证构建期间的参考路径 有没有一个API是这样的吗 如果没有 我该
  • C# Dns.GetHostEntry 不返回连接到 WiFi 的移动设备的名称

    我有一个 C 中的 Windows 窗体应用程序 我试图获取列表中所有客户端的主机名 下面给出的是 ra00l 来自此链接的代码示例 GetHostEntry 非常慢 https stackoverflow com questions 99
  • 如何在 C# 中定义文本框数组?

    您好 当我在 Windows 申请表上创建文本框时 我无法将其命名为 box 0 box 1 等 我这样做的目的是因为我想循环使用它们 其实我发现TextBox array firstTextBox secondTextBox 也有效
  • 获取 WPF 控件的所有附加事件处理程序

    我正在开发一个应用程序 在其中动态分配按钮的事件 现在的问题是 我希望获取按钮单击事件的所有事件 因为我希望删除以前的处理程序 我尝试将事件处理程序设置为 null 如下所示 Button Click null 但是我收到了一个无法分配 n
  • ASP.NET:获取自 1970 年 1 月 1 日以来的毫秒数

    我有一个 ASP NET VB NET 日期 我试图获取自 1970 年 1 月 1 日以来的毫秒数 我尝试在 MSDN 中寻找方法 但找不到任何东西 有谁知道如何做到这一点 从 NET 4 6 开始 该方法ToUnixTimeMillis
  • 如何在 Linq 中获得左外连接?

    我的数据库中有两个表 如下所示 顾客 C ID city 1 Dhaka 2 New york 3 London 个人信息 P ID C ID Field value 1 1 First Name Nasir 2 1 Last Name U
  • 使用 JNI 从 Java 代码中检索 String 值的内存泄漏

    我使用 GetStringUTFChars 从使用 JNI 的 java 代码中检索字符串的值 并使用 ReleaseStringUTFChars 释放该字符串 当代码在 JRE 1 4 上运行时 不会出现内存泄漏 但如果相同的代码在 JR
  • 未经许可更改内存值

    我有一个二维数组 当我第一次打印数组的数据时 日期打印正确 但其他时候 array last i 的数据从 i 0 到 last 1 显然是一个逻辑错误 但我不明白原因 因为我复制并粘贴了 for 语句 那么 C 更改数据吗 I use g
  • C++:.bmp 到文件中的字节数组

    是的 我已经解决了与此相关的其他问题 但我发现它们没有太大帮助 他们提供了一些帮助 但我仍然有点困惑 所以这是我需要做的 我们有一个 132x65 的屏幕 我有一个 132x65 的 bmp 我想遍历 bmp 并将其分成小的 1x8 列以获
  • Visual Studio 中的测试单独成功,但一组失败

    当我在 Visual Studio 中单独运行测试时 它们都顺利通过 然而 当我同时运行所有这些时 有些通过 有些失败 我尝试在每个测试方法之间暂停 1 秒 但没有成功 有任何想法吗 在此先感谢您的帮助 你们可能有一些共享数据 检查正在使用
  • HttpWebRequest 在第二次调用时超时

    为什么以下代码在第二次 及后续 运行时超时 代码挂在 using Stream objStream request GetResponse GetResponseStream 然后引发 WebException 表示请求已超时 我已经尝试过
  • 英特尔 Pin 与 C++14

    问题 我有一些关于在 C 14 或其他 C 版本中使用英特尔 Pin 的问题 使用较新版本从较旧的 C 编译代码很少会出现任何问题 但由于 Intel Pin 是操作指令级别的 如果我使用 C 11 或 C 14 编译它 是否会出现任何不良
  • memset 未填充数组

    u32 iterations 5 u32 ecx u32 malloc sizeof u32 iterations memset ecx 0xBAADF00D sizeof u32 iterations printf 8X n ecx 0
  • 使用 GROUP 和 SUM 的 LINQ 查询

    请帮助我了解如何使用带有 GROUP 和 SUM 的 LINQ 进行查询 Query the database IEnumerable
  • 检查Windows控制台中是否按下了键[重复]

    这个问题在这里已经有答案了 可能的重复 C 控制台键盘事件 https stackoverflow com questions 2067893 c console keyboard events 我希望 Windows 控制台程序在按下某个

随机推荐