Spring集成:如何顺序处理文件

2024-03-07

我使用“int-file: inbound-channel-adapter”来加载目录中存在的文件。我喜欢按顺序处理文件:这意味着当第一个文件的处理完成时,我加载第二个文件......等等。

I see a sample https://github.com/spring-projects/spring-integration-samples/blob/master/intermediate/file-processing/src/main/java/org/springframework/integration/samples/fileprocessing/FileProcessor.java但我无法预测处理一个文件所需的时间,这取决于文件的大小。

我的源代码:

    <int-file:inbound-channel-adapter
    directory="${directory.files.local}" id="filesIn" channel="channel.filesIn">
    <int:poller fixed-delay="1000" max-messages-per-poll="1" />

</int-file:inbound-channel-adapter>

一个文件的处理过程是 file:inbound-channel-adapter--->transformer--->splitter---->http:outbound-gateway--->outbound-mail-adapter---->的处理一个文件处理完毕,那么此时,我要处理下一个文件。

我的项目配置太复杂了。 下面,我向您展示更多配置: 配置的第一部分是:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
    auto-startup="true" channel="receiveChannel" session-factory="sftpSessionFactory"
    local-directory="file:${directory.files.local}" remote-directory="${directory.files.remote}"
    auto-create-local-directory="true" delete-remote-files="true"
    filename-regex=".*\.txt$">
    <int:poller fixed-delay="${sftp.interval.request}"
        max-messages-per-poll="-1" />
</int-sftp:inbound-channel-adapter>
<!-- <int:poller cron="0 * 17 * * ?"></int:poller> -->

<int-file:inbound-channel-adapter
    filter="compositeFileFilter" directory="${directory.files.local}" id="filesIn"
    channel="channel.filesIn" prevent-duplicates="true">
    <int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-file:inbound-channel-adapter>

<int:transformer input-channel="channel.filesIn"
    output-channel="channel.file.router" ref="fileTransformer" method="transform" />

<int:recipient-list-router id="fileRouter"
    input-channel="channel.file.router">

    <int:recipient channel="channel.empty.files"
        selector-expression="payload.length()==0" />
    <int:recipient channel="channel.filesRejected"
        selector-expression="payload.toString().contains('rejected')" />
    <int:recipient channel="toSplitter"
        selector-expression="(payload.length()>0) and(!payload.toString().contains('rejected'))" />

</int:recipient-list-router>

然后从通道 tosplitter 中,我的程序逐行读取文件:

    <int-file:splitter input-channel="toSplitter"
    output-channel="router" requires-reply="false" />

<int:recipient-list-router id="recipentRouter"
    input-channel="router">

    <int:recipient channel="channelA"
        selector-expression="headers['file_name'].startsWith('${filenameA.prefix}')" />

    <int:recipient channel="channelB"
        selector-expression="headers['file_name'].startsWith('${filenameB.prefix}')" />

</int:recipient-list-router>

每个通道 A 和 B 应该为每条线路调用两个不同的 WAYS。 每个文件都使用异步调用 ws 代码,文件 A 如下:

<int:header-enricher input-channel="channelA"
    output-channel="channelA.withHeader">
    <int:header name="content-type" value="application/json" />
    <int:header name="key1" expression="payload.split('${line.column.separator}')[0]" />
    <int:header name="key2" expression="payload"></int:header>
</int:header-enricher>

<int:transformer input-channel="channelA.withHeader"
    output-channel="channelA.request" ref="imsiMsgTransformer"
    method="transform">
</int:transformer>


<int:channel id="channelA.request">
    <int:queue capacity="10" />
<int-http:outbound-gateway id="maspUpdatorSimChangedGateway"
    request-channel="channelA.request" 
    url="${url}"
    http-method="PUT" expected-response-type="java.lang.String"       charset="UTF-8"
    reply-timeout="${ws.reply.timeout}" reply-channel="channelA.reply">
    <int-http:uri-variable name="foo" expression="headers['key1']" />
    <int:poller fixed-delay="1000" error-channel="channelA.error"
        task-executor="executorA" />

        <int-http:request-handler-advice-chain>
        <int:retry-advice max-attempts="${ws.max.attempts}"
            recovery-channel="recovery.channelA">
            <int:fixed-back-off interval="${ws.interval.attempts}" />
        </int:retry-advice>
    </int-http:request-handler-advice-chain>

