基于内容的 RxJava Observable 缓冲区

2024-04-16

我使用 vertX 和 RxJava 启动了一个项目,但遇到了一个问题,但没有找到解决方案。

我有一个 Observable,它为传入通信发出 WebSocketFrame, 每个 WebSocketFrame 由有效负载(ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。

我想对此 Observable 进行操作,将其转换为发出 ByteBufferd 的 Observable,其中包含每条消息的所有帧。

我尝试过buffer方法,但它似乎被设计为通过任意标准(时间或其他可观察的)重新组合项目。

另一种方法似乎使用compose订阅 WebSocketFrame observable,在非结束帧上添加到缓冲区,并在结束帧上“馈送”ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。

因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的了解来提出实现方案,我将不胜感激。

感谢您的阅读。


我想你必须使用buffer http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable,%20rx.functions.Func1)运算符(也许你可以使用更简单的buffer http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.functions.Func0),但我对此不确定)。也可以看看这另一个问题 https://stackoverflow.com/questions/31314345/rxjava-buffer-window-until-element-suffers-condition涵盖大致相同的主题并且这个 GitHub 页面 https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md进行更多讨论。希望这对您有帮助!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于内容的 RxJava Observable 缓冲区 的相关文章

  • Rx:组合 ThrottleFirst 和 Sample 运算符

    给定一个源可观察 S 我如何要求 RxJava Rx 生成可观察 D 即 立即从 S 发出第一项 在发射每个项目之后和发射下一个项目 L 之前等待至少 T 秒 其中 L 是 S 在等待期间发射的最后一个项目 如果 S 在等待时间 T 内没有
  • angular2 等待 if 条件下的 observable 完成

    我已经实现了这样的 if 语句 if this service check return true else 这个 if 条件等待后端的响应 但在 observable 执行之前 它会进入 else 语句并完成条件 而不在开始时检查 if
  • 如果另一个可观察量在 RxJS 中有数据,如何忽略一个可观察量中的所有数据?

    我有两个可观察量 一个从浏览器接收数据本地存储另一个来自database通过WebAPI 我想订阅它们 所以如果从本地存储有数据 不启动从database 如果从观察到本地存储没有任何 数据 调用ajax调用以获取数据WebAPI 在下面的
  • Angular5 valuechanges() 函数发生了什么? (角火2)

    我尝试理解 valueChanges 和 subscribe 我用AngularFire2 and Angular5 我的代码可以工作 但我不明白它是如何工作的 我的组件 ngOnInit this itemService getLastU
  • 如何处理 RxJava 中 Observable 中的 map() 中的异常

    我想做这个 Observable just bitmap map new Func1
  • 在 Observable 中 xhr.send() 之后获取服务器响应

    我实现了一种在 Angular 2 应用程序中发布文件的方法 它基于我找到的解决方案here https stackoverflow com a 35985489 2018084 由于 Angular 2 本身不支持文件上传 因此解决方案必
  • RxJava + 改造,获取列表并为每个项目添加额外信息

    我正在玩 RXJava 在 Android 中进行改造 我正在努力完成以下任务 我需要定期轮询一个给我 Observable gt 的调用 从这里我可以做到 一旦我得到这个列表 我想在每个交付中迭代并调用另一个方法来给我预计到达时间 所以只
  • RxJava 作为事件总线?

    我开始学习 RxJava 到目前为止我很喜欢它 我有一个片段与单击按钮时的活动进行通信 用新片段替换当前片段 谷歌推荐界面 http developer android com training basics fragments commu
  • Angular:仅刷新令牌一次

    我使用带有刷新令牌策略的 JWT 作为身份验证 并且我的 Angular 客户端中有一个拦截器 它将令牌作为标头发送 我在发送之前检查是否过期 并在需要时使用我的refreshToken刷新令牌 问题是当发送 2 个 或更多 请求时 两个请
  • 顺序订阅可观察数组

    在这里 我用过forkJoin从 rxjs 并行订阅可观察数组 但我想一一订阅 最好的解决方案是什么 下面是我的代码 var observables Observable forkJoin observables subscribe gt
  • 在 RxJava 中,如何在错误时重试/恢复,而不是完成可观察的

    我想要实现的是 监控特定变化的偏好 当检测到更改时 使用新值启动新的网络调用 变换结果 在 UI 中显示结果 我知道更改何时发生 现在我认为我需要对某个主题调用 onNext 然后这应该会触发 Rx 链 最后我可以更新 UI mViewPe
  • 带有 vararg observables 的 RxJava zip

    当我们确切地知道有多少个具有确切类型的可观察量并且我们想要压缩时 我们会这样做 Observable
  • Angular 5 订阅和取消订阅 Observable

    我必须从两个订阅获取数据 但我总是得到第一个订阅的数据 我有一个数据共享服务 import Injectable from angular core import BehaviorSubject from rxjs BehaviorSubj
  • 更新数据库时 LiveData 列表不更新

    我目前正在重构遗留代码以使用 Android 架构组件 并在一种存储库模式中设置房间数据库和齐射请求 因此 表示 域层要求存储库获取 LiveData Objects 来观察或告诉他与服务器同步 然后删除旧的数据库条目并从服务器重新获取所有
  • RxJava 相当于 orElse 是什么

    在其他语言的流 函数域中有一个常见的操作 那就是 orElse 它就像一个 if 当当前链没有得到任何结果时 它会更改为备用链 在具有 Maybe 类型的语言中 它基本上会继续 Some 类型的链或更改为 None 类型的 orElse 理
  • RxJS Angular2 在 Observable.forkjoin 中处理 404

    我目前正在链接一堆 http 请求 但是在订阅之前我无法处理 404 错误 My code 在模板中 service getData subscribe data gt this items data err gt console log
  • 如果可观察对象之一停止发出事件,为什么 Observable.race 无法工作?

    如果互联网连接丢失 我想在 webapp 中实现 websocket 重新连接 为了检测互联网是否丢失 我使用乒乓方法 这意味着我从客户端发送 ping 消息 服务器返回给我 pong 消息 当 webapp 加载时 我发送 init pi
  • Angular2 - *ngIf 和异步可观察量

    我在将 ngIf 与可观察变量一起使用时遇到问题 问题是 当我隐藏元素时 ngIf 然后再次显示 值将不会加载 因此 div someObservable async div 基本上当 showDiv 设置为true首先 加载了 someO
  • Single.zip - 如何捕获失败的呼叫并继续其余的网络呼叫?

    我正在进行 5 个并行网络调用 模拟其中 4 个成功 其中 1 个失败 失败的调用使整个Single zip 失败 即使其他 4 个网络调用成功 我也无法获得它们的结果 如何处理单个失败的网络调用的错误Single zip 并获得成功者的结
  • 如何使用 RxJava 处理分页?

    我正在考虑将我的 Android 应用程序转换为使用 Rxjava 进行网络请求 我目前访问的网络服务类似于 getUsersByKeyword String query int limit int offset 据我了解 Observab

