将回调方法实现转换为 akka 流源

2023-12-27

我正在与我无法控制的 java 库中的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(该库是java的,但为了简洁起见,我将在scala中进行描述):

type DataType = ???

trait DataConsumer {
  def onData(data : DataType) : Unit
}

库的用户需要编写一个类来实现onData方法并将其传递到DataProducer,库代码如下所示:

class DataProducer(consumer : DataConsumer) {...}

The DataProducer有它自己的内部线程我无法控制,以及附带的数据缓冲区,即调用onData每当有另一个DataType消费的对象。

所以,我的问题是:如何编写一个层将原始库模式转换/翻译为 akka 流Source http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M5/?_ga=1.227496093.129608303.1419543038#akka.stream.scaladsl.Source object?

先感谢您。


回调 --> 来源

详细阐述 Endre Varga 的答案,下面是将创建DataConsumer回调函数将消息发送到 akka 流中Source.

注意:创建一个功能性的 ActorPublish 比我下面指出的要多得多。特别是,需要进行缓冲来处理以下情况:DataProducer正在打电话onDataSink正在发出需求信号(请参阅此example http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html)。下面的代码只是设置“接线”。

import akka.actor.ActorRef
import akka.actor.Actor.noSender

import akka.stream.Actor.ActorPublisher

/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
  def receive : Receive = {
    case message : DataType => deliverBuf() //defined in example link
  }    
}

class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
  override def onData(data : DataType) = sourceActor.tell(data, noSender)
}

//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]

//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)

//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))

//setup the incoming data feed from 3rd party library
val dataProducer  = DataProducer(actorConsumer)

回调 --> 整个流

最初的问题专门要求对 Source 进行回调,但是如果整个流已经可用(而不仅仅是 Source),则处理回调会更容易处理。这是因为流可以具体化为ActorRef使用来源#actorRef http://doc.akka.io/api/akka/2.4.11/index.html#akka.stream.scaladsl.Source%24@actorRef%5BT%5D(bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy):akka.stream.scaladsl.Source%5BT,akka.actor.ActorRef%5D功能。举个例子:

val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val bufferSize = 100

val streamRef = 
  Source
    .actorRef[DataType](bufferSize, overflowStrategy)
    .via(someFlow)
    .to(someSink)
    .run()

val streamConsumer = new DataConsumer {
  override def onData(data : DataType) : Unit = streamRef ! data
} 

val dataProducer = DataProducer(streamConsumer)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将回调方法实现转换为 akka 流源 的相关文章

