如何为流数据创建 Flux/Publisher

2024-05-06

我正在使用轮询方法定期获取数据。新数据可能随时到达。我想向我的客户公开一个反应式接口。因此,我想创建一个发布者(Flux?),它会在新数据可用时发布新数据并通知订阅者。我怎么做?我看到的所有 Flux 示例都是针对数据已知/可用的情况。实际上,我想要类似基于队列的 Flux 之类的东西,并且我的轮询线程在发现新数据时可以继续填充队列。


对于简单的事情,您可能想要使用DirectProcessor。这不是最复杂的通量汇,但它会让您有所了解。

我写了一个简单的例子:

Flux<String> hot = DirectProcessor.create<String>()
hot.onNext("Hello")//not printed
hot.subscribe(it -> System.out.println(it))

hot.onNext("Goodbye")//printed
Thread.sleep(100)
hot.onNext("foo")//printed

DirectProcessor 实现了 Flux,因此您可以像 Flux 一样使用它。

正如您所看到的,在订阅热源之前添加的元素不会被传递到订阅。

查看其他帖子,Flux#create 和 Flux#generate 可能是不错的起点。Flux.create 和 Flux.generate 之间的区别 https://stackoverflow.com/questions/49951060/difference-between-flux-create-and-flux-generate

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何为流数据创建 Flux/Publisher 的相关文章

  • 如何在日期选择器中设置不在当前月份的单元格的样式

    我目前正在为我的 JavaFX 应用程序制作注册表 问题是 当日期选择器中的单元格不在页面的月份上时 我想让该单元格变灰 让我们看看我当前的日期选择器 我的日期选择器 正如您所看到的 我希望下个月的日期 27 日 28 日 30 日以及 1
  • 热重载在docker中运行的java程序

    我开发了一个java程序 应该在docker中运行 然而 我在调试docker中运行的java程序时遇到了很多痛苦 我在网上搜索 一些教程提出了像 spring dev tools 这样的工具 因为我的java程序是基于spring boo
  • Java 中的 XPath 节点集

    我在 eclipse 中有这段代码 NodeSet nodes NodeSet xPath evaluate expression inputSource XPathConstants NODESET 它给我 NodeSet 上的编译时错误
  • 如何在 JFace 的 TableViewer 中创建复选框?

    我创建了一个包含两列的 tableViewer 我想将其中一列设为复选框 为此 我创建了一个 CheckBoxCellEditor 但我不知道为什么它不起作用 名为 tableName 的列显示其值正常 色谱柱规格如下 String COL
  • 如何在一行中将字符串数组转换为双精度数组

    我有一个字符串数组 String guaranteedOutput Arrays copyOf values values length String class 所有字符串值都是数字 数据应转换为Double QuestionJava 中
  • ExceptionConverter:java.io.IOException:文档没有页面。我正在使用 iText

    当我执行下面的代码时 File f new File c sample pdf PdfWriter getInstance document new FileOutputStream f document open System out p
  • 如何查找 Android 设备中的所有文件并将它们放入列表中?

    我正在寻求帮助来列出 Android 外部存储设备中的所有文件 我想查找所有文件夹 包括主文件夹的子文件夹 有办法吗 我已经做了一个基本的工作 但我仍然没有得到想要的结果 这不起作用 这是我的代码 File files array file
  • Java 页面爬行和解析之 Crawler4j 与 Jsoup

    我想获取页面的内容并提取其中的特定部分 据我所知 此类任务至少有两种解决方案 爬虫4j https github com yasserg crawler4j and Jsoup http jsoup org 它们都能够检索页面的内容并提取其
  • 如何在jsp代码中导入java库?

    我有以下jsp代码 我想添加 java io 等库 我怎样才能做到这一点
  • 使用替换字符串中多个单词的最有效方法[重复]

    这个问题在这里已经有答案了 此刻我正在做 Example line replaceAll replaceAll cat dog replaceAll football rugby 我觉得那很丑 不确定有更好的方法吗 也许循环遍历哈希图 ED
  • Prim 的迷宫生成算法:获取相邻单元格

    我基于 Prim 算法编写了一个迷宫生成器程序 该算法是 Prim 算法的随机版本 从充满墙壁的网格开始 选择一个单元格 将其标记为迷宫的一部分 将单元格的墙壁添加到墙壁列表中 While there are walls in the li
  • 序列化对象以进行单元测试

    假设在单元测试中我需要一个对象 其中所有 50 个字段都设置了一些值 我不想手动设置所有这些字段 因为这需要时间而且很烦人 不知何故 我需要获得一个实例 其中所有字段都由一些非空值初始化 我有一个想法 如果我要调试一些代码 在某个时候我会得
  • Java直接内存:在自定义类中使用sun.misc.Cleaner

    在 Java 中 NIO 直接缓冲区分配的内存通过以下方式释放 sun misc Cleaner实例 一些比对象终结更有效的特殊幻像引用 这种清洁器机制是否仅针对直接缓冲区子类硬编码在 JVM 中 或者是否也可以在自定义组件中使用清洁器 例
  • Windows 上的 Nifi 命令

    在我当前的项目中 我一直在Windows操作系统上使用apache nifi 我已经提取了nifi 0 7 0 bin zip文件输入C 现在 当我跑步时 bin run nifi bat as 管理员我在命令行上看到以下消息 但无法运行
  • Keycloak - 自定义 SPI 未出现在列表中

    我为我的 keycloak 服务器制作了一个自定义 SPI 现在我必须在管理控制台上配置它 我将 SPI 添加为模块 并手动安装 因此我将其放在 module package name main 中 并包含 module xml 我还将其放
  • 如何配置eclipse以保持这种代码格式?

    以下代码来自 playframework 2 0 的示例 Display the dashboard public static Result index return ok dashboard render Project findInv
  • 查看Jasper报告执行的SQL

    运行 Jasper 报表 其中 SQL 嵌入到报表文件 jrxml 中 时 是否可以看到执行的 SQL 理想情况下 我还想查看替换每个 P 占位符的值 Cheers Don JasperReports 使用 Jakarta Commons
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User
  • com.jcraft.jsch.JSchException:身份验证失败

    当我从本地磁盘上传文件到远程服务器时 出现这样的异常 com jcraft jsch JSchException Auth fail at org apache tools ant taskdefs optional ssh Scp exe
  • java8 Collectors.toMap() 限制?

    我正在尝试使用java8Collectors toMap on a Stream of ZipEntry 这可能不是最好的想法 因为在处理过程中可能会发生异常 但我想这应该是可能的 我现在收到一个我不明白的编译错误 我猜是类型推理引擎 这是

