Project Reactor:如何控制通量排放

2024-02-22

我有一个能发出一些光的通量Date. This Date映射到我在某些设备上运行的 1024 个模拟 HTTP 请求Executer.

我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个请求Date.

目前运行时,onNext()被调用多次,然后稳定在某个稳定的速率上。

我怎样才能改变这种行为?

附:如果需要的话,我愿意转向架构。

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

HTTP请求模拟:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return "HTML content";
}

编辑:改编自答案的代码:

  • 首先,我的代码中有一个错误(另一个flatMap需要)
  • 其次,我补充说concurrency的参数1二者皆是flatMap(貌似两者都需要)

    Executor executor = Executors.newSingleThreadExecutor();
    
    Flux<Date> source = Flux.generate(emitter -> {
        System.out.println("emitter called!");
        emitter.next(new Date());
    });
    
    source
            .limitRate(1)
            .map(date -> Flux.range(0, 16))
            .flatMap(Function.identity(), 1) # concurrency = 1
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
            .subscribe(s -> System.out.println(s));
    
    Thread.currentThread().join();
    

您应该看看这些方法:

  • Flux.flatMap(Function, int, int) https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-int-
  • Flux.concatMap(Function, int) https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concatMap-java.util.function.Function-int-.

concatMap确保在算子内按顺序处理通量上的元素:

内部消息的生成和订阅:该操作员正在等待一个 在生成下一个之前完成内部并订阅 它。

flatMap让你通过暴露来做同样的事情concurrency and prefetch参数可以让您更好地控制此行为:

