如何使用 Reactor 3.x 将 List 转换为 Flux

2024-01-04

我有一个异步调用节俭接口:

public CompletableFuture<List<Long>> getFavourites(Long userId){
    CompletableFuture<List<Long>> future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
    }
    return future;
}

现在它返回一个 CompletableFuture。然后我用 Flux 调用它来做一些处理器。

public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random()*100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}

但是,我想从getFavourites方法,我可以用它getRecommend method.
或者,您可以推荐一个Flux API,我可以转换List<Long> recommendList to Flux<Long> recommendFlux.


要转换一个CompletableFuture<List<T>> into a Flux<T>您可以使用Mono#fromFuture https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#fromFuture-java.util.concurrent.CompletableFuture- with Mono#flatMapMany https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#flatMapMany-java.util.function.Function-:

var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
    CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

List<T>在回调中异步接收也可以转换为Flux<T>不使用CompletableFuture。 您可以直接使用Mono#create https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#create-java.util.function.Consumer- with Mono#flatMapMany https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#flatMapMany-java.util.function.Function-:

Flux<Long> flux = Mono.<List<Long>>create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      sink.success(list);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

或者简单地使用Flux#create https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#create-java.util.function.Consumer-一次通过多次发射:

Flux<Long> flux = Flux.create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      list.forEach(sink::next);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
});

flux.subscribe(System.out::println);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Reactor 3.x 将 List 转换为 Flux 的相关文章

随机推荐

  • 通过初始化列表实例化抽象类[重复]

    这个问题在这里已经有答案了 我想了解为什么编译器允许编译以下代码 include
  • Numpy 和 matplotlib 垃圾收集

    我有一个 python 脚本 它对不同的参数进行许多模拟 Q K 绘制结果并将其存储到磁盘 每组参数 Q K 生成 200x200x80 数据点的 3D 体积网格 这需要约 100 MB 的数据 然后逐层绘制该体积网格的一部分 生成约 60
  • 将参数绑定到信号/槽

    我基本上有多个事件信号 我想将它们连接到同一个插槽 我想知道的是如何将基于字符串的参数传递到同一插槽 以便该插槽知道该信号来自哪个信号 一种替代方法是制作与信号一样多的槽 然后以 1 1 的方式连接它们 但考虑到所有处理的代码非常相似 这种
  • 在 R 中生成滞后时间序列横截面变量

    我是 R 新用户 我有一个时间序列横截面数据集 尽管我已经找到了滞后时间序列数据的方法R 我还没有找到创建滞后时间序列横截面变量的方法 以便我可以在分析中使用它们 以下是您可以如何使用lag 功能与zoo 和面板系列数据 gt librar
  • 远程主机标识已更改?

    NPM 表示可能存在安全错误 meteor npm install mui system npm ERR Error while executing npm ERR usr local bin git ls remote h t ssh e
  • 开始创建自定义视图过渡

    我正在寻找有关创建自定义视图转换的教程 特别是 涉及除受影响的 UIView 之外的元素的转换 例如在转换发生时播放动画或修改正在转换的 UIView 的屏幕截图 我并不是指实现一组基本的过渡 幻灯片 淡入淡出等 苹果网站上有很多相关示例
  • 如何以相同的方式对两个数组进行排序?

    我希望输出为 3 0 2 36 1 1 键和以相同方式排序的值 three one two fun main var l 0 var letters arrayOf one two three var digits arrayOf 2 36
  • hadoop 空指针异常

    我正在尝试设置一个hadoop的多节点集群迈克尔 诺尔的方式 http www michael noll com tutorials running hadoop on ubuntu linux multi node cluster 使用两
  • Dockerfile 中 RUN 和 CMD 的区别

    我很困惑什么时候应该使用CMD vs RUN 例如 执行 bash shell 命令 即ls la 我总是会使用CMD或者有什么情况我会使用RUN 试图了解这两个类似的最佳实践Dockerfile指令 RUN https docs dock
  • 在 IIS 6 中的应用程序级别设置 NTAuthenticationProviders

    我在 IIS 中有以下结构 Internet Information Services local computer Web Sites Default Web Site MyApplication MyApplication是 IIS 中
  • 无法在脚本模块中创建 PowerShell 别名

    重现步骤 使用以下函数和别名在 WindowsPowerShell Modules TestAlias TestAlias psm1 中创建 TestAlias 模块 function foo write output foo New Al
  • 将 defaultdict(list) 写入文件

    之前问过一个问题使用defaultdict解析多分隔符文件 https stackoverflow com questions 46264408 using defaultdict to parse multi delimiter file
  • 在 Glassfish 中导入 ssl 证书

    我有以下问题 我从 comodo 为我的 glassfish Web 应用程序获得了免费证书 90 天 然后我通过以下方式将证书导入 glassfish 3 1http javadude wordpress com 2010 04 06 g
  • 如何在不使用 UITableViewDiffableDataSource 删除和插入的情况下重新加载项目?

    我正在使用我的应用程序中实现搜索屏幕UITableViewDiffableDataSource 每个单元格代表一个搜索命中 并在单元格标题中突出显示搜索匹配 有点像 Xcode 的 快速打开 窗口突出显示其结果项的部分内容 当在搜索字段中输
  • 字符串文字中 Informix JDBC、MONEY 和小数点分隔符的问题

    我在使用 MONEY 数据类型的 JDBC 应用程序时遇到问题 当我插入 MONEY 列时 insert into money test amt values 123 45 我得到了例外 Character to numeric conve
  • 为什么不可能窃取访问令牌?

    我正在学习 OAuth 我脑子里有一个问题 我找不到答案 我理解请求令牌是否授权应用程序使用 API 但是 一旦用户获得了访问令牌 如果有人窃取了他的访问令牌 会发生什么情况 想象一下我们有类似的东西http www example com
  • 如何在不运行测试的情况下获取所有标签和黄瓜场景

    我想以某种方式获取我在项目中使用的所有标签的列表 并获取我在项目中没有运行测试的黄瓜场景的所有名称 有人可以帮助我我该怎么做吗 根据 mpkorstanje的建议 您可以为此创建一个自定义插件 public class DryRunPlug
  • 如何测试MySQL事务?

    我有一个关于测试事务中的查询的问题 我使用 MySQL 事务已经有一段时间了 每次我这样做时 我都会使用类似的东西 doCommit true error mysql query BEGIN repeat this part with th
  • Object方法中的“this”关键字指向Window

    var name The Window var object name My Object getNameFunc function return function return this name console log object g
  • 如何使用 Reactor 3.x 将 List 转换为 Flux

    我有一个异步调用节俭接口 public CompletableFuture