我正在尝试使用新的 Akka 流,想知道如何使用源队列并将其返回给调用者,而不在我的代码中实现它?
想象一下,我们有一个库可以进行多次异步调用并通过以下方式返回结果Source
。函数看起来像这样
def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {
val source = Source.queue[String](100, backpressure)
source.mapMaterializedValue { case queue =>
val url = s"http://.....&term=$text"
httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
v.idlist.foreach { id =>
queue.offer(id)
}
queue.complete()
}
}
source
}
调用者可能会这样使用它
// There is implicit ActorMaterializer somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)
当我在里面运行这段代码时mapMaterializedValue
永远不会被执行。
我不明白为什么我无权访问实例SourceQueue
是否应该由调用者决定如何实现源。
我应该如何实施这个?
在您的代码示例中,您返回源而不是返回值source.mapMaterializedValue
(方法调用不会改变 Source 对象)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)