我正在与我无法控制的 java 库中的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(该库是java的,但为了简洁起见,我将在scala中进行描述):
type DataType = ???
trait DataConsumer {
def onData(data : DataType) : Unit
}
库的用户需要编写一个类来实现onData
方法并将其传递到DataProducer
,库代码如下所示:
class DataProducer(consumer : DataConsumer) {...}
The DataProducer
有它自己的内部线程我无法控制,以及附带的数据缓冲区,即调用onData
每当有另一个DataType
消费的对象。
所以,我的问题是:如何编写一个层将原始库模式转换/翻译为 akka 流Source http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M5/?_ga=1.227496093.129608303.1419543038#akka.stream.scaladsl.Source object?
先感谢您。
回调 --> 来源
详细阐述 Endre Varga 的答案,下面是将创建DataConsumer
回调函数将消息发送到 akka 流中Source
.
注意:创建一个功能性的 ActorPublish 比我下面指出的要多得多。特别是,需要进行缓冲来处理以下情况:DataProducer
正在打电话onData
比Sink
正在发出需求信号(请参阅此example http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html)。下面的代码只是设置“接线”。
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
回调 --> 整个流
最初的问题专门要求对 Source 进行回调,但是如果整个流已经可用(而不仅仅是 Source),则处理回调会更容易处理。这是因为流可以具体化为ActorRef
使用来源#actorRef http://doc.akka.io/api/akka/2.4.11/index.html#akka.stream.scaladsl.Source%24@actorRef%5BT%5D(bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy):akka.stream.scaladsl.Source%5BT,akka.actor.ActorRef%5D功能。举个例子:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)