如何将 Observable 序列化到云端并返回

2024-02-14

我需要分割处理序列(就像在这个问题中如何使用 .net RX 组织数据处理器的序列 https://stackoverflow.com/q/13310865/296494)到 Azure 环境中的多个计算单元。
这个想法是将 Observable 序列序列化到 Azure 队列(或服务总线)并将其反序列化回来。
如果生产者或消费者失败,其他方应该能够继续生产/消费。

谁能建议一种优雅的方式来做到这一点以及使用什么(Azure 队列或服务总线)?
有没有人使用过 TCP Observable 提供程序 -http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider对于此类问题,其中一方失败是否安全?


假设您有一个具有以下 API 的消息队列

class MQ {

    public MQ();

    // send a single message from your message queue
    public void send(string keyPath, string msg);

    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);

}

为了使这个 RX 兼容

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath

    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }

    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }

    void OnNext(string msg){
        _mq.send(msg);
    }

    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

以容错方式使用它。注意如果出现异常则Retry会重新订阅 由 OnError 向上游抛出

new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

例如,在写入方面,您可以每两秒发送一条消息并重试 如果出现错误,则订阅生成器。注意不太可能有 Observable.Interval 中的错误只是在每个时间间隔生成一条消息,但是 想象一下从文件或其他消息队列中读取数据。

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

请注意,您可能应该使用 IObservable Catch 扩展方法而不是 盲目重试,因为您可能会一遍又一遍地遇到相同的错误。 重试()。 订阅(mq);

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 Observable 序列化到云端并返回 的相关文章

  • 用于 C# XNA 的 Javascript(或类似)游戏脚本

    最近我准备用 XNA C 开发另一个游戏 上次我在 XNA C 中开发游戏时 遇到了必须向游戏中添加地图和可自定义数据的问题 每次我想添加新内容或更改游戏角色的某些值或其他内容时 我都必须重建整个游戏或其他内容 这可能需要相当长的时间 有没
  • 在哪里存储 Java 的 .properties 文件?

    The Java教程 http download oracle com javase tutorial essential environment properties htmlon using Properties 讨论如何使用 Prop
  • 在 GWT 中,在任何主机页标记上添加事件处理程序

    我想为任何标签添加 MouseOver 事件处理程序 举个例子 我想为旧版 HTML 页面中的每个锚点页面添加事件处理程序 继GWT指南 http code google com webtoolkit doc 1 6 DevGuideUse
  • 迭代 pandas 数据框的最快方法?

    如何运行数据框并仅返回满足特定条件的行 必须在之前的行和列上测试此条件 例如 1 2 3 4 1 1 1999 4 2 4 5 1 2 1999 5 2 3 3 1 3 1999 5 2 3 8 1 4 1999 6 4 2 6 1 5 1
  • 如何为 Windows toast 注册协议?

    如何注册 Windows toast 协议 样本中来自https blogs msdn microsoft com tiles and toasts 2015 07 02 adaptive and interactive toast not
  • 闪亮井板宽度

    library shiny library shinydashboard ui lt dashboardPage dashboardHeader dashboardSidebar dashboardBody wellPanel tags d
  • 使用 crypt() 加密

    我目前正在做一个非常安全的登录系统 但我是 crypt 函数的新手 需要一些快速帮助 我在注册过程中使用 crypt 加密密码字符串并将其保存到数据库中 但是 我如何在登录过程中解密密钥 或者我应该怎么做 或者是否可以对提交的密码字符串进行
  • 带重定向标准流的 C# + telnet 进程立即退出

    我正在尝试用 C 做一个 脚本化 telnet 项目 有点类似于Tcl期望 http expect nist gov 我需要为其启动 telnet 进程并重定向 和处理 其 stdin stdout 流 问题是 生成的 telnet 进程在
  • 您可以使用关键字参数而不提供默认值吗?

    我习惯于在 Python 中使用这样的函数 方法定义 def my function arg1 None arg2 default do stuff here 如果我不供应arg1 or arg2 那么默认值None or default
  • Scrapy Spider不存储状态(持久状态)

    您好 有一个基本的蜘蛛 可以运行以获取给定域上的所有链接 我想确保它保持其状态 以便它可以从离开的位置恢复 我已按照给定的网址进行操作http doc scrapy org en latest topics jobs html http d
  • NGinx $proxy_add_x_forwarded_for 和 real_ip_header

    我在 NGinx 下有一个 web 应用程序和另一个前端负载均衡器 如下所示 x x x x IP 地址 客户端 a a a a gt LB b b b b gt NGX c c c c gt WEBAPP d d d d 这是我的 NGi
  • Typescript 函数接口重载

    我有以下代码 interface MySecondInterface a type A interface MyInterface val1 string val2 string MySecondInterface a
  • 实例化 Microsoft.Office.Interop.Excel.Application 对象时出现错误:800700c1

    实例化 Microsoft Office Interop Excel Application 以从 winforms 应用程序生成 Excel 时 出现以下错误 这之前是有效的 但突然间它停止工作了 尽管代码和 Excel 版本没有变化 我
  • 带显示块的SPAN

    和默认有什么区别 div 元素和默认值 span 元素与display block HTML 元素的有效性和语义存在差异 否则它们是相同的 div and span两者都被定义为通用容器 在 HTML 方面没有更深层次的含义 一个默认为块显
  • 使用泛型全面实现特征

    我正在通过实现矩阵数学来练习 Rust 但遇到了一些障碍 我定义了我认为与矩阵相关的特征 trait Matrix
  • Android:如何检测手机设置中的语言已更改

    我如何检测我的手机语言是否已更改 例如 Facebook 应用程序将向我们宣布 please wait we preparing your language i used myString Locale getDefault getDisp
  • 错误:无效使用不完整类型“类 Move”/未定义对 Move::NONE 的引用

    拜托 我不知道为什么这个简单的代码被拒绝 它给了我 2 个编译错误 请帮帮我 I use 代码 块 20 03 我的编译器是GNU GCC 移动 hpp class Move public Move Move int int public
  • Android 和 Java 中绘制椭圆的区别

    在Java中由于某种原因Ellipse2D Double使用参数 height width x y 当我创建一个RectF在Android中参数是 left top right bottom 所以我对适应差异有点困惑 如果在 Java 中创
  • 当ScrollView滚动到底部时加载更多数据

    我有一个带有动态加载内容的滚动视图 有时可能会有很多内容 所以我想在用户滚动到底部时加载更多内容 我搜索了合适的方法 发现了两种 onScrollChanged and getScrollY 但我不知道如何将它用于我的目的 请给我一些建议
  • 如果产品重量超过1000克,如何以公斤为单位显示

    在 Storefront 主题中 我使用下面的代码将格式化重量从 1000g 更改为 1kg add action woocommerce after shop loop item title show weight 10 function

