Akka 定义集合
如果你不介意使用 akka 确定的集合类型,那么你可以使用grouped
函数代替:
//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
.runWith(Sink foreach println)
用户定义的集合
如果您想控制用于缓冲区的集合类型,例如ASeq
or Array
:
type MyCollectionType[X] = Array[X]
def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]
然后您可以使用两个流程执行此操作。第一个 Flow 执行scan
构建元素序列:
val bufferSize = 10
def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
(if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i
val buffer : Flow[Int, MyCollectionType[Int], _] =
Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
(coll, i) => appendToMyCollection(coll, i)
}
第二个流程是filter
对于大小合适的序列(即“goldiLocks”):
val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
这两个 Flow 可以组合起来生成一个 Stream,它将生成所需的集合类型:
val stream = Source(1 to 100).via(buffer)
.via(goldiLocks)
.runWith(Sink foreach println)