如何增加消息头

2023-12-26

Spring Integration Java DSL 有没有办法修改现有的消息头?

我正在使用 SI Java DSL 重新实现下载重试机制,并且希望在发生失败时增加保存下载尝试的消息标头,然后根据与限制相比的尝试次数路由消息。

我的路由基于 SI 中包含的 RouterTests 运行良好。使用 HeaderEnrichers 我可以轻松添加标头,但我看不到修改现有标头的方法。

Thanks

/**
 * Unit test of {@link RetryRouter}.
 * 
 * Based on {@link RouterTests#testMethodInvokingRouter2()}.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {

    /** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting( ) } */
    @Autowired
    @Qualifier("failed")
    private MessageChannel failed;

    /** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("retry-channel")
    private PollableChannel retryChannel;

    /** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("exhausted-channel")
    private PollableChannel exhaustedChannel;

    /**
     * Unit test of {@link ContextConfiguration#failedDownloadRouting( ) } and {@link RetryRouter}.
     */
    @Test
    public void retryRouting() {

        final int limit = 2;

        for ( int attempt = 0 ; attempt <= limit + 1 ; attempt++ ){

            this.failed.send( failed( attempt, limit) );

            if ( attempt < limit){

                assertEquals( payload( attempt ) , this.retryChannel.receive( ).getPayload( ) );
                assertNull(this.exhaustedChannel.receive( 0 ) );

            }else{

                assertEquals( payload( attempt ) , this.exhaustedChannel.receive( ).getPayload( ) );
                assertNotNull( this.exhaustedChannel.receive( ).getPayload( ) );
            }
        }

    }

    private Message<String> failed( int retry , int limit ) {

        return MessageBuilder
            .withPayload(  payload( retry ) )
            .setHeader("retries", new AtomicInteger( retry ) )
            .setHeader("limit", limit)
            .build();
    }

    private String payload (int retry){
        return "retry attempt "+retry;
    }


    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public MessageChannel loggerChannel() {
            return MessageChannels.direct().get();
        }

        @Bean(name = "retry-channel")
        public MessageChannel retryChannel() {
            return new QueueChannel();
        }

        @Bean(name = "exhausted-channel")
        public MessageChannel exhaustedChannel() {
            return new QueueChannel();
        }


        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
         * <p>
         * The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download, 
         * and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as
         * a header by {@link DownloadDispatcher} from retry configuration.
         * <p>
         * Messages for failed download attempts are listened to on channel {@link #failed}. 
         * Retry attempts are routed to {@link #retryChannel()}
         *  
         * @return
         */
        @Bean
        public IntegrationFlow failedDownloadRouting() {

            return IntegrationFlows.from( "failed" )

                .handle( "headers.retries.getAndIncrement()" )
                .handle( logMessage ( "failed" ) )
                .route(new RetryRouter())
                .get();
        }

        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. 
         * <p>
         */
        private static class RetryRouter {

            @Router
            public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {

                if (attempts.intValue() < limit.intValue()){
                    return "retry-channel";
                }
                return "exhausted-channel";
            }

            /** This method is not used but is required by Spring Integration otherwise application context doesn't load because of
             * {@code Caused by: java.lang.IllegalArgumentException: Target object of type 
             * [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
             * 
             * @throws UnsupportedOperationException if called
             */
            @SuppressWarnings("unused")
            public String routeMessage(Message<?> message) {

                throw new UnsupportedOperationException( "should not be used." );
            }
        }
    }

有一种方法可以在不修改标题的情况下完成您需要的操作:

.enrichHeaders(h -> h.header("downloadRetries", new AtomicInteger()))

然后,当您需要增加它时,您应该这样做:

.handle(m -> m.getHeaders().get("downloadRetries", AtomicInteger.class).getAndIncrement())

并将此句柄作为重试服务的发布订阅者通道上的第一个单向第一个订阅者。

UPDATE

是一种单向“MessageHandler”,不适合配置“outputChannel”。这是集成流程的结束。

感谢您分享有关此事的配置:现在我遇到了一个问题,而您误解了。解决方案必须是这样的:

        return IntegrationFlows.from( "failed" )

            .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(m -> m.getHeaders().get("downloadRetries",
                                  AtomicInteger.class).getAndIncrement()))
            .handle( logMessage ( "failed" ) )
            .route(new RetryRouter())
            .get();
    }

我们在哪里有一个PublishSubscribeChannel, the .subscribe()子流中是第一个订阅者,并且.handle( logMessage ( "failed" ) )主流程中是第二个订阅者。在第一个订阅者的工作完成之前,最后一个订阅者不会被调用。

查看 Spring 集成参考手册 http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/messaging-channels-section.html#channel-implementations-publishsubscribechannel and Java DSL 手册 https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#subflows了解更多信息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何增加消息头 的相关文章

