考虑到每次轮询最大消息数和 Cron,轮询如何用于 FTP 入站通道适配器

2024-04-19

我有 UC,我需要从 ftp 位置选取文件并将其放入服务器位置 我正在使用 ftp-inbound-channel-adapter (Spring 集成 - 2.0.4)来实现它。 下面是我的xml中的配置

 <bean id="ftpAASessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
          <property name="host" value="${ftp.session.host}" />
          <property name="port" value="${ftp.session.port}" />
          <property name="username" value="${ftp.session.username}" />
          <property name="password" value="${ftp.session.password}" />
          <property name="clientMode" value="0" />
          <property name="fileType" value="0" />
   </bean>


   <ftp:inbound-channel-adapter id="ftpAAInbound"
          channel="ftpChannel" session-factory="ftpAASessionFactory" charset="UTF-8"
          auto-create-local-directory="false" delete-remote-files="true"
          remote-directory="${ftp.source.location}" local-directory="file://${ftp.target.location}"
          >
          <int:poller max-messages-per-poll="5" cron="0 */2 * ? * *">
          </int:poller>

   </ftp:inbound-channel-adapter>

   <int:channel id="ftpChannel">
          <int:queue />
          <int:interceptors>
                 <int:wire-tap channel="debugLogger" />
          </int:interceptors>
   </int:channel>

   <int:logging-channel-adapter id="debugLogger"
          level="DEBUG" log-full-message="true" />

   <int:logging-channel-adapter id="errorLogger"
          level="ERROR" log-full-message="true" />

我已将 max-messages-per-poll 配置为 5,并且每隔偶数分钟进行一次轮询(使用 cron 表达式)。

我的问题是,如果我们在 ftp 位置有 6 个文件,则所有 6 个文件都会在第一次轮询时传输到服务器位置(根据每次轮询的最大消息数= 5,它应该只从 Ftp 位置选择 5 个文件)并且有效负载仅由 5 个文件组成。

我希望在第一次轮询时只将 5 个文件传输到我的服务器,在第二次轮询时它应该选择最后一个

请提出解决方案 TIA

