Spring Integration 中的 REST 端点使消息通道成为多线程

2024-01-10

我有一个非常简单的 Spring Boot 应用程序,它提供了几个静态端点,这应该驱动将 sftp 文件上传到 sftp 服务器。我的要求是,如果有多个文件,则文件应该排队。我希望通过 sftp spring 集成工作流程的默认行为来实现这一点,因为我读到 DirectChannel 自动对文件进行排队。为了测试行为,我执行以下操作:

  1. 发送一个大文件,通过调用端点来阻塞通道一段时间。
  2. 通过调用端点发送较小的文件。

预期结果:较小的文件在通道上排队,并在较大的文件上传完成后进行处理。 实际结果:打开与 sftp 服务器的新连接,较小的文件不排队上传到那里,而较大的文件继续传输。

我的应用程序中有两个文件:

演示应用程序.java

@SpringBootApplication
@IntegrationComponentScan
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("tester");
        factory.setPassword("password");
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toSftpChannel")
         void sendToSftp(File file);
    }
}

演示控制器.java

@RestController
public class DemoController {

    @Autowired
    MyGateway gateway;

    @RequestMapping("/sendFile")
    public void sendFile() {
        File file = new File("C:/smallFile.txt");
        gateway.sendToSftp(file);
    }

    @RequestMapping("/sendBigFile")
    public void sendBigFile() {
        File file = new File("D:/bigFile.zip");
        gateway.sendToSftp(file);
    }
}

我是 spring 的新手,我完全不确定我的 sftp 通道是否在这里正确创建,我的猜测是每次我执行 sendToSftp 调用时都会创建一个新通道。任何有关如何在这种情况下实现队列行为的帮助将不胜感激。


这里没有队列,因为每个 HTTP 请求都是在其自己的线程中执行的。是的,当 http 线程池耗尽时,您可能会在那里排队,但这在只有两个请求的简单用例中并不存在。

无论如何,您都可以在那里实现队列行为,但您应该声明您的toSftpChannel as a QueueChannel bean.

这样,下游进程将始终在同一线程上执行,并且下一条消息恰好在第一条消息之后从队列中提取。

See 参考手册 http://docs.spring.io/spring-integration/docs/4.3.11.RELEASE/reference/html/messaging-channels-section.html#polling-consumer了解更多信息。

UPDATE

自从你使用FtpMessageHandler这是单向组件,但是您仍然需要对 MVC 控制器的方法进行一些回复,唯一的方法就是拥有一个@Gateway方法与非void返回,当然我们需要以某种方式发送回复。

为此,我建议使用PublishSubscribeChannel:

@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
    return new PublishSubscribeChannel();
}

@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
    SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
    return handler;
}

这样我们就有了两个订阅者toSftpChannel。随着@Order(0)我们确保@ServiceActivator是第一个订阅者,因为我们需要先执行 SFTP 传输。随着@BridgeTo我们添加第二个BridgeHandler到相同的PublishSubscribeChannel。其目的只是为了获得一个replyChannelheader 并在那里发送请求消息。由于我们不使用任何线程BridgeHandler将在传输到 SFTP 完成后立即执行。

当然不是BridgeHandler你可以有任何其他@ServiceActivator or @Transfromer作为回复而不是请求返回File,但还有其他任何东西。例如:

@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(File payload) {
    return "The SFTP transfer complete for file: " + payload;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Integration 中的 REST 端点使消息通道成为多线程 的相关文章

随机推荐