</int-http:outbound-gateway>

<int:service-activator input-channel="recovery.channelA"
    ref="updateImsiHttpResponseErrorHandler" method="handleMessage" output-channel="updateImsi.channel.error.toenricher">
</int:service-activator>

<int:service-activator input-channel="channelA.reply"
    ref="updateImsiHttpResponseMessageHandler" method="handleMessage">
    <int:poller fixed-delay="1000"></int:poller>
</int:service-activator>

在(回复通道和恢复通道)的每个激活器中,我计算文件的进度,直到文件完成,此时我应该加载第二个文件 A2 或文件 B ...等


这是默认行为,只要

  1. 轮询器没有task-executor(你的没有)。
  2. Only DirectChannel是(默认)用于适配器的下游 - 这意味着不QueueChannels or ExecutorChannel(即没有task-executor or <queue/>在频道上)。

在这种情况下,在当前轮询完成之前,甚至不会考虑下一次轮询 - 流程在轮询器线程上运行,并且一次只能处理一个轮询。

The fixed-delay直到当前文件完全处理完毕后才开始。

EDIT

如果需要对流程使用异步处理,则需要使用条件轮询器 http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#conditional-pollers或者一个简单的投票跳过建议 http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#_pollable_message_source.

你会提供一个PollSkipStrategy在文件完成之前将返回 false 的实现。

这样,后续的民意调查将被跳过,直到您做出决定。

EDIT2

像这样的东西...

/*
 * Copyright 2015 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.integration.scheduling;

/**
 * @author Gary Russell
 * @since 4.3
 *
 */
public class SimplePollSkipStrategy implements PollSkipStrategy {

    private volatile boolean skip;

    @Override
    public boolean skipPoll() {
        return this.skip;
    }

    public void skipPolls() {
        this.skip = true;
    }

    public void reset() {
        this.skip = false;
    }
}
  • 将其添加为<bean/>根据您的情况。
  • 将其添加到轮询器的建议链中PollSkipAdvice
  • 当您想跳过投票时,请致电skipPolls().
  • 当您完成文件处理后,请致电reset().
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring集成:如何顺序处理文件 的相关文章

