Spring 网关:使用 Mono.fromCallable() 包装阻塞调用

2023-12-22

场景:在我的 Spring 云网关中,我需要根据数据库中保存的一些数据修改请求。因此我正在实现一个网关过滤器。考虑以下实现:

存储库接口:

public interface MyReactiveRepository{
    Mono<String> getSomeData();
}

存储库实施:

@Repository
public class MyReactiveRepositoryImpl implements MyReactiveRepository
{
    private final JdbcTemplate template;

    public MyReactiveRepositoryImpl(@Autowired JdbcTemplate template){
        this.template = template;
    }

    @Override
    public Mono<String> getSomeData(){
        return Mono.fromCallable(
            // Wrapping blocking code in a Mono:
            () -> template.queryForObject(SOME_SQL_QUERY, String.class)
        ).subscribeOn(Schedulers.boundedElastic());
    }
}

和过滤器:

@Component
public class MyGatewayFilter implements GatewayFilter
{
    private final MyReactiveRepository repository;

    public MyGatewayFilter(@Autowired MyReactiveRepository repository){
        this.repository = repository;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){
        return repository.getSomeData()
            .flatMap(
                (repositoryData) -> {
                    // Modify the exchange based on the repository data in some way...
                    // Then return
                    return chain.filter(exchange);
                }
            );
    }
}

这里的关键是我有阻塞 JDBCTemplate,我“尝试”使其成为非阻塞,以保留 spring gateway 的非阻塞性质。当然,我可以利用 Spring R2DBC,但是通过 Google 搜索,我得到的印象是“Mono.fromCallable+BoundedElastic-scheduler”技巧也可以工作。

我的问题是,与使用 R2DBC 的实现相比,我的实现实际上是否具有相同的性能:我的实现有哪些缺点?

谢谢

Edit:我第一次看到“fromCallable”模式是在项目反应堆常见问题解答 https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking所以我假设有些情况是可以接受的?


我在生产中使用了这两种方法:包装的 JDBC 调用和纯 R2DBC。它们是不同的应用程序,所以我不会谈论性能,因为还有其他参数对其影响很大,例如ORM 的存在。

您的方法大部分是正确的,但确实存在一些问题。第一个是您当前正在将调度程序“泄漏”到存储库之外,这意味着连接到它的任何流现在都将在您想要用于数据库调用的有界弹性调度程序上运行您的代码。这最终将导致令人讨厌的僵局。确保定义输出调度程序(并行或单个)并在离开存储库之前将执行转移到那里。

我在这里看到的另一个问题是,您的实现不能很好地使用重试和重复机制。使用其中任何一个都不会导致进行数据库调用,而是使用之前获取的数据。使用Mono.defer解决这个问题:

    public Mono<String> getSomeData(){
        return Mono.defer(() -> Mono.fromCallable(
                () -> template.queryForObject(SOME_SQL_QUERY, String.class)))
           .subscribeOn(Schedulers.boundedElastic())
           .publishOn(Schedulers.parallel());
    }

对于小型应用程序来说,使用默认调度程序是没问题的,但对于涉及更多的项目,最好有一个专用的 DB 有界弹性和参数化输出调度程序。

最后一个问题很难解决,那就是您的 JDBC 线程池的大小应该与用于数据库调用的boundedElastic 的大小相同。否则,有些人会在压力很大的情况下“失踪”。

我希望这有帮助!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring 网关:使用 Mono.fromCallable() 包装阻塞调用 的相关文章

随机推荐