如何构建一个异步休息端点,在工作线程中调用阻塞操作并立即回复(Quarkus)

2024-01-11

我检查了文档和 stackoverflow,但没有找到合适的方法。 例如。这篇文章看起来非常接近:使用 Quarkus/Mutiny 在 Reactive REST GET 端点中调度阻塞服务 https://stackoverflow.com/questions/66944535/dispatch-a-blocking-service-in-a-reactive-rest-get-endpoint-with-quarkus-mutiny但是,我不想在我的服务中出现这么多不必要的样板代码,最好的情况是根本不需要更改服务代码。

我通常只想调用一个使用实体管理器的服务方法,因此是一个阻塞操作,但是,想要立即向调用者返回一个字符串,例如“查询开始”或其他内容。我不需要回调对象,这只是一种即发即忘的方法。

我尝试过这样的事情

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
    return Uni.createFrom()
    .item("query started")
    .call(() -> service.startLongRunningQuery());
}

但它不起作用 - >错误消息返回给调用者:

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.",

我实际上希望 quarkus 会注意相应地分配任务,即对 io 线程进行剩余调用并阻止实体管理器操作到工作线程。 所以我一定是用错了。

UPDATE:

还尝试了我在中找到的建议解决方法https://github.com/quarkusio/quarkus/issues/11535 https://github.com/quarkusio/quarkus/issues/11535将方法体更改为

return Uni.createFrom()
        .item("query started")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .invoke(()-> service.startLongRunningQuery());

现在我没有收到错误,但没有调用 service.startLongRunningQuery() ,因此没有日志,也没有查询实际发送到数据库。

与(如何使用 Mutiny 反应式编程调用长时间运行的阻塞 void 返回方法? https://stackoverflow.com/questions/66626298/how-to-call-long-running-blocking-void-returning-method-with-mutiny-reactive-pro):

return Uni.createFrom()
            .item(() ->service.startLongRunningQuery()) 
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())

与(如何在另一个线程上运行阻塞代码并使http请求立即返回 https://stackoverflow.com/questions/69321664/how-to-run-blocking-codes-on-another-thread-and-make-http-request-return-immedia):

ExecutorService executor = Executors.newFixedThreadPool(10, r -> new Thread(r, "CUSTOM_THREAD"));

return Uni.createFrom()
                .item(() -> service.startLongRunningQuery())
                .runSubscriptionOn(executor);

知道为什么根本不调用 service.startLongRunningQuery() 以及如何实现即发即忘行为(假设其余调用通过 IO 线程处理并且服务调用由工作线程处理)?


这取决于您是否想立即返回(在您到达之前)startLongRunningQuery 操作已有效执行),或者如果您想等到操作完成。

如果是第一种情况,请使用以下内容:

@Inject EventBus bus;

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public void triggerQuery() {
    bus.send("some-address", "my payload");
}

@Blocking // Will be called on a worker thread
@ConsumeEvent("some-address")
public void executeQuery(String payload) {
    service.startLongRunningQuery();
}

在第二种情况下,您需要在工作线程上执行查询。

@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
   return Uni.createFrom(() -> service.startLongRunningQuery())
      .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

请注意,您需要 RESTEasy Reactive 才能正常工作(而不是经典的 RESTEasy)。如果您使用经典的 RESTEasy,则需要 quarkus-resteasy-mutiny 扩展(但我​​建议使用 RESTEasy Reactive,它会更有效)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何构建一个异步休息端点,在工作线程中调用阻塞操作并立即回复(Quarkus) 的相关文章

随机推荐