我有一个文件列表。我想:
- 将所有这些作为单一来源进行阅读。
- 文件应该按顺序、按顺序读取。 (无循环赛)
- 任何时候都不应要求任何文件完全位于内存中。
- 从文件读取错误应该会崩溃流。
感觉这应该可行:(Scala,akka-streams v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但这会导致编译错误,因为FileIO
具有与之相关的物化价值,并且Source.combine
不支持这一点。
将物化值映射出去让我想知道如何处理文件读取错误,但确实可以编译:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在运行时抛出 IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]