知道 akka actor 何时完成

2024-05-05

有几个人和我一起从事一个项目,一直在试图找出解决这个问题的最佳方法。看起来这应该是经常需要的标准东西,但由于某种原因我们似乎无法得到正确的答案。

如果我有一些工作要做,并且我向路由器抛出一堆消息,我如何知道所有工作何时完成?例如,如果我们正在读取 100 万行文件中的行并将该行发送给 actor 来处理它,并且您需要处理下一个文件,但必须等待第一个文件完成,您如何知道它何时完成做完了?

进一步评论。我知道并且已经将 Await.result() 和 Await.ready() 与 Patters.ask() 一起使用。一个区别是,每条线都会有一个 Future,而我们将有大量这些 future 等待,而不仅仅是一个。此外,我们正在填充一个占用大量内存的大型域模型,并且不希望添加额外的内存来在内存中保存相同数量的 future 等待组合,而使用每个参与者在完成工作后完成的操作而不是持有内存等待来组成。

我们使用的是 Java,而不是 Scala。

伪代码:

for(File file : files) {
    ...
    while((String line = getNextLine(fileStream)) != null) {
        router.tell(line, this.getSelf());
    }
    // we need to wait for this work to finish to do the next
    // file because it's dependent on the previous work
}

看起来你经常想做很多工作并知道演员什么时候完成。


我相信我有一个适合你的解决方案,而且不需要积累一大堆Futures。一是理念层次高。将有两名演员参与此流程。我们首先要调用的是FilesProcessor。这位演员的寿命很短,但很有威严。每当您想要按顺序处理一堆文件时,您都可以启动该参与者的一个实例,并向其传递一条包含您想要处理的文件的名称(或路径)的消息。当它完成所有文件的处理后,它会自行停止。我们要呼叫的第二个演员LineProcessor。这个参与者是无状态的、长期存在的并且集中在路由器后面。它处理文件行,然后响应请求该行处理的任何人,告诉他们它已完成该行的处理。现在来看代码。

首先是消息:

public class Messages {

  public static class ProcessFiles{
    public final List<String> fileNames;
    public ProcessFiles(List<String> fileNames){
      this.fileNames = fileNames;
    }
  }

  public static class ProcessLine{
    public final String line;
    public ProcessLine(String line){
      this.line = line;
    }
  }

  public static class LineProcessed{}

  public static LineProcessed LINE_PROCESSED = new LineProcessed();
}

And the FilesProcessor:

public class FilesProcessor extends UntypedActor{
  private List<String> files;
  private int awaitingCount;
  private ActorRef router;

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessFiles){      
      ProcessFiles pf = (ProcessFiles)msg;
      router = ... //lookup router;
      files = pf.fileNames;
      processNextFile();
    }
    else if (msg instanceof LineProcessed){
      awaitingCount--;
      if (awaitingCount <= 0){
        processNextFile();
      }
    }

  }

  private void processNextFile(){
    if (files.isEmpty()) getContext().stop(getSelf());
    else{            
      String file = files.remove(0);
      BufferedReader in = openFile(file);
      String input = null;
      awaitingCount = 0;

      try{
        while((input = in.readLine()) != null){
          router.tell(new Messages.ProcessLine(input), getSelf());
          awaitingCount++;
        }        
      }
      catch(IOException e){
        e.printStackTrace();
        getContext().stop(getSelf());
      }

    }
  }

  private BufferedReader openFile(String name){
    //do whetever to load file 
    ...
  }

}

And the LineProcessor:

public class LineProcessor extends UntypedActor{

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessLine){
      ProcessLine pl = (ProcessLine)msg;

      //Do whatever line processing...

      getSender().tell(Messages.LINE_PROCESSED, getSelf());
    }
  }

}

现在,线路处理器正在发送回没有附加内容的响应。如果您需要根据线路的处理发回某些内容,您当然可以更改此设置。我确信这段代码不是防弹的,我只是想向您展示一个高级概念,说明如何在没有请求/响应语义的情况下完成此流程,并且Futures.

如果您对此方法有任何疑问或需要更多详细信息,请告诉我,我很乐意提供。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

知道 akka actor 何时完成 的相关文章