随机推荐

  • 在 Java 中,可以在迭代列表时修改列表吗?

    据我所知 在Java中Collection
  • 使用已知物体尺寸从图片中测量物体

    所以我需要做的是从普通用户拍摄的图像中测量脚长 该图像将包含一只穿着黑色袜子的脚 一枚硬币 或其他已知尺寸的物体 和一张白纸 例如 A4 其他两个物体将位于其上 我已经拥有什么了 我已经使用过 opencv 但只是简单的项目 我已经开始阅读
  • JqueryUI 自动完成:每个列表项仅显示一个字符

    我在用着jquery 1 4 2 min and jquery ui 1 8 6 custom要在 jsp 页面上获取自动完成数据 代码片段如下 document ready function input airportfrom autoc
  • rMarkdown 中是否可以有可排序(交互式)表格?

    我在用kable from knitr包来显示 html 文档上的表格 是否可以使其可排序 一些示例代码 title Test output html document r echo FALSE comment kable data fra
  • 如何在 mysql 存储过程中使用 case-when 语句?

    我想使用 request time 参数自动设置 session id 因此我选择了包含 case 语句的 mysql 存储过程 就这样 create procedure upd userinput in request time time
  • Plotly - 我想根据条件用不同的颜色为每个 X 轴值着色

    语言 JavaScript框架 情节 我有一个要求 我希望 x 轴值单独着色 而不是对 x 轴中的所有值着色 我尝试了下面的代码 但它对轴上的所有值执行相同的操作 此处 颜色 红色 应用于 x 轴上的所有值 我需要根据下面代码中数组 col
  • 如何在本地主机上测试打开的图

    我做了很多研究 但还没有找到明确的答案 有没有办法在本地主机上测试开放图 我在 locahost 上使用图形 api 没有任何问题 我已经在应用程序设置中更改了我的网站网址 甚至尝试在主机文件中设置一个域 但打开图的调试器 linter 尝
  • Stimulsoft - 如何在 asp.net core 中渲染报告并以角度显示

    刺激软件报告 https www stimulsoft com en documentation online programming manual index html 如何在 asp net core 中渲染报告及其变量和参数并以角度显
  • 如何为 IE 提供特殊的 CSS?

    我想为 ie8 使用一些不同的 CSS 但只保留一个 CSS 文件 谁能告诉我最好的 黑客 是什么 是的 我知道 hack 不好 但我想至少暂时保留一个 CSS 文件 例如 在非 IE8 浏览器中我希望浏览器看到以下内容 div conte
  • R:具有重复时间索引条目的时间序列

    我是 R 的 n00b 和堆栈溢出的 n00b 刚刚加入 所以如果我未能使用标记 我不知道 或错过了自述文件中的某些内容 请原谅我 如果您不介意 我将在这里解决我的完整问题 因为也许您可能会好心地告诉我应该如何最好地解决这个问题 Stage
  • Symfony:是否可以为组件设置模板?

    组件没有 setTemplate 我知道 但也许还有另一种方法可以做到这一点 问题似乎是关于 php 框架 http www symfony project org http www symfony project org sfCompon
  • 使用 Jumbotron 容器的输入组大于 Bootstrap 3 中的输入

    我正在使用 Bootstrap 3 输入组尝试一种奇怪的行为 当我将输入组插件 文本或图标 添加到大屏幕内的表单时 输入组高度大于其输入高度 在这里你可以找到一个 JsFiddle 和有问题的屏幕截图 div class jumbotron
  • 检查某项是否存在,如果不存在则报错

    我想使用 PowerShell 检查 IIS Web 应用程序是否存在 或可能存在其他类型的项目 我可以这样做Get Item 但是如果该项目不存在 则会报告错误 这会误导运行脚本的用户 看起来好像出了问题 而实际上一切都很好 我该如何做到
  • 如何终止 MySQL 连接

    我正在使用 MySQL 构建一个网站 我正在使用 TOAD for MySQL 突然无法连接到数据库 因为出现错误 太多联系 Toad for MySQL 是否有任何方法可以查看现有连接以便能够终止它们或简单地关闭所有连接 不 有没有内置
  • 休眠:内存问题?缺点?

    我正在使用 Hibernate 3 6 直到今天我还没有发现使用它的任何缺点 但今天有人告诉我 当项目变大时 使用 Hibernate 的应用程序会出现内存问题 发生这种情况是因为与不使用 hibernate 的应用程序相比 需要创建和存储
  • Flutter 在 facebook 应用 android 和 ios 中打开 facebook 链接

    在我的应用程序中 我存储了 Facebook 网址 我想在 Facebook 应用程序中打开它们 而不是在浏览器中 我尝试使用flutter url launcher 包但它在默认浏览器中打开链接 我想要的是直接打开链接进入脸书应用程序 谁
  • Python 中的 RAII - 离开范围时自动销毁

    我一直在努力寻找RAII https en wikipedia org wiki Resource acquisition is initialization在Python中 资源分配即初始化是 C 中的一种模式 其中 对象在创建时就被初始
  • C 中的字符与多个字符的比较

    如何在不使用 if 的情况下将 C 中的字符与其他字符进行比较 有大量的 例如 假设我有一个名为 i 的字符 我想将其与其他 8 个字符进行比较 而这些字符之间没有任何联系 如果 i 至少等于这 8 个字符中的一个 则表达式为 true 像
  • Admob ( GoogleMobileAds 8.0.0 ) iOS SDK - 未找到 GADInterstitial API,如何使用 GADInterstitialAd - 请提供示例代码?

    以下行没有错误 import
  • 如何增加消息头

    Spring Integration Java DSL 有没有办法修改现有的消息头 我正在使用 SI Java DSL 重新实现下载重试机制 并且希望在发生失败时增加保存下载尝试的消息标头 然后根据与限制相比的尝试次数路由消息 我的路由基于