具有自定义计数标准的 RxJava 缓冲区/窗口

2024-05-03

我有一个 Observable,它发出许多对象,我想使用以下方法对这些对象进行分组:window or buffer运营。但是,不是指定count用于确定窗口中应有多少对象的参数我希望能够使用自定义标准。

例如,假设可观察对象正在发出 a 的实例Message像下面这样的类。

class Message(
   val int size: Int
)

我想根据它们的消息实例来缓冲或窗口size变量不仅仅是它们的计数。例如,获取总大小最多为 5000 的消息窗口。

// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

是否有捷径可寻?


首先我必须承认,我不是 RxJava 专家。 我只是发现你的问题具有挑战性,并试图找到解决方案。

有一个window()带参数的函数boundaryIndicator。你必须创建一个Publisher/ Flowable如果达到窗口大小,则发出一个项目。

在示例中我创建了一个对象windowManager用作boundaryIndicator。在里面onNext回调我调用windowManager并给它一个打开新窗口的机会。

val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) {
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
    it.doOnNext {
        windowManager.openWindowIfRequired(it)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe {
        println(it)
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有自定义计数标准的 RxJava 缓冲区/窗口 的相关文章

随机推荐

  • 通过覆盖或样式设置使 ScrollViewer 的 ScrollBar 始终可见

    我试图使 ScrollViewer 的 ScrollBar 始终可见 这样它不仅在我尝试滚动文本视图时才出现 这样用户就知道还有更多内容可以查看 起初 出于某种原因 我认为我只需要更改需要画笔覆盖的颜色 但实际上 ScrollBar 正在淡
  • 用于通过 Apple 登录的自定义圆形按钮

    我遵循 Apple 的指南来实施 使用 Apple 登录 按钮 苹果在文档中表示 也可以仅使用徽标来创建 使用Apple登录 的自定义按钮 您也可以更改图像的形状以具有圆形按钮 为了能够编辑图像 它还提供插入蒙版 但我不明白我们必须为按钮或
  • 如果我后面不写“as Something”,用“Dim”来声明变量是没有用的吗?

    例如 下面两个代码是否相同 如果我之后不写 作为整数 是否没有必要使用 Dim Sub something MyNumber 10 Worksheets 1 Range A1 MyNumber End Sub and Sub somethi
  • 打开图层地图,经纬度获取地址

    我正在尝试获取带有经度和纬度的地址 城市 邮政编码 街道地址 但我不知道如何获取 我正在使用开放图层 当我单击地图的一部分时 会获取该位置的经度和纬度 有人有解决方案吗 div class map div
  • 检测wifi是否启用(无论是否连接)

    对于 GPS 跟踪应用程序来说 在打开 WIFI 的情况下记录位置信号会导致数据非常不精确或存在间隙 在开始跟踪之前 我已使用可达性查询来检测 wifi 是否可用 问题是 如果进行该查询时 wifi 已启用但未连接到网络 则表明无法通过 w
  • 如何从偏移量获取时区名称?

    我正在使用时刻时区进行时区计算 我都有一些offset来自数据库的数据 例如GMT GMT 1 GMT 2 GMT 3 GMT 4 etc 无论如何 我可以从这些数据中获取时区或时区名称 例如 America Los Angeles 吗 我
  • Capistrano 无法部署到远程服务器

    SOLUTION 解决方案是将以下内容添加到 production rb 的顶部 unshift File expand path lib ENV rvm path Add RVM s lib directory to the load p
  • 使用 Qt 在 xoverlay 之上绘制

    我希望在使用 Xoverlay 渲染的视频流之上绘制一些 UI 我正在使用 gstreamer 播放视频并使用 xoverlay 在 xvimagesink 上渲染它 我的小部件继承自 QGLWidget 我希望使用 QPainter 绘制
  • 封装的闭包与类?

    我是 JS 来自 C etc 的新手 我突然想到闭包似乎是比类更简单 更方便的处理封装的方法 这段代码似乎给出了一种处理封装的简单方法 function addProperty o var value o get function retu
  • 我的小程序需要客户端访问资源的权限的策略文件位置在哪里?

    我发现我必须编写一个策略文件来授予我的小程序权限 但我真的很困惑 我想编写一个小程序 它是一个地图查看器 我需要在运行我的小程序的客户端上保存图像图块以在本地访问图块 以获得安全地查看地图的速度和时间 这对用户有利 因此 小程序需要授予读
  • 提升灵气自定义综合属性(通过语义动作设置结构体属性的特定成员)

    假设我有一个结构体 我想用灵气解析成 它的定义如下 struct data bool export std wstring name data export false 另外 假设该结构已适应融合 如下所示 BOOST FUSION ADA
  • 如何读取从 Access 导入的 SAS 数据集(不符合 SAS 命名约定)

    我已使用 Libname 将 Access DB 导入 SAS 库名称 accdb c mydata base accdb DB 中的所有表现在都在 accdb 库中 但 Access DB 中的表名称与 SAS 数据集命名约定不匹配 我的
  • 将泛型与 Firebase snapshot.getValue() 结合使用的最佳实践

    TL DR 如何正确使用 Firebase DataSnapshot getValue 的泛型类 用例 我想使用 Firebase 为我的所有实体 其中一堆 实现一个通用远程数据源类 当监听数据更改时 我想从 datasnapshot 获取
  • Android Studio 中过时的 Kotlin 运行时警告

    下载并安装最新的 Kotlin 插件后 我有过时的 Kotlin 运行时来自 Android Studio 的警告告诉我 您在 kotlin stdlib 1 1 2 库中的 Kotlin 运行时版本是 1 1 2 而插件版本是1 1 2
  • 从 PHP/Web 应用程序打印多个标签到 Dymo LabelWriter 450 Turbo

    我希望添加使用 Dymo LabelWriter 450 Turbo 打印多个标签的功能 我已经从 Dymo 网站下载了 DYMO Label v 8 SDK dmg 但看不到任何 Javascript Web 相关的 SDK 文件或文档
  • 如何在D3中导入json数据?

    如何在D3中导入json文件 I did d3 json temp json 但是我如何在进一步的代码中访问这个数据集呢 到目前为止我已经尝试过 var data d3 json temp json 但使用 data data 在其余代码中
  • 使用 ffmpeg 从 unix 命令批量将 wav 文件转换为 16 位

    我有一个由许多子文件夹组成的文件夹 每个子文件夹都有其他子文件夹 其中包含 wav 文件 我想像这样转换所有文件 ffmpeg i BmBmGG BmBmBmBm wav acodec pcm s16le ar 44100 BmBmGG B
  • 使用 RSQLite 库时加载 MacPorts SQLite3

    我在 SQLite 一个计算乘积的聚合器 中有一个用户定义的函数 它在 R 之外工作得很好 但是我有时在 Mac 上 如果您想添加您的 SQLite3 则需要 MacPorts 版本的 SQLite3自己的功能 扩展 我可以选择 RSQLi
  • 在 Angular 中将图像 url 转换为 base64

    我正在努力尝试将给定的图像 url 转换为 base64 在我的例子中 我有一个带有图像路径的字符串 var imgUrl assets logoEmpresas empresa logoUrl 我如何直接将给定的图像网址转换为base64
  • 具有自定义计数标准的 RxJava 缓冲区/窗口

    我有一个 Observable 它发出许多对象 我想使用以下方法对这些对象进行分组 window or buffer运营 但是 不是指定count用于确定窗口中应有多少对象的参数我希望能够使用自定义标准 例如 假设可观察对象正在发出 a 的