Akka.Net Streams 和远程处理 (Sink.ActorRefWithAck)

2023-12-31

我使用 Akka.net Streams 做了一个非常简单的实现Sink.ActorRefWithAck:订阅者向发布者请求一个大字符串,发布者通过切片发送它。 它在本地(UT)工作得很好,但是不远程。我不明白出了什么问题?具体来说:订阅者能够将请求发送到发布者,发布者以OnInit消息,但随后OnInit.Ack永远不会回到出版商那里。这Ack消息最终作为死信:

INFO  Akka.Actor.EmptyLocalActorRef - Message Ack from akka.tcp://OutOfProcessTaskProcessing@localhost:12100/user/Streamer_636568240846733287 to akka://OutOfProcessTaskProcessing/user/StreamSupervisor-0/StageActorRef-0 was not delivered. 1 dead letters encountered.

请注意,日志来自目标参与者,因此消息会在正确的进程中处理。没有明显的路径错误。

看看不处理此消息的发布者代码,我真的不知道我做错了什么:

    public static void ReplyWithStreamedString(IUntypedActorContext context, string toStream, int chunkSize = 2000)
    {
        Source<string, NotUsed> source = Source.From(toStream.SplitBy(chunkSize));
        source.To(Sink.ActorRefWithAck<string>(context.Sender, new StreamMessage.OnInit(),
                new StreamMessage.OnInit.Ack(),
                new StreamMessage.Completed(),
                exception => new StreamMessage.Failure(exception.Message)))
            .Run(context.System.Materializer());
    }

这是订阅者代码:

public static Task<string> AskStreamedString(this ICanTell self, object message, ActorSystem context, TimeSpan? timeout = null)
    {
        var tcs = new TaskCompletionSource<string>();
        if (timeout.HasValue)
        {
            CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
            ct.Token.Register(() => tcs.TrySetCanceled());
        }

        var props = Props.Create(() => new StreamerActorRef(tcs));
        var tempActor = context.ActorOf(props, $"Streamer_{DateTime.Now.Ticks}");

        self.Tell(message, tempActor);

        return tcs.Task.ContinueWith(task =>
        {
            context.Stop(tempActor);
            if(task.IsCanceled)
                throw new OperationCanceledException();
            if (task.IsFaulted)                    
                throw task.Exception.GetBaseException();
            return task.Result;
        });
    }

    internal class StreamerActorRef : ReceiveActor
    {
        readonly TaskCompletionSource<string> _tcs;

        private readonly StringBuilder _stringBuilder = new StringBuilder();

        public StreamerActorRef(TaskCompletionSource<string> tcs)
        {
            _tcs = tcs;
            Ready();
        }

        private void Ready()
        {
            ReceiveAny(message =>
            {
                switch (message)
                {
                    case StreamMessage.OnInit _:
                        Sender.Tell(new StreamMessage.OnInit.Ack());
                        break;
                    case StreamMessage.Completed _:
                        string result = _stringBuilder.ToString();
                        _tcs.TrySetResult(result);
                        break;
                    case string slice:
                        _stringBuilder.Append(slice);
                        Sender.Tell(new StreamMessage.OnInit.Ack());
                        break;
                    case StreamMessage.Failure error:
                        _tcs.TrySetException(new InvalidOperationException(error.Reason));
                        break;
                }
            });
        }
    }

带有消息:

public class StreamMessage
{
        public class OnInit
        {
            public class Ack{}
        }

        public class Completed { }

        public class Failure
        {
            public string Reason { get; }

            public Failure(string reason)
            {
                Reason = reason;
            }
        }
    }

一般来说,与 actor ref 一起使用的源和接收器并未设计为通过远程连接工作 - 它们不涵盖消息重试,如果某些流控制消息未传入,这可能会导致系统死锁。