并发参数允许控制可以有多少个发布者 并行订阅和合并。反过来,该论点表明 向上游发出的第一个 Subscription.request(long) 的大小。这 prefetch 参数允许给定任意预取大小 合并的发布者(换句话说,预取大小意味着 第一个 Subscription.request(long) 到合并的发布者)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Project Reactor:如何控制通量排放 的相关文章

  • 我是否需要安装 SQLite 才能使 SQLiteJDBC 正常工作?

    我想我只是没有 明白 如果我的计算机上尚未安装 SQLite 并且我想编写一个使用嵌入式数据库的 Java 应用程序 并且我将 SQLiteJDBC JAR 下载 导入到我的项目中 那么这就是我所需要的吗 或者 我是否需要先安装 SQLit
  • 使用 Java 的 Apache Http 摘要身份验证

    我目前正在开发一个 Java 项目 但无法使 http 摘要身份验证正常工作 我尝试使用 Apache 网站 但没有帮助 我有一个需要 HTTP 摘要身份验证的网站 DefaultHttpClient httpclient new Defa
  • Netbeans 8.1 Gnome 3 GTK+ UI 字体和选项卡高度

    我刚刚在运行 GNOME 3 桌面的 Ubuntu 16 04 上安装了 NetBeans 8 1 如果可能的话 我想继续使用 IDE 的 GTK 外观和感觉 但 UI 上的字体 尤其是选项卡中的字体 太小且重叠 我尝试添加 fontsiz
  • 带路径压缩算法的加权 Quick-Union

    有一种 带路径压缩的加权快速联合 算法 代码 public class WeightedQU private int id private int iz public WeightedQU int N id new int N iz new
  • 如何在由子控件组成的 SWT 复合材料上跟踪鼠标?

    我创建了自己的控件 我想跟踪鼠标并添加一个MouseTrackListener 很遗憾MouseEnter and MouseLeave当鼠标移动到我的合成部分 即标签和按钮 上时 也会生成事件 Mouse enter mouse ente
  • Java Logger 未记录到 Netbeans 中的输出

    我正在 Netbeans 中使用 Maven 启动一个 Java 项目 我编写了一些代码来使用 Logger 类进行日志记录 但是 日志记录似乎不起作用 在程序开始时 我运行 Logger getLogger ProjectMainClas
  • Android:文本淡入和淡出

    我已阅读此 stackoverflow 问题和答案 并尝试实现文本淡入和淡出 Android中如何让文字淡入淡出 https stackoverflow com questions 8627211 how to make text fade
  • Jframe 内有 2 个 Jdialogs 的 setModal 问题

    当我设置第一个选项时 我遇到了问题JDialog模态 第二个非模态 这是我正在尝试实现的功能 单击 测试对话框 按钮 一个JDialog有名字自定义对话框 主要的将会打开 如果单击 是 选项自定义对话框主 其他JDialog named 自
  • 从 MATLAB 调用 Java?

    我想要Matlab程序调用java文件 最好有一个例子 需要考虑三种情况 Java 内置库 也就是说 任何描述的here http docs oracle com javase 6 docs api 这些项目可以直接调用 例如 map ja
  • 在 Java 中如何找出哪个对象打开了文件?

    我需要找出答案哪个对象在我的 Java 应用程序中打开了一个文件 这是为了调试 因此欢迎使用工具或实用程序 如果发现哪个对象太具体了 这class也会很有帮助 这可能很棘手 您可以从使用分析器开始 例如VisualVM http visua
  • 如何在 Spring 中使 @PropertyResource 优先于任何其他 application.properties ?

    我正在尝试在类路径之外添加外部配置属性资源 它应该覆盖任何现有的属性 但以下方法不起作用 SpringBootApplication PropertySource d app properties public class MyClass
  • Android 无法解析日期异常

    当尝试解析发送到我的 Android 客户端的日期字符串时 我得到一个无法解析的日期 这是例外 java text ParseException 无法解析的日期 2018 09 18T00 00 00Z 位于 偏移量 19 在 java t
  • 如何使用 JMagick 转换色彩空间?

    如何使用 JMagick API 转换色彩空间 例如 CMYK gt RGB 和 RGB gt CMYK None
  • 如何在.NET中使用java.util.zip.Deflater解压缩放气流?

    之后我有一个转储java util zip Deflater 可以确认它是有效的 因为 Java 的Inflater打开它很好 并且需要在 NET中打开它 byte content ReadSample sampleName var inp
  • 提高 PostgreSQL 1 亿数据左连接查询性能

    我在用Postgresql 9 2 version Windows 7 64 bit RAM 6GB 这是一个Java企业项目 我必须在我的页面中显示订单相关信息 有三个表通过左连接连接在一起 Tables TV HD 389772 行 T
  • JDBC 时间戳和日期 GMT 问题

    我有一个 JDBC 日期列 如果我使用 getDate 则会得到 date 仅部分2009 年 10 月 2 日但如果我使用 getTimestamp 我会得到完整的 date 2009 年 10 月 2 日 13 56 78 890 这正
  • 为什么\0在java中不同系统中打印不同的输出

    下面的代码在不同的系统中打印不同的输出 String s hello vsrd replace 0 System out println s 当我在我的系统中尝试时 Linux Ubuntu Netbeans 7 1 它打印 When I
  • 将 JScrollPane 添加到 JFrame

    我有一个关于向 Java 框架添加组件的问题 我有一个带有两个按钮的 JPanel 和一个添加了 JTable 的 JScrollPane 我想将这两个添加到 JFrame 中 我可以将 JPanel 添加到 JFrame 或将 JScro
  • partitioningBy 必须生成一个包含 true 和 false 条目的映射吗?

    The 分区依据 https docs oracle com javase 8 docs api java util stream Collectors html partitioningBy java util function Pred
  • Java 和/C++ 在多线程方面的差异

    我读过一些提示 多线程实现很大程度上取决于您正在使用的目标操作系统 操作系统最终提供了多线程能力 比如Linux有POSIX标准实现 而windows32有另一种方式 但我想知道编程语言水平的主要不同 C似乎为同步提供了更多选择 例如互斥锁

