我需要分割处理序列(就像在这个问题中如何使用 .net RX 组织数据处理器的序列 https://stackoverflow.com/q/13310865/296494)到 Azure 环境中的多个计算单元。
这个想法是将 Observable 序列序列化到 Azure 队列(或服务总线)并将其反序列化回来。
如果生产者或消费者失败,其他方应该能够继续生产/消费。
谁能建议一种优雅的方式来做到这一点以及使用什么(Azure 队列或服务总线)?
有没有人使用过 TCP Observable 提供程序 -http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider对于此类问题,其中一方失败是否安全?
假设您有一个具有以下 API 的消息队列
class MQ {
public MQ();
// send a single message from your message queue
public void send(string keyPath, string msg);
// Receive a single message from your message queue
public async Task<string> receive(keyPath);
}
为了使这个 RX 兼容
class MQRX: IObserver<string> {
MQ _mq;
string _keyPath
MQRX(string keyPath){
_mq = mq;
_keyPath = keyPath;
}
IObservable<string> Observe(){
return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
}
void OnNext(string msg){
_mq.send(msg);
}
void OnError(Exception e){
// The message queue might not
// support serializing exceptions
// or it might or you might build
// a protocol for it.
}
}
以容错方式使用它。注意如果出现异常则Retry会重新订阅
由 OnError 向上游抛出
new MQRX("users/1/article/2").
Retry().
Subscribe((msg)=>Console.Writeln(msg));
例如,在写入方面,您可以每两秒发送一条消息并重试
如果出现错误,则订阅生成器。注意不太可能有
Observable.Interval 中的错误只是在每个时间间隔生成一条消息,但是
想象一下从文件或其他消息队列中读取数据。
var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
Select((x)=>x.ToString()).
请注意,您可能应该使用 IObservable Catch 扩展方法而不是
盲目重试,因为您可能会一遍又一遍地遇到相同的错误。
重试()。
订阅(mq);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)