如何在 Reactor 中使用多线程执行 flatMap?

2024-01-08

我尝试过运行flatMap on a Flux range其次是subscribeOn似乎所有操作都在同一个线程上运行。这是正常的吗?

Flux.range(0, 1000000).log().flatMap{ it + 1 }.subscribeOn(Schedulers.parallel()).subscribe()

您可以创建一个ParallelFlux https://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux如下:

Flux.range(0, 100000).parallel(2).runOn(Schedulers.parallel()).log().map{ it + 1 }.subscribe()
                      ^^^^^^^^^^^  ^^^^^^use runOn ^^^^^^^^^^^
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Reactor 中使用多线程执行 flatMap? 的相关文章

  • 终结器线程的范围是什么 - 每个应用程序域或每个进程?

    根据我的所有阅读 应该有一个 GC 线程来调用所有终结器 现在的问题是这个 一个 线程的范围是什么 每个进程或每个应用程序域 因为域的整体目的是在一个进程空间中分离并创建 独立 的不同应用程序 I read here http dn cod
  • JetPack Compose - 卡中行中的weight() 不起作用

    创建 Android 应用程序时 我将一些可组合项放在卡片的一行中 如下所示 但它没有按我的预期工作 我添加 weight 1f 的可组合项不再显示 data class Test val title String val text Str
  • Python 2.7:支持一个端口上多个连接的流式 HTTP 服务器

    我正在寻找一个标准的Python 2 7包 提供一个同时执行的HTTP服务器流媒体同一端口号上的连接 嘿 各位版主 请停止将我的问题标记为想要以非流媒体方式提供服务的问题的重复项 例如 python 中的多线程 Web 服务器 https
  • 如何停止提交给 ExecutorService 的 Callable?

    我正在尝试实现一个示例应用程序来测试Callable and ExecutorService接口 在我的应用程序中我已经声明 ExecutorService exSvc Executors newSingleThreadExecutor T
  • 如何使用表内的 JSONB 数据类型和 PostgreSQL JDBC 驱动程序将 JSON 对象存储到 PostgreSQL 中

    我想将以下 json 对象保存到 PostgreSQL 数据库表中as jsonb fname john lname doe 我当前使用 PGObject 创建对象并将类型设置为 jsonb 并将值作为 json 字符串传递 寻找更好的 m
  • 这个等待通知线程语义的真正目的是什么?

    我刚刚遇到一些代码 它使用等待通知构造通过其其他成员方法与类中定义的线程进行通信 有趣的是 获取锁后 同步范围内的所有线程都会在同一锁上进行定时等待 请参见下面的代码片段 随后 在非同步作用域中 线程执行其关键函数 即 做一些有用的事情1
  • 使用单独的线程在java中读取和写入文件

    我创建了两个线程并修改了 run 函数 以便一个线程读取一行 另一个线程将同一行写入新文件 这种情况会发生直到整个文件被复制为止 我遇到的问题是 即使我使用变量来控制线程一一执行 但线程的执行仍然不均匀 即一个线程执行多次 然后控制权转移
  • 奇怪的跨线程 UI 错误

    我正在编写一个 WinForms 应用程序 它有两种模式 控制台或 GUI 同一解决方案中的三个项目 一个用于控制台应用程序 一个用于 UI 表单 第三个用于保存两个界面也将连接的逻辑 控制台应用程序运行绝对流畅 保存用户选择的模型 它有一
  • 这是 C# 的有效、惰性、线程安全的 Singleton 实现吗?

    我实现了这样的单例模式 public sealed class MyClass public static MyClass Instance get return SingletonHolder instance static class
  • 如何在Android Compose中使用otf类型字体文件?

    我正在学习使用 Android Jetpack Compose 现在我有一个正则 otf字体文件在资产 字体 我想在文本中使用它 java lang RuntimeException Font asset not found commonu
  • 父子进程之间的通信

    我正在尝试创建一个具有一个或多个子进程的 Python 3 程序 父进程生成子进程 然后继续处理自己的业务 有时我想向特定的子进程发送一条消息 由其捕获该消息并采取行动 此外 子进程在等待消息时需要处于非锁定状态 它将运行自己的循环来维护服
  • 如何使信号量超时

    Go 中的信号量是通过通道来实现的 一个例子是这样的 https sites google com site gopatterns concurrency semaphores https sites google com site gop
  • 定期更新 SWT 会导致 GUI 冻结

    Problem 当 GUI 字段定期更新时 SWT 会冻结 我想要一个基于 SWT 的 GUI 其中文本字段的值会定期递增 最初我从单独的线程访问 textField 导致抛出异常 线程 Thread 0 org eclipse swt S
  • 线程睡眠和Windows服务

    我正在开发一个 Windows 服务 该服务存在一些问题Thread Sleep 所以我想我会尝试使用计时器 因为这个问题建议 在 Windows 服务中使用 Thread Sleep https stackoverflow com que
  • 如何将 Java 赋值表达式转换为 Kotlin

    java中的一些东西就像 int a 1 b 2 c 1 if a b c System out print true 现在它应该转换为 kotlin 就像 var a Int 1 var b Int 2 var c Int 1 if a
  • 异步多播委托

    我最近在一个广泛使用事件的项目上做了一些工作 我需要做的事情之一是在多播委托上异步调用多个事件处理程序 我认为诀窍是对 GetInvocableList 中的每个项目调用 BeginInvoke 但似乎那里不存在 BeginInvoke 有
  • 如何将 Flux 包装在 ResponseEntity 中

    我需要我的端点以以下 json 格式返回数据 code SUCCESS message SUCCESS errors null data 这是我的控制器代码 GetMapping value productSubcategories pro
  • 在 C# 中创建加密随机数的最快、线程安全的方法?

    请注意 在多个线程上并行生成随机数时 加密随机数生成器不是线程安全的 使用的发电机是RNGCryptoServiceProvider它似乎重复了很长一段随机位 128 位 重现此情况的代码如下所示 缺乏使用锁来保护访问RNGCryptoSe
  • 主线程如何在该线程之前运行?

    我有以下代码 public class Derived implements Runnable private int num public synchronized void setA int num try Thread sleep 1
  • 暂停下载线程

    我正在用 C 编写一个非常简单的批量下载程序 该程序读取要下载的 URL 的 txt 文件 我已经设置了一个全局线程和委托来更新 GUI 按下 开始 按钮即可创建并启动该线程 我想要做的是有一个 暂停 按钮 使我能够暂停下载 直到点击 恢复

随机推荐