当 ftp 位置有 6 个文件时 PFB 日志

    [CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
***[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt***
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt
[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
***[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FileReadingMessageSource.scanInputDirectory(272) | Added to queue: [D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt, D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt, D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8]*** QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:40:00] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: null
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(101) | Received no Message during the poll, returning 'fal

.......


该日志仅显示 4 个文件;看起来它被配置为每次轮询 3 条消息,并且清楚地显示在 12:38 以及 1 和 12:40 发送了 3 个文件(没有找到第五个文件)。

Poll resulted in Message: null

EDIT

这是实现所需结果的另一种方法(使用出站网关)。该版本使用 Java DSL。

如果您更喜欢使用 XML 配置,则在中提供了类似的流程(以 XML 形式)ftp-样本 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp- 您必须在之间插入文件限制器ls网关和分路器。

@SpringBootApplication
public class So42528316Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42528316Application.class, args);
        try {
            context.getBean(So42528316Application.class).runDemo();
        }
        finally {
            context.close();
        }
    }

    @Autowired
    private FileGateway fetchAndProcess;

    private void runDemo() throws Exception {
        Collection<Boolean> rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        while (rmResults != null) {
            System.out.println("Processed " + rmResults.size() + " files");
            Thread.sleep(10_000);
            rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        }
        System.out.println("No more files available");
    }

    @MessagingGateway(defaultRequestChannel = "flow.input", defaultReplyTimeout = "0")
    public interface FileGateway {

        Collection<Boolean> processFilesAt(String pattern);

    }

    @Bean
    public DefaultFtpSessionFactory sessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("10.0.0.3");
        factory.setUsername("ftptest");
        factory.setPassword("ftptest");
        factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return factory;
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .handle(Ftp.outboundGateway(sessionFactory(), "ls", "payload"))
                .handle("so42528316Application", "limitFiles")
                .split()
                .handle(Ftp.outboundGateway(sessionFactory(), "get",
                                "payload.remoteDirectory + '/' + payload.filename")
                        .localDirectory(new File("/tmp", "so42528316")))
                .handle("so42528316Application", "process")
                .handle(Ftp.outboundGateway(sessionFactory(), "rm",
                        "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
                .aggregate();
    }

    public List<FtpFileInfo> limitFiles(List<FtpFileInfo> files) {
        // Add any logic you want here, e.g. check if file already on local disk.
        if (files.size() == 0) {
            return null;
        }
        else if (files.size() > 2) {
            System.out.println("Reducing fetch list from " + files.size() + " to 2");
            return files.stream().limit(2).collect(Collectors.toList());
        }
        else {
            return files;
        }
    }

    public String process(File file) {
        System.out.println("Processing " + file);
        file.delete();
        return file.getName();
    }

}

Result:

Reducing fetch list from 3 to 2
Processing /tmp/so42528316/bar.txt
Processing /tmp/so42528316/baz.txt
Processed 2 files
Processing /tmp/so42528316/foo.txt
Processed 1 files
No more files available
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

考虑到每次轮询最大消息数和 Cron,轮询如何用于 FTP 入站通道适配器 的相关文章

  • headerenricher Spring 集成和 java dsl

    我使用 Spring Integration 和 java dsl 规范来实现我的 IntegrationFlow 我想使用自定义标头丰富器将一些文件名添加到标头 它将类似于 public class FileHeaderNamingEnr
  • 简单文件复制的正确 Java 配置是什么

    我对 Spring 非常陌生 对 Spring Integration 更陌生 所以如果这是一个非常基本的问题 我深表歉意 我想构建一个非常基本的日志文件处理器来学习诀窍 与此非常相似 example http forum spring i
  • ftp 在 java 中无法正确下载文件?

    当我使用以下代码下载文件时 它只是将文件写入本地目的地 但文件大小均为零 有人能说为什么会发生这种情况以及如何解决它吗 import org apache commons net ftp FTPClient import org apach
  • 将文件直接从 FTP 传输到 Azure 文件存储,无需将它们保存在本地内存或磁盘中

    我必须将文件从 FTP 传输到 Azure 文件存储 我的代码工作正常 但我正在内存中传输这些文件 这不是最佳实践 所以首先我将流读取到Byte内存中的数组 然后我将输出上传到 Azure 文件存储 现在我知道最好异步执行此操作 但我不知道
  • 选择 FTP 和 HTTP 传输的缓冲区大小

    在实现低级 HTTP 和 FTP 传输时 如何选择缓冲区的大小 从套接字读取或写入套接字的字节数 以获得最大吞吐量 我的应用程序应该在 130 Kbps 到 3 Mbps 的连接上使用 HTTP 或 FTP 传输数据 我事先知道预期的速度
  • 使用 PHP 测试 FTP 连接

    我正在使用下面的 PHP 脚本来测试 FTP 连接 目前 如果连接成功 它正在打印文件数组 如果能够连接 如何让它也显示消息 就像 连接成功 一样 con ftp connect server or die Couldn t connect
  • Spring 集成 - SFTP 复制后在远程服务器中重命名或移动文件

    我正在尝试移动或重命名远程文件 而不是下载后删除远程文件 我发现可以通过出站网关移动命令来完成此操作 但找不到正确的方法 下载后请帮忙重命名文件 Bean Order Ordered HIGHEST PRECEDENCE public Se
  • 带通配符的 FTP 目录部分列表

    首先我问 ftp 目录列表超时 大量子目录 https stackoverflow com questions 9230485 ftp directory listing timeout huge number of subdirs 我得到
  • 是否可以通过 FTP 代理使用 C# FtpWebRequest?

    据我了解 FtpWebRequest Proxy属性表示 HTTP 代理 我必须通过 FTP 代理向外部服务器发出 FTP 请求 到目前为止 我实现此功能的唯一方法是创建一个使用 Windows FTP 命令的脚本并以这种方式下载 是否可以
  • 使用.net 2.0 连接到 FTP 服务器 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我希望连接到现有的 FTP 服务器 上传文件 等待服务器生成报告 然后将该报告下载回 VB NET 2
  • Spring集成流程的优雅关闭

    我从 spring 集成监控示例中了解到https github com spring projects spring integration samples tree master intermediate monitoring http
  • Java:从 FTP 服务器访问文件

    我有一个 FTP 服务器 里面有一堆文件夹和文件 我的程序需要访问该服务器 读取所有文件并显示它们的数据 出于开发目的 我一直在使用硬盘驱动器上 src 文件夹中的文件 但现在服务器已启动并运行 我需要将软件连接到它 基本上我想要做的是获取
  • spring-integration并行分割路由聚合流由于单向MessageHandler而失败

    我想通过拆分项目 将每个项目路由到适当的网关并聚合结果来并行处理项目列表 但是 我的应用程序无法启动 出现以下错误 BeanCreationException The currentComponent is a one way Messag
  • 使用 .NET 进行选择性 FTP 下载

    我有一个 ftp 构建站点 新构建将在其中更新 它将在每个新构建的特定 ftp 位置创建一个名为 Build XXXXX 的新文件夹 我需要从 Build XXXXX 目录中的某个位置下载构建版本 例如 Builds Build XXXXX
  • 从 C# 中的服务器下载后,Zip 文件被损坏

    request MakeConnection uri WebRequestMethods Ftp DownloadFile username password response FtpWebResponse request GetRespo
  • 如何创建检测连接丢失和自动重新连接的 tcp-inbound-gateway?

    我正在尝试配置一组 spring 集成组件来使用来自 TCP 套接字的数据 基本协议是 打开连接后 系统会提示我输入用户名 然后输入密码 然后如果身份验证成功 数据就会在可用时流式传输给我 每 30 秒就会向我发送一条 ping 消息 以便
  • 持久订阅 ActiveMQ

    我正在尝试为我的消息设置持久订阅者 以便即使在服务器重新启动后它们也能保留在主题中 但在配置过程中我收到与 xml 相关的错误 这是我的配置 xml
  • 使用PHP通过FTP递归扫描目录和子目录

    我正在尝试创建目录中所有文件 及其大小 的列表 包括子目录中的所有内容 这些文件位于远程服务器上 所以我的脚本通过 FTP 连接 然后使用以下命令运行递归函数ftp chdir浏览每个目录 如果有其他方法可以做到这一点 我愿意接受建议 fl
  • 如何完善这个FTP(shell)功能?

    我有大量使用以下函数的脚本 Copies files over using FTP Configurations set at the beggining of the script param 1 FTP Host 2 FTP User
  • 我是否应该将 CachingConnectionFactory 与 hornetq 2.4.1 一起使用

    根据有关在 hornetq 中使用 JMSTemplate 的长期信息 我们在连接到服务器时一直使用 CachingConnectionFactory 这是一个示例配置 与我们正在使用的配置非常相似

随机推荐