随机推荐

  • Runtime.getRuntime().exec(String[]) 安全性

    我正在使用 Runtime getRuntime exec String 来运行进程 其中 String 数组的某些元素是由用户定义的 这安全吗 或者它允许将代码注入终端吗 如果不安全 我该怎么做才能避免代码注入 它必须是平台独立的 正如我
  • 如何在迭代时向列表添加值[重复]

    这个问题在这里已经有答案了 我有一个这样的场景 List
  • Android-R.java 文件未找到

    在处理 android 项目时 我被 R java 文件困住了 即使我清理项目 项目 gt clean 也找不到该文件 但该文件仍然找不到 即使我创建一个相同的新项目出现问题 我需要做什么 我期待有价值的答复 以便我可以克服这个问题 R j
  • 不知道如何导出 Objective-C 类。 i386 体系结构的未定义符号

    我正在尝试在 OSX 上的 GTK 上做一些工作 但遇到了一些麻烦 因为说实话 我对 Objective C 不太熟悉 我有足够的编程经验 可以很快掌握基本语法 并且可以在文档中查找我需要的内容 但我遇到的问题与链接库并将类暴露给我链接的程
  • 使用 Linkify.addLinks 与 Html.fromHtml 结合使用

    我有一个TextView通过调用以下命令获取其数据集 tv setText Html fromHtml myText 字符串myText包含部分格式化的 html 数据 例如 它可能有字体标签 但没有任何使用格式设置的 url 链接 a h
  • 通过 MsBuildProj 文件转换多个项目的多个配置文件

    我正在尝试根据模式 所有形式的文件 在文件列表上运行多个命令 config在给定目录的子目录下 如下所示
  • Node.js docker 容器未更新以适应卷的变化

    我正在尝试在我的 Windows 计算机上托管一个开发环境 该计算机托管前端和后端容器 到目前为止 我只在后端工作 所有文件都位于 C 盘上 通过 Docker Desktop 共享 我有以下 docker compose 文件和 Dock
  • 通过 Response.ContentType、Response.End 输出文件时如何显示进度状态/旋转器?

    我有一个网络表单下载链接按钮 在按钮的点击事件上我正在获取数据 然后生成 XLSX 文件供下载 在文件生成过程中 响应 Clear 叫做 响应内容类型被设定并最终响应 End 叫做 我需要显示微调器 gif在那次操作期间 文件生成并弹出文件
  • 角度区域

    什么是区域 Angular ngZone 与 zone js 有何不同 什么时候应该使用它们 有人可以帮助提供使用 ngZone 的实际示例吗 我在这里浏览了角度文档 但是我无法完全理解 https angular io api core
  • Systemd http 健康检查

    我在 Redhat 7 1 上有一个服务 我使用 systemctl 启动 停止 重新启动和状态来控制 有一次 systemctl 状态返回 active 但服务 背后 的应用程序响应的 http 代码与 200 不同 我知道我可以使用 M
  • 每次插入数据库时​​如何找到数据字段(例如电子邮件)的唯一性?

    我正在开发一个 Android 应用程序 用户在其中输入姓名 电子邮件和密码进行注册 这个输入过程工作得很好 现在我想在每次用户输入他 她的电子邮件时检查输入的电子邮件是否已存在于我的数据库中 为此 我在 DBHelper 类中尝试了以下方
  • 4.1 android模拟器未检测到sd卡

    我曾经使用 4 1 kitkat x86 android 模拟器和 SD 卡进行测试 将 Android Studio 升级到 2 3 后 我无法再访问 android 中提供的 SD 卡 这使得我无法进行测试 谷歌还没有对此的答案 我也没
  • 等待所有 pid 在 php 中退出

    我的问题是这样的 我正在分叉一个进程 以便可以加快磁盘上文件的访问时间 我将这些文件中的所有数据存储在本地桌面上的 tmp 文件中 理想情况下 在所有进程完成后 我需要访问该 tmp 文件并将该数据放入数组中 然后我取消链接 tmp 文件
  • 自动化 sftp 上传过程

    我正在寻找一种将文件 目录结构从一台服务器上传到另一台服务器的方法 在我的情况下 唯一可能的方法是 SFTP 上传 有没有简单的方法来上传它 使用脚本或其他东西 而不需要对文件 目录进行存档 我想在远程服务器上重新创建 谢谢你 也许可以使用
  • 如何获取不带参数的文件名?

    我需要找到我包含的不带 GET 参数的文件的文件名 例如 如果当前 URL 是 我想要返回 file php 我发现了什么 basename SERVER REQUEST URI 返回 file php a b c d 就我而言 我在购物车
  • DataAnnotations:递归验证整个对象图

    我有一个对象图 上面散布着 DataAnnotation 属性 其中对象的某些属性是本身具有验证属性的类 等等 在以下场景中 public class Employee Required public string Name get set
  • 什么东西永远不等于自己?

    Prolog 中是否存在不等于其自身的价值 我写的answer https stackoverflow com a 53404595 10631003对某些人关于树的最小值的问题 https stackoverflow com q 5339
  • PHP如何检索数组值

    我有以下数组 我想检索name comment and each of the tags 插入数据库 我如何检索数组值 另外 我可以仅过滤大于 3 个字符且仅包含 a Z0 9 值的标签值吗 非常感谢 Array folder gt tes
  • Facebook Like 按钮以 0 宽度和 0 高度呈现?

    我是 facebook api 的新手 所以我不知道这是否是一个新手问题 我所做的是我遵循快速开始 https developers facebook com docs javascript quickstart v2 3 我将以下代码片段
  • Project Reactor:如何控制通量排放

    我有一个能发出一些光的通量Date This Date映射到我在某些设备上运行的 1024 个模拟 HTTP 请求Executer 我想做的是等待所有 1024 个 HTTP 请求 然后再发出下一个请求Date 目前运行时 onNext 被