此代码示例中有几个问题。
我假设这是一个反应式 Web 应用程序。
首先,不清楚您如何创建电子邮件正文;您是从数据库还是远程服务中获取内容?如果它主要受 CPU 限制(而不是 I/O),那么您不需要将其包装为响应式类型。现在如果它应该是包装在Publisher
并且电子邮件内容对于所有用户都是相同的,使用cache
运营商是一个不错的选择。
Also, Flux.fromIterable(userRepository.findAllByRole(Role.USER))
建议您从反应式上下文中调用阻塞存储库。
你应该never在 a 中执行大量 I/O 操作doOn***
操作员。这些是为日志记录或轻微副作用操作而设计的。事实是你需要.block()
这是另一个线索,表明您将阻塞整个反应管道。
最后一篇:你不应该打电话subscribe
Web 应用程序中的任何位置。如果这绑定到 HTTP 请求,那么您基本上会触发反应式管道,而无法保证资源或完成情况。呼唤subscribe
触发管道但不会等到它完成(此方法返回一个Disposable
).
一个更“典型”的示例如下所示:
Flux<User> users = userRepository.findAllByRole(Role.USER);
String emailBody = emailContentGenerator.createEmail();
// sendEmail() should return Mono<Void> to signal when the send operation is done
Mono<Void> sendEmailsOperation = users
.flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
.then();
// something else should subscribe to that reactive type,
// you could plug that as a return value of a Controller for example
如果您不知何故被阻塞组件困住了(sendEmail
例如,一),您应该在特定的调度程序上安排这些阻塞操作,以避免阻塞整个反应管道。为此,请查看反应堆参考文档中的调度程序部分 http://projectreactor.io/docs/core/release/reference/#schedulers.