随机推荐

  • Async/Await 无法按预期使用 Promise.all 和 .map 函数

    我有很多async我正在使用的功能 但遇到了一个奇怪的问题 我的代码正在运行 如下所示 async mainAsyncFunc metadata let files metadata map data gt this anotherAsyn
  • 计算标签云中标签字体大小的公式是什么?

    我有一个标签云 我需要知道如何更改最常用标签的字体大小 我需要设置最小字体大小和最大字体大小 您可以使用线性或对数评估与某个标签相对于最大标签关联的项目数量 将其乘以最小和最大字体大小之间的差值 然后将其添加到最小字体大小 例如 伪代码中的
  • C++“智能指针”模板,自动转换为裸指针,但无法显式删除

    我正在一个非常大的遗留 C 代码库中工作 该代码库将保持匿名 作为遗留代码库 它在各处传递原始指针 但我们正在逐渐尝试使其现代化 因此也有一些智能指针模板 这些智能指针 与 Boost 的scoped ptr 不同 具有到原始指针的隐式转换
  • CSS - a:访问过:悬停?

    如何应用字体color仅指向已经存在的超链接visited并且正在被hover通过鼠标 本质上 我想做的是 a visited hover color red 是的 这是可能的 这是一个例子 a href http www google c
  • SQL CE 天蓝色连接

    我正在使用 azure 发布 asp net 应用程序 当我在本地发布时 它工作正常 但在 Azure 上 与数据库相关的所有内容都没有显示 并收到 由于发生内部服务器错误 无法显示页面 想知道我的连接字符串是否有问题 http webly
  • 如何从 Android 广播接收器显示对话框?

    理想情况下 我不想启动一项活动来执行此操作 当 WiFi 连接丢失时 我的应用程序需要关闭 因为这对我们来说是一个致命错误 我想显示一条错误消息并让用户按 确定 按钮 然后退出应用程序 解决这个问题的最佳方法是什么 Thanks AFAIK
  • 为什么我无法在 ES6 中导出已声明的函数?

    我读过 MDN 的文档 好吧 主要是新模块功能很好 让我困惑的是一些小事情export 现在 让我们看看 when I export function foo x return x x or export const foo x gt re
  • 正则表达式贪心问题 (C#)

    我有一个像 text 和 text 这样的输入字符串 我想用相应的 html 标签替换 wiki 语法 input text and text 理想的输出 h1 text and h1 text 但使用以下代码我得到以下输出 var reg
  • 获取集合时的 ​​Backbone.js 进度条

    我想在用新内容更新应用程序时显示进度条 我想最好的办法是在集合上调用 fetch 时执行此操作 我获取的内容主要是图像 视频海报等 但我只获取链接 而不是 base64 字符串或大的东西 我想做的是在获取图像链接时用进度条覆盖屏幕 渲染视图
  • 在 nohup 中使用别名

    为什么以下不起作用 alias sayHello bin echo Hello world sayHello Hello world nohup sayHello nohup appending output to nohup out no
  • authContext.AcquireTokenSilentAsync 抛出错误

    我参考了this https github com Azure Samples active directory dotnet graphapi web git 项目 该项目具有用于连接并获取有关用户配置文件的信息的代码 在运行该项目时 我
  • Java 中如何将 long 转换为 int?

    Java 中如何将 long 转换为 int 在 Java 8 中更新 Math toIntExact value 原答案 简单的类型转换应该可以做到 long l 100000 int i int l 但请注意 较大的数字 通常大于214
  • 如何更改散点图色调图例手柄

    下面这段代码使用seaborn生成的散点图如下 ax sns scatterplot x Param 1 y Param 2 hue Process style Item data df s 30 legend full 我想去掉圆圈中的颜
  • 签署程序集“<程序集名称>.dll”时出现加密失败 –“提供程序版本错误”

    我从知名提供商处购买了身份验证证书 现在我想对程序集进行强命名 然后对其进行数字签名 这是我到目前为止所做的 通过运行 sn exe p keypair pfx key snk 从 pfx 中提取公钥 选中项目属性签名选项卡上的 对程序集进
  • 如何显示 pg-search 多重搜索结果的摘录

    我已经在 Heroku 上的 Rails 应用程序中设置了 pg search query fast PgSearch multisearch query gt
  • 带有 Angular 8 自定义 webpack 配置的 svg-sprite-loader

    我正在尝试使用自定义 webpack 配置将 svg sprite loader 包 用于创建 svg sprite 与我的 Angular 8 项目一起使用 我正在使用以下自定义配置 const SpriteLoaderPlugin re
  • Prolog 罗马数字(属性语法)

    我正在做一项作业prolog questions tagged prolog扫描数字列表并应返回该列表是否是有效的罗马数字以及数字的十进制值 前任 1 roman N I N 1 true 2 当我运行我认为应该工作的程序时 十进制值总是正
  • 我可以将命名空间对象从可变对象转换为不可变对象吗?

    我从命令行参数收到一个命名空间对象 而且我不想修改它 我可以这样做吗 或者你有什么想法吗 coding utf 8 import argparse def parse args parser argparse ArgumentParser
  • 在Java中如何将数字转换为字母?

    有没有比这更好的方法将数字转换为其字母等效值 private String getCharForNumber int i char alphabet ABCDEFGHIJKLMNOPQRSTUVWXYZ toCharArray if i g
  • 知道 akka actor 何时完成

    有几个人和我一起从事一个项目 一直在试图找出解决这个问题的最佳方法 看起来这应该是经常需要的标准东西 但由于某种原因我们似乎无法得到正确的答案 如果我有一些工作要做 并且我向路由器抛出一堆消息 我如何知道所有工作何时完成 例如 如果我们正在