从 Spring Batch 处理器调用异步 REST api

2023-11-24

我编写了一个处理列表列表的 Spring 批处理作业。

Reader 返回列表的列表。 处理器处理每个 ListItem 并返回处理后的 List。 Writer 将列表中的内容写入 DB 和 sftp。

我有一个用例,我从 Spring Batch 处理器调用 Async REST api。 在 ListenableFuture 响应中,我实现了 LitenableFutureCallback 来处理成功和失败,这按预期工作,但在异步调用返回某些内容之前,ItemProcessor 不会等待来自异步 api 的回调,而是将对象(列表)返回给编写器。

我不确定如何实现和处理来自 ItemProcessor 的异步调用。

我确实阅读了有关 AsyncItemProcessor 和 AsyncItemWriter 的内容,但我不确定这是否是我应该在这种情况下使用的东西。

我还考虑过在 AsyncRestTemplate 的 ListenableFuture 响应上调用 get() ,但根据文档,它将阻塞当前线程,直到收到响应。

我正在寻求一些关于我应该如何实现这一点的帮助。代码片段如下:

处理器:

public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {

... Initialization code

@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
    logger.info("Entering MailingDocsEntity processor");


    List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);


    for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
        System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());

       ..code to get the file

         //If the file is not a pdf convert it
         String fileExtension = readFromSpResponse.getFileExtension();
         String fileName = readFromSpResponse.getFileName();
         byte[] fileBytes = readFromSpResponse.getByteArray();

         try {

             //Do checks to make sure PDF file is being sent
             if (!"pdf".equalsIgnoreCase(fileExtension)) {
                 //Only doc, docx and xlsx conversions are supported

                     ...Building REquest object
                     //make async call to pdf conversion service
            pdfService.convertDocxToPdf(request, mailingDocsEntity);

                 } else {
                     logger.error("The file cannot be converted to a pdf.\n"
                        );

                 }
             }


         } catch (Exception ex){
             logger.error("There has been an exception while processing data", ex);

         }

    }
    return synchronizedList;
}

}

异步 PdfConversion 服务类:

@Service
public class PdfService{


   @Autowired
   @Qualifier("MicroServiceAsyncRestTemplate")
   AsyncRestTemplate microServiceAsyncRestTemplate;

   public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){

        ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();


            try {

                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);

                HttpEntity<?> entity = new HttpEntity<>(request, headers);



                ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);

                ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
                microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>()  {

                    @Override
                    public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
                        ...code to do stuff on success


                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        pdfResponse.setMessage("Exception while retrieving response");

                    }
                });

            } catch (Exception e) {
                String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
                pdfResponse.setMessage(message);
                logger.error(message, e);
            }


        return pdfResponse;
    }

}

我原来的批处理作业是同步的,我正在转换为异步以加快处理速度。 我确实尝试寻找类似的问题,但找不到足够的信息。 非常感谢任何指示或帮助。

谢谢你!!


我确实阅读了有关 AsyncItemProcessor 和 AsyncItemWriter 的内容,但我不确定这是否是我应该在这种情况下使用的东西。

Yes, AsyncItemProcessor and AsyncItemWriter适合您的用例。这AsyncItemProcessor将执行委托的逻辑(您的其余调用)ItemProcessor对于新线程上的项目。项目完成后,Future结果被传递到AsynchItemWriter待写。这AsynchItemWriter然后将打开Future并写下该项目。这些组件的优点是您不必处理Future自己包装、拆开等等。

你可以找到:

  • 更多详细信息请参见此处:https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#asynchronous-processors
  • 这里有一个例子:https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java

希望这可以帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Spring Batch 处理器调用异步 REST api 的相关文章

随机推荐