有几个人和我一起从事一个项目,一直在试图找出解决这个问题的最佳方法。看起来这应该是经常需要的标准东西,但由于某种原因我们似乎无法得到正确的答案。
如果我有一些工作要做,并且我向路由器抛出一堆消息,我如何知道所有工作何时完成?例如,如果我们正在读取 100 万行文件中的行并将该行发送给 actor 来处理它,并且您需要处理下一个文件,但必须等待第一个文件完成,您如何知道它何时完成做完了?
进一步评论。我知道并且已经将 Await.result() 和 Await.ready() 与 Patters.ask() 一起使用。一个区别是,每条线都会有一个 Future,而我们将有大量这些 future 等待,而不仅仅是一个。此外,我们正在填充一个占用大量内存的大型域模型,并且不希望添加额外的内存来在内存中保存相同数量的 future 等待组合,而使用每个参与者在完成工作后完成的操作而不是持有内存等待来组成。
我们使用的是 Java,而不是 Scala。
伪代码:
for(File file : files) {
...
while((String line = getNextLine(fileStream)) != null) {
router.tell(line, this.getSelf());
}
// we need to wait for this work to finish to do the next
// file because it's dependent on the previous work
}
看起来你经常想做很多工作并知道演员什么时候完成。
我相信我有一个适合你的解决方案,而且不需要积累一大堆Future
s。一是理念层次高。将有两名演员参与此流程。我们首先要调用的是FilesProcessor
。这位演员的寿命很短,但很有威严。每当您想要按顺序处理一堆文件时,您都可以启动该参与者的一个实例,并向其传递一条包含您想要处理的文件的名称(或路径)的消息。当它完成所有文件的处理后,它会自行停止。我们要呼叫的第二个演员LineProcessor
。这个参与者是无状态的、长期存在的并且集中在路由器后面。它处理文件行,然后响应请求该行处理的任何人,告诉他们它已完成该行的处理。现在来看代码。
首先是消息:
public class Messages {
public static class ProcessFiles{
public final List<String> fileNames;
public ProcessFiles(List<String> fileNames){
this.fileNames = fileNames;
}
}
public static class ProcessLine{
public final String line;
public ProcessLine(String line){
this.line = line;
}
}
public static class LineProcessed{}
public static LineProcessed LINE_PROCESSED = new LineProcessed();
}
And the FilesProcessor
:
public class FilesProcessor extends UntypedActor{
private List<String> files;
private int awaitingCount;
private ActorRef router;
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ProcessFiles){
ProcessFiles pf = (ProcessFiles)msg;
router = ... //lookup router;
files = pf.fileNames;
processNextFile();
}
else if (msg instanceof LineProcessed){
awaitingCount--;
if (awaitingCount <= 0){
processNextFile();
}
}
}
private void processNextFile(){
if (files.isEmpty()) getContext().stop(getSelf());
else{
String file = files.remove(0);
BufferedReader in = openFile(file);
String input = null;
awaitingCount = 0;
try{
while((input = in.readLine()) != null){
router.tell(new Messages.ProcessLine(input), getSelf());
awaitingCount++;
}
}
catch(IOException e){
e.printStackTrace();
getContext().stop(getSelf());
}
}
}
private BufferedReader openFile(String name){
//do whetever to load file
...
}
}
And the LineProcessor
:
public class LineProcessor extends UntypedActor{
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ProcessLine){
ProcessLine pl = (ProcessLine)msg;
//Do whatever line processing...
getSender().tell(Messages.LINE_PROCESSED, getSelf());
}
}
}
现在,线路处理器正在发送回没有附加内容的响应。如果您需要根据线路的处理发回某些内容,您当然可以更改此设置。我确信这段代码不是防弹的,我只是想向您展示一个高级概念,说明如何在没有请求/响应语义的情况下完成此流程,并且Future
s.
如果您对此方法有任何疑问或需要更多详细信息,请告诉我,我很乐意提供。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)