随机推荐

  • 有没有办法使用 Mayavi 填充陀螺仪表面的一侧?

    我正在使用 Mayavi 绘制陀螺仪的等值面 我的问题是我需要通过填充两个生成区域的一侧来获得更坚固的结构 在下面的图片中 您可以看到我生成的等值面的外观以及填充一侧后的外观 我生成的等值面 它应该是什么样子 等值面可以通过以下方程生成 U
  • Swift 中的 NSCollectionView 选择处理

    使用 Swift 学习 我一整天都在做这件事 但进展甚微 需要知道何时选择 NSCollectionView 中的项目 最终目标是让项目突出显示 并能够使用删除键将其从集合中删除 我的 NSCollectionView 绑定到 ArrayC
  • 在 eclipse indigo 中找不到 Window builder pro

    我读到 Eclipse Indigo 安装了 Window Builder Pro 插件 但我不知道它在哪里 文件 新建 项目没有给我选择 Window Builder Pro 的选项 如何找到它 您需要将此更新站点添加到 Eclipse
  • Font Awesome 4.0.0 缺少图标

    是我一个人的问题 还是 FontAwesome 3 x x 中实际上有 FontAwesome 4 0 0 中缺少的图标 如果是这样 这肯定会让 FontAwesome 对我来说不再那么棒 例如 我似乎找不到相当于icon remove 没
  • java格式化表格输出

    所以我正在尝试格式化我的输出 System out println Menu nItem tItem t tPrice tQuantity for int i 0 i
  • jQuery 单击事件更改 php 会话变量

    对此最好的方法是什么 因为 正如我发现的 只有在尝试过之后才完全有意义 p 你不能在 javascript 条件下设置 PHP 变量 杜尔赫 我能想到的唯一解决方案是对一个处理会话变量的小型 PHP 文件进行 AJAX 调用 elm cli
  • 给定字符串的所有可能排列?

    我该如何在 Ruby 中做到这一点 p abc all possible permutations 将返回 abc acb bca bac cba cab Edit 感谢雅库布 汉普尔 class String def all possib
  • 仅以 int 形式显示和保存小时数

    如何仅显示小时并使用 int 变量 我的意思是打印时间 例如 20 30 44 PM 我只想存储小时 即 int 变量中的 20 小时 怎么做 有谁知道的话可以告诉我密码吗 谢谢 尝试使用日历get http docs oracle com
  • 无法让 Rails Server 与 MySQL 一起使用

    我正在尝试让我的 Rails 应用程序与 MySQL 一起使用 而不是默认的 SQLite 我创建了一个新项目 强制使用 MySQL 它似乎工作正常 它在 Gem 文件中添加了 gem 条目 如下所示 source https rubyge
  • StateObject 作为 init() 中另一个对象的参数

    我试图将 StateObject 用户传递给authenticationHelper 但我不能 因为 IDE 说 在初始化所有存储的属性之前使用 self 即使它是在结构体的开头初始化的 我考虑过将 user 的初始化移至 init 但同样
  • Windows批处理编程中的用户输入操作

    我想以 ddmmyyyy 格式接受用户的输入 当用户以这种格式输入日期时 文件将移动到相应的文件夹 我尝试了以下代码但失败了 SET p str 输入文件夹的名称 例如30062011 移动 C Documents and Settings
  • 如何从图书馆创建承诺

    我对此感到困惑 因为到目前为止我发现的每个教程都假设我可以编辑库代码 或者库只有回调或回调作为最后一个参数 我正在使用的库的每个功能都设置为function successCallBack result FailCallBack error
  • java中使用awt.Toolkit和Clipboard是否可以知道剪贴板中复制的内容是否是mp3文件

    我正在尝试编写一个运行于背景 and monitors复制 a 的复制操作 mp3 file or a 文件夹包含 a mp3 file Clipboard cb Toolkit getDefaultToolkit getSystemCli
  • 返回值必须是 ?Illuminate\\Database\\Query\\Builder, App\\Models\\ModelName 返回的类型

    我试图得到以下回应 user id 1 first name john last name doe email email protected cdn cgi l email protection phone number 12345678
  • 随机梯度下降可以与 TensorFlow 一起使用吗?

    我设计了一个全连接 MLP 具有 2 个隐藏层和 1 个输出层 如果我使用批量或小批量梯度下降 我会得到一个很好的学习曲线 But a straight line while performing Stochastic Gradient D
  • asp.net core 3.0 web api request.body 和 [frombody] 冲突 [已关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 我需要让身体通过request bodyfrombody后 但测试了2天还没有找到解决方案 我已经添加Request EnableBuffe
  • Yii2:ActiveController 中的 REST API 操作

    在文档指南中有示例 namespace app controllers use yii rest ActiveController class UserController extends ActiveController public m
  • 当输入为空时如何禁用按钮?

    我试图在输入字段为空时禁用按钮 React 中最好的方法是什么 我正在做类似以下的事情
  • 使用 scikit 确定每个特征对特定类别预测的贡献

    我正在使用 scikit 额外的树分类器 model ExtraTreesClassifier n estimators 10000 n jobs 1 random state 0 一旦模型拟合并用于预测类别 我想找出每个特征对特定类别预测
  • 如何为流数据创建 Flux/Publisher

    我正在使用轮询方法定期获取数据 新数据可能随时到达 我想向我的客户公开一个反应式接口 因此 我想创建一个发布者 Flux 它会在新数据可用时发布新数据并通知订阅者 我怎么做 我看到的所有 Flux 示例都是针对数据已知 可用的情况 实际上