使用 Java DSL 在远程 S(ftp) 中移动已处理的文件

2023-11-30

一旦批处理使用 Spring 集成和 Java DSL 成功处理文件,我就会尝试在远程 SFTP 上移动文件。

实现这一目标的最佳方法是什么?

  1. 添加批量移动远程文件的步骤?
  2. 或者使用FTP出站网关并提供MV命令?

我倾向于选择第二种解决方案,让批处理仅关注逻辑,但我很难尝试使用 java dsl 来实现它。

我读了http://docs.spring.io/spring-integration/reference/html/ftp.html#ftp-outbound-gateway并尝试像这样实现:

@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp.outboundGateway(SftpSessionFactory(), 
            AbstractRemoteFileOutboundGateway.Command.MV, "payload")
            .localDirectory(new File("/home/blabla/"))
            .get();

}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml.mini$")
                ...             
               , 
                e -> e.id("sftpInboundAdapter")
                .poller(
                        Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                        .maxMessagesPerPoll(-1)
                        .advice(retryAdvice())
                        )
            )
            .enrichHeaders( h -> h
                    .header(FileHeaders.REMOTE_DIRECTORY,"/home/filedrop/")
                    .header(FileHeaders.REMOTE_FILE, "/home/filedrop/OFFERS.xml.mini")
                    .header(FileHeaders.RENAME_TO, "/home/filedrop/done/OFFERS.xml.mini")
            )
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw()))
            .transform(jobExecutionToFileStringTransformer())
            .handle(ftpOutboundGateway())
            .handle(logger())
            .get();
}

我知道我的标题应该是动态的,但我不知道该怎么做,所以现在我使用现有文件的名称。我收到此错误消息(他正在尝试删除目标目录中的文件!):

Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file     at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
    at org.springframework.integration.file.remote.RemoteFileTemplate.rename(RemoteFileTemplate.java:290)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMv(AbstractRemoteFileOutboundGateway.java:482)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:400)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 94 more
 Caused by: org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:211)
    at org.springframework.integration.file.remote.RemoteFileTemplate$3.doInSessionWithoutResult(RemoteFileTemplate.java:300)
    at org.springframework.integration.file.remote.SessionCallbackWithoutResult.doInSession(SessionCallbackWithoutResult.java:34)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
    ... 100 more
 Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:83)
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:205)
    ... 103 more

谢谢你的帮助!

EDIT工作流程,然后我将其简化了很多,但这里是我之前问题的解决方案:

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml$")
                ...
                , 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                        .maxMessagesPerPoll(-1)
                        )
            )
            .enrichHeaders( h -> h
                    // headers necessary for moving remote files (ftpOutboundGateway)
                    .headerExpression(FileHeaders.RENAME_TO, "'/home/blabla/done/' + payload.getName()")
                    .headerExpression(FileHeaders.REMOTE_FILE, "payload.getName()")
                    .header(FileHeaders.REMOTE_DIRECTORY,"/home/blabla/")
                    // headers necessary for moving local files (fileOutboundGateway_MoveToProcessedDirectory)
                    .headerExpression(FileHeaders.ORIGINAL_FILE,  "payload.getAbsolutePath()" )
                    .headerExpression(FileHeaders.FILENAME,  "payload.getName()")
            )
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw(), e-> e.advice(retryAdvice()))

            .<JobExecution, Boolean>route(p -> BatchStatus.COMPLETED.equals(p.getStatus()),
                                            mapping -> mapping
                                            .subFlowMapping("true", sf -> sf


                                                .handle(org.springframework.batch.core.JobExecution.class,
                                                         (p, h) -> myServiceActivator.jobExecutionToString(p, 
                                                                 (String) h.get(FileHeaders.REMOTE_DIRECTORY),
                                                                 (String) h.get(FileHeaders.REMOTE_FILE)))
                                                .handle(ftpOutboundGateway())
                                                .handle(Boolean.class,
                                                         (p, h) -> myServiceActivator.BooleanToString(p, 
                                                                 (String) h.get(FileHeaders.FILENAME)))
                                                .handle(fileOutboundGateway_MoveToProcessedDirectory())

                                                                                    )


                                        .subFlowMapping("false", sf -> sf
                                            .channel("nullChannel")     

                                            )
            )

            .handle(logger())
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedRate(500).get();
}


@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp
            .outboundGateway(SftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.MV,
                    "payload")
            .renameExpression("headers['file_renameTo']").get();
}

也许您没有执行重命名的权限,或者由于其他原因重命名失败;此异常是尝试删除“to”文件名,因为初始重命名失败。打开 DEBUG 日志记录,您应该看到此日志...

if (logger.isDebugEnabled()){
    logger.debug("Initial File rename failed, possibly because file already exists. Will attempt to delete file: "
            + pathTo + " and execute rename again.");
}
try {
    this.remove(pathTo);

既然失败就在于此remove()操作,您的失败表明重命名由于某些其他原因而失败(因为显然“to”文件不存在)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Java DSL 在远程 S(ftp) 中移动已处理的文件 的相关文章

随机推荐