随机推荐

  • 如何在java中使用AWS Textract检索pdf中存在的表

    我发现下面的文章是用 python 做的 https docs aws amazon com texttract latest dg examples export table csv html https docs aws amazon
  • 如何调试 WSO2 ESB 代码

    您通常如何调试 ESB 代码 我正在使用日志语句 但它们在系统日志中丢失了 有没有更好的机制可以使用 Thanks 您可以使用 ESB 代码从 IDE 进行远程调试 首先使用调试选项启动 ESB 服务器脚本 如下所示 wso2server
  • PHP:不区分大小写的参数

    我如何接受已通过GET or POST值不区分大小写 Like sample php OrderBy asc仍然会是一样的sample php orderby asc or sample php ORDERBY asc 有没有一种方法可以有
  • 在 Java 中构建分隔项字符串的最佳方法是什么?

    在使用 Java 应用程序时 我最近需要组装一个以逗号分隔的值列表以传递给另一个 Web 服务 而无需提前知道其中有多少元素 我能想到的最好的办法是这样的 public String appendWithDelimiter String o
  • Model.find Mongoose 6.012 始终返回所有文档,即使有过滤器

    我的架构示例 const XXXSchema new mongoose Schema name String 我使用猫鼬有一段时间了 最 近我开始遇到这些问题 以下查询按预期工作 await MyModel find id ObjectId
  • 是否有内置支持在 Azure 容器实例上启用 SSL?

    是否有内置支持在 Azure 容器实例上启用 SSL 如果没有 我们可以连接到像 Lets Encrypt 这样的 SSL 提供商吗 今天没有任何内置的东西 您需要将证书加载到容器中并在那里终止 SSL 很快 我们将启用对 ACI 容器加入
  • 使用 keyup 捕获 TAB 按键

    我需要实现 2 个目标 但我一次只实现一个目标 而不是同时实现两个目标 首先 我有一个输入字段 当按下按键时应该触发一个事件 并且我需要捕获字段值 我使用字母 数字和 TAB 键 因此 如果我使用 keyup 它会在第一个字符处触发 如果我
  • 打开 jquery.min.js 后 Eclipse 变得非常慢

    正如标题所说 每当我打开一个长压缩版本的任何 javascript 库 如 jquery 或 Foundation min js 时 我的 Eclipse 安装都会变得非常非常缓慢 有时重新启动后 事情会再次正常 直到我打开这些文件 有什么
  • tsc 编译使用文件扩展名导入的打字稿

    我以独立于生态系统的方式编写打字稿代码 我决定在导入中包含文件扩展名 以匹配 Web 和 Deno import xyz from foo ts 我怎样才能获得打字稿编译器 tsc 编译这些文件不会出现以下错误 error TS2691 A
  • 为什么 npm install 仅在 ElasticBeanstalk 中失败?

    我有一个 Nest js Node js 应用程序 我想将其部署在 ElasticBeanstalk Node 16 版本 AL2 5 5 0 上 我的部署一直失败 我发现错误在eb engine log 2022 03 23 15 11
  • “const int& jj”和“int& const jj”有什么区别?

    我对两者感到困惑 我知道 C 引用本质上是恒定的 一旦设置它们就不能更改为引用其他内容 const int 表示对 const 的引用int 相似地 int 表示对非常量的引用int int const字面意思是 const 引用 对非 c
  • Android:使用媒体播放器播放本地视频

    我正在尝试播放我在项目中保存的视频 我有下载this http camendesign co uk code video for everybody test html 一个 mp4 测试视频 然后在我的项目中创建了一个名为 vid 的文件
  • Cordova - 命令错误代码 1 |命令失败的时间为

    我是科尔多瓦的新手 所以如果我的问题不相关 请原谅我 我的 Windows 7 x64 机器中有一个 cordova 项目 昨天我通过构建我的科尔多瓦应用程序cordova build android release 但我需要添加新插件co
  • 创建直方图 OCaml

    我的任务是创建一个直方图 输出某个元素在列表中出现的次数 Input 2 2 2 3 4 4 1 Output 2 3 2 2 2 1 3 1 4 2 4 1 1 1 Expected output 2 3 3 1 4 2 1 1 My c
  • YARN 中应用程序管理器和应用程序主控之间的区别?

    我了解 MRv1 的工作原理 现在我试图了解 MRv2 YARN 中的应用程序管理器和应用程序主控之间有什么区别 应用程序主控和应用程序管理器这两个术语通常可以互换使用 实际上 Application Master 是请求 启动和监视应用程
  • 使用 moment.js 将日期转换为字符串“MM/dd/yyyy”

    我需要从 jquery datepicker 中获取日期值 将其转换为字符串格式 MM dd yyyy 以便它可以执行正确的 ajax post 当页面加载或更改日期选择器时 将进行 jquery ajax 调用 我有这个代码 var sT
  • 以编程方式在 WSO2 API Manager 中添加自定义处理程序

    我通过使用 WSO2 API Manager 的自动化流程创建和订阅新的 API出版商 https docs wso2 com display AM190 Publisher APIs and Store https docs wso2 c
  • 识别与给定字符串向量匹配的列索引

    我有一个字符串向量 x lt c a b 我有一个多列矩阵 其中包含该字符串向量中的名称 我想获取与其名称匹配的列号 索引 which colnames sample matrix x 当 x 不是向量而是单个元素时 上面的方法有效 有什么
  • 长整数中单个位的索引(在C中)[重复]

    这个问题在这里已经有答案了 我试图找到一个最佳代码来定位长整数 64 位 中的单个位索引 长整数只有一位设置位 使用C语言 目前 我只是将整个事情移动一位 然后检查零 我读过有关查找表的内容 但它不适用于整个 64 位 我考虑过检查每个 8
  • 将回调方法实现转换为 akka 流源

    我正在与我无法控制的 java 库中的数据发布者合作 发布者库使用典型的回调设置 库代码中的某处 该库是java的 但为了简洁起见 我将在scala中进行描述 type DataType trait DataConsumer def onD