我有2个IConnectableObservable其中一个正在重播旧的历史消息,另一个正在发出新的当前值:
HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable: - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...
如何将它们合并到单个可观察量中,以便从两个可观察量中获取完整(正确)的序列,并且在开始从 CurrentObservable 发出值后,删除订阅并在 HistoricObservable 订阅上调用 Dispose。
MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...
我的消息由Guid,因此该解决方案只能使用 Equal 来比较它们,并且不能依赖于每个可观察量发出的顺序之外的任何顺序。
简而言之,我正在寻找填充方法:
public static IObservable<T> MergeObservables<T>(
IObservable<T> historicObservable,
IObservable<T> currentObservable)
where T : IEquatable<T>
{
throw new NotImplementedException();
}
MergedObservable 应该继续从 HistoricObservable 发出值,而不等待从 CurrentObservable 发出第一个值,如果 CurrentObservable 中的第一个值之前已经发出,那么 MergedObservable 应该跳过 CurrentObservable 中已经发出的任何值,处理对 HistoricObservable 的订阅,然后开始从 CurrentObservable 中获取所有新值。我也不想在 CurrentObservable 发出第一个对象时立即切换,直到到达 HistoricObservable 中的那个点,所以我一直很难尝试使用 TakeWhile/TakeUntil。我在使用 JointLatest 保存状态方面取得了一些小成功,但我认为可能有更好的方法。
测试用例
对于以下测试用例,假设每条消息都由 GUID 表示,如下所示:
A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D
1) 如果历史可观察量永远赶不上当前可观察量,则合并的可观察量永远不应该从当前可观察量发出任何内容
Historic: - A - B - C - D - E - F - G - H|
Current: - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged: - A - B - C - D - E - F - G - H|
2)一旦历史可观察量达到当前可观察量发出的第一个值,则合并的可观察量应立即发出当前可观察量先前发出的所有值,并与历史可观察量断开连接。
Historic: - A - B - C - D - E - F - G - H - I - J|
Current: - - - - - - E - F - G - H - I - J|
Merged: - A - B - C - D - EF-G- H - I - J|
3)解决方案应该能够在历史可观察值之前处理来自当前可观察值的值。
Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current: - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged: - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|
4)如果当前可观察的值已经被发出,那么解决方案应该跳过它们,直到发出新值。
Historic: - A - B - C - D - E - F - G - H - I - J|
Current: - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged: - A - B - C - D - - - - - - E - F - G - H - I - J|
5)对于我的用例,我保证当前可观察量将是历史可观察量的子集,但为了完整性,我想解决方案将继续从历史可观察量中提取,认为第一个元素将在稍后发生
Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current: - - A - B - C - D - E - F - G - H - I - J|
Merged: - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|
6)我还保证历史可观察量一旦同步,就不会与当前可观察量不同,但如果由于某种原因他们这样做,合并的可观察量应该已经与它断开连接,并且不会出现任何差异
Historic: - A - B - C - D - E - D - C - B - A|
Current: - - - - - - E - F - G - H - I - J|
Merged: - A - B - C - D - EF-G- H - I - J|
为了帮助创建工作解决方案,这里有一些输入数据:
var historic = new Subject<int>();
var current = new Subject<int>();
// query & subscription goes here
historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);
正确的解决方案应该产生从 1 到 10 的数字。