随机推荐

  • 形成 Mockito“语法”

    Mockito 看起来像是一个非常可爱的 Java 存根 模拟框架 唯一的问题是我找不到任何关于使用他们的 API 的最佳方法的具体文档 测试中常用的方法包括 doXXX Stubber when T OngoingStubbing the
  • 如何绘制 UIBezierPath

    这就是我想做的 我有一个 UIBezierPath 我想将它传递给某种方法来绘制它 或者简单地从创建它的方法中提取它 我不确定如何指示应在哪个视图中绘制它 所有绘图方法都必须以 void drawRect CGRect rect 我可不可以
  • Visual Studio 2017 无法修改 - 需要重新启动

    我已经尝试更新和修改 Visual Studio 2017 两天了 但每次运行 Visual Studio 安装程序时 我都会收到以下消息 需要重新启动 如果需要 任何剩余的设置都将恢复 重启后 显然我重新启动了大约10次 知道我必须删除什
  • 外部 jQuery 根本不执行

    下面的代码放在里面时完全可以工作运行页面上的标签 我后来把代码移到了外面 js用于组织目的的文件导致代码停止工作 当应触发某些事件时没有任何反应 我确保脚本包含在给定页面上 此外 我通过 查看源 确保链接有效 当我单击脚本的路径时 脚本会在
  • Pycharm 在移动现有虚拟环境或删除并创建新虚拟环境后忽略新虚拟环境

    如果我在创建项目时允许PyCharm创建虚拟环境 则删除或移动venv文件夹 它不会让我选择一个新文件夹 我可以进入设置中的 Python 解释器菜单 然后选择我自己创建的现有解释器 注意 brokenInterpreter oldFold
  • 在 VBS 中使用环境变量的值时出现问题

    我是 VBScript 新手 编写了一个可以修改 XML 文件的小脚本 但我在将计算机名称放入 XML 时遇到问题 我从以下位置获取了计算机名称HOST使用这些行的环境变量 Set wshShell CreateObject WScript
  • DEP0800:升级到 VS 2015 Update 3 后无法部署 UWP 应用

    升级到 VS 2015 Update 3 后 我很难让我的 UWP 应用程序在调试模式下部署 2 gt 检查是否安装了所需的框架 2 gt 框架 Microsoft VCLibs 140 00 Debug x86 当前未安装应用程序包版本1
  • getline 跳过第一个输入字符 c++ [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 所以我制作
  • awk 中的提示和技巧 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 如何使用 Scanner 只接受有效的 int 作为输入

    我正在努力使一个小程序更加健壮 我需要一些帮助 Scanner kb new Scanner System in int num1 int num2 0 System out print Enter number 1 num1 kb nex
  • 如何将视图作为图像保存到 SD 卡

    我的应用程序使用表格布局 表格行和文本视图创建一个拼图网格 我希望用户能够将该网格保存到 SD 卡上 以便可以打印 复制或加载到其他设备上 如何保存视图 以便在打印时它看起来就像在 Android 屏幕上一样 None
  • JNLP 作为 HTML 页面中的 Applet

    我试图在 HTML 页面中运行 JNLP 但 java 插件不运行 JNLP 只运行 Applet 这是我的代码
  • Spring Java 中许多 DAO 的策略

    我们现有的项目中有许多 DAO 目前没有接口 但这可能会改变 我们没有为每个 DAO 类连接一个 Spring 管理的 bean 并将它们注入到服务层 而是有一个类似这样的 DAO 工厂 public class DAOFactory pr
  • Angular 2 角色和权限

    我在我的项目中使用了 angular2 和 laravel 5 3 在laravel中 当用户登录服务器时 将发送用户的权限以处理角度授权 所以我写了一个守卫来保护无法访问的用户的路由 这是我的警卫类代码 export class Acce
  • 我可以取回诸如“悬停位置”、“刷位置”或“点击位置”之类的信息吗

    我想建立一个闪亮且有情节的交互式图表 Shiny 有一个内置功能来获取有关用户交互的信息 喜欢 输入 plot click 输入 plot dblclick 输入 plot hover and 输入 plot brush See http
  • 如何从 Firefox 扩展中的地址栏获取文本

    我正在构建一个火狐扩展 我在用XUL and JavaScript去做这个 我需要从 Firefox 浏览器的地址栏中获取文本 请不要与浏览器导航的 URL 混淆 它只是用户在页面重定向之前输入的文本 假设用户位于http www myex
  • ' 需要 1 个类型参数' aria-label='@angular/forms 通用类型 'Type' 需要 1 个类型参数'> @angular/forms 通用类型 'Type' 需要 1 个类型参数

    我升级了 Node 和 NPM 并重新安装了 Angular CLI angular cli 1 0 0 beta 11 webpack 8 node 6 5 0 os linux x64 生成了一个foo项目运行良好 然后我将 Angul
  • 品牌旁边的文字

    如何将 Bootstrap 品牌和任何随附文本一起视为品牌 我已经尝试过这个
  • Firefox 中的 JavaScript 错误

    我在 Firefox 中运行 JavaScript 时遇到问题 下面的脚本在除 Firefox 之外的其他浏览器中运行没有问题 var vars hash var hashes window location href slice wind
  • 基于内容的 RxJava Observable 缓冲区

    我使用 vertX 和 RxJava 启动了一个项目 但遇到了一个问题 但没有找到解决方案 我有一个 Observable 它为传入通信发出 WebSocketFrame 每个 WebSocketFrame 由有效负载 ByteBuffer