随机推荐

  • 在 android 5 上膨胀类 android.webkit.WebView 时出错

    我在 Android 5 0 API 21 上进行测试时出现以下错误 在其他操作系统版本上测试效果良好 java lang RuntimeException 无法启动活动 ComponentInfo ui activities naviga
  • 如何在 VSCode 中复制代码行的 URL?

    当我使用 VSCode 工作并且需要将代码行 URL 从存储库发送给同事时 这种情况经常发生 VSCode 对此没有本机支持 如果您使用 Bitbucket 则 Atlassian 插件会启用一项功能 然而 我一直在寻找更通用的选择 我使用
  • 菜单被裁剪

    我有一个MFC项目 它支持 40 多种语言 我的电脑上有两个显示器 它们都是不同尺寸的显示器和不同的分辨率 如果我将应用程序移至较小的显示器上 则不会显示完整的语言菜单 我知道它会显示滚动条 为什么不是呢 我的菜单是标准菜单 没有什么花哨
  • “java.lang.ClassFormatError: Invalid pc in LineNumberTable”的可能原因

    今天我开始编写一个使用 sqlite 的项目 当我尝试测试它时 我收到了java lang ClassFormatError LineNumberTable 中的 pc 无效 希望你能帮助我 因为我迷路了 我搜索了这个错误 并发现了一些针对
  • GWT 列表框多选

    我需要添加一个列表框 组合框 允许用户选择多个值 我知道 GWT API 中已经有一个可用的ListBox http google web toolkit googlecode com svn javadoc 1 5 com google
  • 使用 Doctrine 和 Symfony2 查询多对多关系

    我试图理解 Doctrine 和 Symfony2 中的多对多关系是如何运作的 我重新创建了官方文档 goo gl GYcVE0 中显示的示例 并且我有两个实体类 User and Group正如你在下面看到的
  • 错误 0x80005000 和 DirectoryServices

    我正在尝试使用 Net 中的目录服务运行简单的 LDAP 查询 DirectoryEntry directoryEntry new DirectoryEntry LDAP someserver contoso com DC contoso
  • 为什么 Xcode 7 显示 *.tbd 而不是 *.dylib?

    Xcode 7 在目标 gt 构建阶段 gt 链接二进制文件与库 gt 点击 按钮 选择要添加的框架时 您找不到 dylib 而是看到 tbd 这是什么原因呢 需要dylib的人 从这里开始post https stackoverflow
  • Python 将 3d 数组重塑为 2d

    我想将 numpy 数组重塑为所描绘的形状 从 3D 变为 2D 不幸的是 顺序不正确 假设有一个 numpy 数组 1024 64 100 并希望将其转换为 1024 100 64 有人知道如何维持秩序吗 我有一个样本数据 data 0
  • Maven - 对等点未经过身份验证

    几天前 我的专家停止工作了 更具体地说 它会停止下载依赖项 下面我记录了我为找到解决方案而执行的一系列信息和步骤 我仔细检查了 settings xml 这个文件也被我的同事使用 他们没有任何问题 我安装了 Maven 3 0 4 3 0
  • 存储批处理作业的密码

    我有一个小的java prog 它使用需要授权的网络服务 因此java prog 将使用Windows任务调度程序运行 需要有一个用户 密码参数 如何将它们存储在某个地方而不将它们作为纯文本放在文件中 到目前为止 我已经尝试使用runtim
  • 无法在将新附加的字符串列保存到 numpy 数组时执行 np.savetxt

    我有 numpy 数组mfcc具有 mfcc 值 且形状为 5911 20 我有一份清单a 其中有 5911 个标签 例如apple cow dog 我想将这些标签附加到mfccnumpy 数组 STEP1我将带有标签的列表转换为数组 at
  • 自定义 Django 字段来存储电子邮件地址列表

    我正在尝试向 Django 模型添加一个字段来表示电子邮件地址列表 我希望用户在管理中的表单中输入逗号分隔的地址列表 然后我的应用程序将解析该列表以发送一系列电子邮件 我当前的实现涵盖了基本思想 但有很大的局限性 在管理中 如果我输入类似的
  • 增加 JMeter 执行期间的线程数

    我有一个性能测试JMeter并想用它来测试最大系统性能 吞吐量 因此 当错误率低于 2 时 应增加活动线程数 我发现Constant Throughput Timer 把它放入Thread Group但它只会暂停或减慢线程 我尝试将其定义如
  • 从 Postman 向 Microsoft Bot 发送消息

    我正在尝试向我创建并发布到 azure 服务的机器人发送消息 以便该机器人可以开始向其某些用户发送消息 我尝试首先在 Postman 上发出请求 然后我可以为该交互构建一个控制器 我正在执行以下请求 POST https login mic
  • 单击手风琴中元素 id 的链接时打开 JQuery 手风琴

    这是我的问题http jsfiddle net uJ3W5 12 http jsfiddle net uJ3W5 12 正如您所看到的 顶部的 4 个按钮链接到手风琴第 1 部分中的元素 然而 当手风琴关闭时 这些链接不起作用 我需要它 这
  • 如何比较yaml文件中的键?

    有两个 Ruby on Rails 国际化 yaml 文件 一份文件完整 另一份文件缺少密钥 如何比较两个 yaml 文件并查看第二个文件中缺少的键 有没有工具可以做到这一点 假设file1是正确的版本并且file2是缺少密钥的版本 def
  • 将 VS2019 与 TFS2018 vnext 构建系统结合使用,无需服务器端解决方法

    我最近在尝试将 VS2019 与 TFS2018 vnext 构建系统一起使用时遇到了问题 在 Visual Studio Build 步骤中不能选择VS2019 选择 Latest 则不会使用Visual Studio 2019 在测试服
  • 是否可以对 feed 进行过滤

    是否可以对流中的提要实施过滤 我希望允许用户关注其他用户帖子 但将这些帖子标记为各种类别 我们的愿望是选择一个 足球 类别 并仅查看我关注的人发布的具有该标签的帖子 或者如果该类别是 所有体育 则可能是一个标签集合 查看 api 和文档 提
  • Spring集成:如何顺序处理文件

    我使用 int file inbound channel adapter 来加载目录中存在的文件 我喜欢按顺序处理文件 这意味着当第一个文件的处理完成时 我加载第二个文件 等等 I see a sample https github com