您正在寻找的功能称为流引用(其工作方式类似于 actor refs,但适用于流),并将作为 v1.4 版本的一部分发布(请参阅github拉取请求 https://github.com/akkadotnet/akka.net/pull/3321更多细节)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka.Net Streams 和远程处理 (Sink.ActorRefWithAck) 的相关文章

  • akka.net 有没有办法获取或创建 actor

    对于我的 Actor 层次结构 在通过几个 Actor 处理数据之前 我不知道所需的所有 Actor 因此我正在寻找一种方法来返回现有 ActorRef 或创建新操作 这就是我希望下面的代码要么创建一个演员 如果 my id 1 不存在 要
  • 使用 Play 2.6 和 akka 流的 Websocket 代理

    我正在尝试使用 Play 和 akka 流为 Websocket 连接创建一个简单的代理 交通流向是这样的 Client request gt gt request Server Proxy Client response lt lt re
  • 如何动态地将源添加到现有图表?

    什么可以替代动态改变运行图 这是我的情况 我有将文章摄取到数据库中的图表 文章来自 3 个不同格式的插件 因此我有几个流程 val converterFlow1 Flow ImpArticle Article NotUsed val con
  • 如何响应演员调用的结果?

    我们正在考虑使用 Akka HTTP Java API 使用路由 DSL 不清楚如何使用路由功能来响应 HttpRequest 使用无类型 Akka Actor 例如 在匹配 Route 路径后 我们如何将请求传递给 处理程序 ActorR
  • 在Akka.NET中,(使用Akka.Cluster)如何配置多个种子节点(Lighthouse)相互了解?

    如果我想让两个 Lighthouse 种子节点在不同的 PC 上运行 每个节点都应该了解另一个节点 我应该如何配置它们 尽管我发现提到使用多个 Lighthouse 实例 并看到使用多个种子节点的非种子节点配置 但我无法在了解一个的单独 P
  • 将回调方法实现转换为 akka 流源

    我正在与我无法控制的 java 库中的数据发布者合作 发布者库使用典型的回调设置 库代码中的某处 该库是java的 但为了简洁起见 我将在scala中进行描述 type DataType trait DataConsumer def onD
  • akka-http中使用源队列的连接池实现线程安全吗?

    参考下面提到的实现 http doc akka io docs akka http 10 0 5 scala http client side host level html http doc akka io docs akka http
  • Actor 内的异步 API 调用和异常

    我知道关于PipeTo https stackoverflow com a 25225274 1180426 but 有些东西 比如嵌套延续上的同步等待 似乎违背了异步和等待的方式 https github com petabridge a
  • 如何在断开连接后清理akka-http websocket资源然后重试?

    下面的代码成功建立了 websocket 连接 websockets 服务器 也称为 akk http 故意使用以下命令关闭连接安德鲁的建议答案在这里 https stackoverflow com a 40962834 2074859 T
  • Akka Stream + Akka Http - 获取错误请求

    我有以下效果很好的流 source map x gt HttpRequest uri x rawRequest via Http outgoingConnection host port to Sink actorRef myActor I
  • akka-stream + akka-http 生命周期

    TLDR 当我有一个传出的 http 请求作为流的一部分时 每个请求实现一个流 即使用短期流 还是跨请求使用单个流实现更好 详细信息 我有一个典型的服务 它接受 HTTP 请求 将其分散到多个第三方下游服务 不受我控制 并在将结果发送回之前
  • Akka.Net 发送巨大消息(最大帧大小)

    我有一个关于增加最大帧大小和发送 接收缓冲区大小值的问题 他们的高度有限制吗 我将大量数据传递到系统中 比如 20mb 然后用于计算一些结果并返回 将上述参数设置为 100mb 会导致消息被丢弃 在这种情况发生之前我可以通过的最大块大约是
  • 如何让在一个进程上运行的参与者向在单独进程上运行的另一个参与者发送消息?

    我想让运行在不同进程 或节点 上的参与者向运行在不同进程 或节点 上的其他参与者发送消息 同时保持容错和负载平衡 我目前正在尝试使用 Akka Cluster 的分片功能来完成此任务 但是 我不确定如何实现这一点 我有以下反映我的种子节点的
  • groupBy 的子流可以依赖于它们生成的键吗?

    我有一个包含与用户关联的数据的流程 我还为每个用户提供了一个状态 我可以从数据库异步获取该状态 我想将我的流与每个用户一个子流分开 并在具体化子流时加载每个用户的状态 以便可以根据该状态来处理子流的元素 如果我不想合并下游的子流 我可以做一
  • 为什么 Akka Streams 会吞掉我的异常?

    为什么异常在 import akka actor ActorSystem import akka stream ActorMaterializer import akka stream scaladsl Source object Test
  • 如何通过 Scala 中的 Play Framework 2.5 流式传输压缩文件(即时)?

    我想流式传输一些文件并即时压缩它们 以便用户可以将多个文件下载到一个压缩文件中 而无需向本地磁盘写入任何内容 但是 我当前的实现将所有内容保存在内存中 并且不适用于大文件 有什么办法可以解决吗 我正在研究这个实现 https gist gi
  • 监控封闭图 Akka Stream

    如果我创建了一个RunningGraph在 Akka Stream 中 我怎么知道 从外部 当所有节点因完成而被取消时 当所有节点因错误而停止时 我认为没有办法对任意图执行此操作 但是如果您可以控制图 则只需将监视接收器附加到每个可能失败或
  • Akka Stream 中的 Via/ViaMat/to/toMat

    有人能清楚地解释一下这4种方法有什么区别吗 什么时候使用每一种更合适 一般来说 这组方法的名称是什么 还有更多方法可以完成相同的工作吗 scaladoc 的链接也可能有帮助 D 所有这些方法都是将两个流合并为一个流所必需的 例如 您可以创建
  • akka 远程处理中出现“最大允许大小 128000 字节,编码类 scala 的实际大小”错误

    我想使用 Akka Remoting 在参与者之间通过网络交换消息 但是对于大型字符串消息 我收到以下错误 akka remote OversizedPayloadException Discarding oversized payload
  • Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

    我对 Scala 和 Akka 完全陌生 我有一个简单的 RunnableFlow Source gt Flow do some transformation gt Sink runForeach 现在我想要这样的东西 Source gt

随机推荐

  • 在 VBA 上计算 SHA512 (Excel 2003)

    我正在尝试在 VBA Excel 2003 上计算字符串的哈希值 但是当我调用ComputeHash 它给了我一个Invalid argument procedure call error DLL 参考 mscorlib v4 0 Syst
  • 如何在 Spring Boot 中从 application.properties 转换为 application.yml ?

    我有一个非常简单的 Spring Boot 应用程序 并且有 application properties 现在我想转移到 application yml 这是 application yml 文件 spring datasource ur
  • 创建自定义“自动增量”复合主键?

    我有一组父子表 一对多关系 我正在建表 对 PK 和自动增量的使用有一些疑问 父表有一个自动编号 PK 用于存储销售单标头 这里的一张记录是指票上的记录 子表用于存储票证详细信息 这里的一条记录是票证中的一项 例如可乐 火星棒等 我知道子表
  • 无法生成应用程序

    我有 rvm passenger ruby 1 9 3 nginx 但我现在收到此错误 无法生成应用程序 path to my app 无法从生成服务器读取 连接由对等方重置 104 I have passenger root设置为输出pa
  • 连接3个表以显示某些数据 PHP-MSSQL

    所以我有这个表 我想获取某些数据供用户查看并能够发布到其他页面 我无法发布图片 所以我必须将其分解 所以请耐心等待 第一桌 dbo 用户 pkey 用户ID 员工姓名 第二张桌子 dbo PC pkey PCID PC 号码 第三张桌子 d
  • 是否可以基于CSS创建一个带有渐变边框和透明内部的圆形?

    我正在尝试创建一个用 CSS 画圈有一个渐变边框但也是一个透明内胆这样它看起来像这样 如果内部不透明 有一些解决方案可以创建渐变边框 我的下面的代码片段就是基于这些解决方案 但它们原则上是通过在渐变上覆盖单色 div 来工作的 gt gt
  • 类 x 不存在默认构造函数(继承)C++

    我有以下三个标题 IBaseStates h class IBaseStates public enum STATE virtual void Update STATE state 0 玩家状态 h pragma once include
  • 更新 opencv 库后仍存在 Libpng 漏洞问题

    我在我的应用程序中使用以下依赖项 dependencies compile fileTree include jar dir libs compile project zxing 2 3 0 compile project ColorPic
  • 实施 MySQL NDB Cluster 有哪些限制?

    我想为 MySQL Cluster 6 实现 NDB Cluster 我想为至少有 200 万条记录的非常庞大的数据结构执行此操作 我想知道实施 NDB cluster 是否有任何限制 例如 RAM 大小 数据库数量或 NDB 集群的数据库
  • ASP.NET MVC 会话使用

    目前 我在 ASP NET MVC 应用程序中使用 ViewData 或 TempData 进行对象持久化 然而 在某些情况下 我通过基本控制器类将对象存储到 ViewData 中 每次请求时我都会访问数据库 当 ViewData what
  • 在 iPhone 上使用 html5 视频事件,如何区分“完成”按钮单击和简单暂停?

    我有一个适用于 iPhone 的网页 它使用 HTML5 视频标签 在 iPhone 上 此类嵌入视频在本机播放器中播放 我想查看视频何时结束and当用户使用 完成 按钮关闭视频时 最初 我尝试了这个 var video someVideo
  • Python+Flask:如何从带有换行符的请求中获取文本?

    我不确定它是如何工作的 但它在我的一个项目中有效 但在新项目中不起作用 显然 我错过了一些东西 我想要一个只有一个 POST 的简单 REST 服务器 它应该从 POST 请求获取文本 文本包含换行符 这是我的text txt hello
  • 如何使用 RadioSelect 渲染 Django 表单而不默认选中单选按钮?

    在 Django 1 2 1 上 我使用 ModelForm 并生成带有单选按钮的表单 class myModelForm ModelForm class Meta model myModel widgets choose RadioSel
  • 如何将 JNI 与 AAR 库一起使用?

    我正在创建一个 Android 库 aar 文件 并且需要使用 JNI 我非常清楚 Google 在可能的情况下不鼓励使用 JNI NDK 但在这种情况下 这是不可能的 我从一个独立的 hello jni 示例应用程序开始 首先学习 JNI
  • R 合并 data.frames asof join

    我有一大堆时间间隔不规则的 data frames 我想创建一个新的 data frame 并将其他 data frame 加入其中 对于加入的每个 data frame 从新的 data frame 中选择最新的值 例如 下面的 list
  • 使浮动 div 保持在同一行

    如何将两个元素保留在同一行并固定右列 我希望右侧 div 具有固定大小 左侧列流体 但是当在左侧插入长文本时 右侧 div 会转到下一列 例子 http jsfiddle net Jbbxk 2 http jsfiddle net Jbbx
  • 在排序更改事件期间更新排序顺序 - jQuery UI

    我希望列表元素的值是排序事件期间排序位置的索引 该值应在排序更改事件期间自动更新
  • 向 Scala 案例类添加字段?

    我看过一些关于Pimp my Library pattern 这些似乎对于向类添加行为效果很好 但如果我有一个case class我想要add data members到它 作为一个案例类 我无法扩展它 从案例类继承已被弃用 强烈反对 这些
  • 如何在 Swift 中实现与 Strongify 相同的行为?

    我有一个 self 较弱的闭包 我需要确保 self 在闭包执行期间被保留 并在执行完成后由闭包释放 之前它是由 strongify 完成的 在 Swift 中还有其他方法可以做到这一点吗 虽然其他答案有效 但另一种选择是使用反引号 这样做
  • Akka.Net Streams 和远程处理 (Sink.ActorRefWithAck)

    我使用 Akka net Streams 做了一个非常简单的实现Sink ActorRefWithAck 订阅者向发布者请求一个大字符串 发布者通过切片发送它 它在本地 UT 工作得很好 但是不远程 我不明白出了什么问题 具体来说 订阅者能