随机推荐

  • 解释 OpenGL ES 背景图像的工作原理

    有人可以解释一下如何在 OpenGL ES 视图上渲染背景图像吗 从设置 OpenGL 环境的基础知识开始 我是 OpenGL 的新手 I m seeing https stackoverflow com questions 3387132
  • 如何使用 Firefox 插件从硬盘读取/写入文件?

    是否可以开发一个可以从硬盘读取 写入文件的 Firefox 插件 我应该使用什么代码 它只是链接 Hypnos 和 ephemient 中提供的代码的复制 和组合 const Cc Ci require chrome create prop
  • 模拟标准输入 - python 3中的多行

    我是 python 新手 一直在使用 python 3 进行学习 我正在使用 python 的单元测试框架来测试我的代码 问题 我需要进行单元测试的函数以以下方式接受输入 def compare a b c input strip spli
  • PHP 中比较的可变位置

    下面两种情况哪个更优化 if var value and if value var 抱歉 如果这与另一个问题重复 但我无法用谷歌搜索出答案 Thanks UPDATE 这称为尤达条件 更多信息here http wiert me 2010
  • 为什么 DRF 可浏览 API 对每个实际请求的多个请求类型运行权限检查?

    我有一个简单的 DRF 列表视图 想要编写一些与以下内容相关的权限POST要求 这导致了错误GET发出了请求 这让我意识到我的权限类在未提交的请求上被多次调用 这是我的文件 权限 py class IsDummy permissions B
  • django 形成 MultipleChoiceField 在保存时恢复为原始值

    我编写了一个自定义 MultipleChoiceField 我一切正常 但是当我提交表单时 即使表单验证正常 所选值也会返回原始选择 我的代码看起来像这样 class ProgrammeField forms MultipleChoiceF
  • 连接池到底是什么?

    我听说过连接池这个术语 并通过谷歌搜索寻找一些参考资料 但不知道何时使用它 我什么时候应该考虑使用 连接池 有什么优点和 连接池的缺点 任何建议 这个想法是 您不会打开和关闭与数据库的单个连接 而是创建一个打开连接的 池 然后重用它们 一旦
  • Cocoa App webview未加载请求

    我已经使用 webview 加载 url 但它没有加载 我已经尝试使用 wkwebview 进行相同的操作 但无法加载网址 我已经做了以下 导入WebKit Info plist 允许任意负载 是 允许任意加载网页内容 是 LOG dnss
  • 使用智能指针实现容器

    好的 所以每个人都知道应该像瘟疫一样避免原始指针并更喜欢智能指针 但是这个建议在实现容器时适用吗 这就是我想要实现的目标 template
  • 在 Msys 上安装 Pip

    我使用 Python 3 5 2 和 Msys 构建了一个简单的 PyGTK 应用程序 但我需要一些默认安装中没有的模块 尽管我可以使用setup py install为了得到它们我宁愿使用pip 我环顾四周发现this https sou
  • 使用 lambda 表达式的嵌套集合创建对象图

    我对利用 lambda 表达式创建属性选择器树感兴趣 使用场景是 我们有一些代码对对象图进行一些递归反射 为了限制递归的范围 我们目前使用 Attributes 来标记应该遍历哪些属性 即获取对象的所有修饰属性 如果该属性是具有修饰属性的引
  • Java接口实现对象?

    是否有 Java 接口隐式实现 java lang Object 当我做这样的事情时出现了这个问题 public static String sizeSort String sa Comparator
  • 在 bash_profile 中设置路径

    为什么设置一个PATH要求 PATH 在最后 PATH Library Frameworks Python framework Versions 2 7 bin PATH 当我附加一条路径时我会这样做 PATH PATH 我如何附加一个PA
  • pylab/networkx;更新后不显示节点标签

    将 matplotlib 更新到当前版本后 我遇到了 networkX 中节点标签的问题 如果我使用nx draw G 命令 我得到一个图表 但没有显示标签 但我们还是举个例子吧 import networkx as nx import m
  • 计算已用时间的 Bash 脚本

    我正在 bash 中编写一个脚本来计算执行命令所用的时间 请考虑 STARTTIME date s command block that takes time to complete ENDTIME date s echo It takes
  • 如何在 p:calendar 中使用 java.time.ZonedDateTime / LocalDateTime

    我一直在 Java EE 应用程序中使用 Joda Time 进行日期时间操作 其中关联客户端提交的日期时间字符串表示形式在将其提交到数据库之前已使用以下转换例程进行转换 即在getAsObject JSF 转换器中的方法 org joda
  • Xampp MySQL 未启动 - “MYSQL 未在 XAMPP 3.2.1 版本上启动...”

    我在我的笔记本电脑上安装了 xampp 版本 3 2 1 之前 mysql 工作正常 但突然 mysql 停止工作 而 apache 和其他人正在工作 当我单击启动 mysql 时 它显示此错误 我使用 Windows 10 8 52 32
  • 我在尝试发送消息时收到错误

    send setOnClickListener new OnClickListener Override public void onClick View v TODO Auto generated method stub URI uri
  • KendoUI 网格默认值与数据注释

    我将 Kendo UI Grid 与 ASP NET MVC Helpers 和自动生成的列一起使用 I have DefaultValue 60 60 我的视图模型中存在注释 但 Kendo 助手似乎并不尊重这一点 我可以指定默认值 可能
  • 如何将 Observable 序列化到云端并返回

    我需要分割处理序列 就像在这个问题中如何使用 net RX 组织数据处理器的序列 https stackoverflow com q 13310865 296494 到 Azure 环境中的多个计算单元 这个想法是将 Observable