如何:实现 BatchMessageListenerContainer 以批量使用 JMS 队列

2023-11-30

我最近在 Spring Integration 中遇到了对 JMS 使用者的需求 - 能够消耗大量的突发数据,而不会给我的目标 Oracle 数据库带来过多的提交压力。

DefaultMessageListenerContainer 似乎不支持除消息事务之外的任何内容。

我在 google 上搜索了解决方案,找到了几个 - 但其中很多都不是通过继承 DMLC 来实现的,而是通过克隆和修改相同的原始源代码来实现 - 使其很容易被破坏,以防万一我以后希望转移到spring-jms 的最新版本。此外,被克隆的代码引用了 DMLC 的私有属性,因此必须将其排除在外。为了使这一切正常工作,还需要几个接口和一个自定义消息侦听器。总而言之,我感觉不舒服。

那么该怎么办?


嗯 - 这是一个简单而紧凑的解决方案,完全基于从 DefaultMessageListenerContainer 派生的单个类。

不过,我只使用消息驱动通道适配器和 ChainedTransactionManager 进行了测试 - 因为这是需要执行此类操作时的基本场景。

这是代码:

package dk.itealisten.myservice.spring.components;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.Enumeration;

public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {

    public static final int DEFAULT_BATCH_SIZE = 100;

    public int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Override the method receiveMessage to return an instance of BatchMessage - an inner class being declared further down.
     */
    @Override
    protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
        BatchMessage batch = new BatchMessage();
        while (!batch.releaseAfterMessage(super.receiveMessage(consumer))) ;
        return batch.messages.size() == 0 ? null : batch;
    }

    /**
     * As BatchMessage implements the javax.jms.Message interface it fits perfectly into the DMLC - only caveat is that SimpleMessageConverter dont know how to convert it to a Spring Integration Message - but that can be helped.
     * As BatchMessage will only serve as a container to carry the actual javax.jms.Message's from DMLC to the MessageListener it need not provide meaningful implementations of the methods of the interface as long as they are there.
     */
    protected class BatchMessage implements Message {

        public ArrayList<Message> messages = new ArrayList<Message>();

        /**
         * Add message to the collection of messages and return true if the batch meets the criteria for releasing it to the MessageListener.
         */
        public boolean releaseAfterMessage(Message message) {
            if (message != null) {
                messages.add(message);
            }
            // Are we ready to release?
            return message == null || messages.size() >= batchSize;
        }

        // Below is only dummy-implementations of the abstract methods of javax.jms.Message

        @Override
        public String getJMSMessageID() throws JMSException {
            return null;
        }

        @Override
        public void setJMSMessageID(String s) throws JMSException {

        }

        @Override
        public long getJMSTimestamp() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSTimestamp(long l) throws JMSException {

        }

        @Override
        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
            return new byte[0];
        }

        @Override
        public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {

        }

        @Override
        public void setJMSCorrelationID(String s) throws JMSException {

        }

        @Override
        public String getJMSCorrelationID() throws JMSException {
            return null;
        }

        @Override
        public Destination getJMSReplyTo() throws JMSException {
            return null;
        }

        @Override
        public void setJMSReplyTo(Destination destination) throws JMSException {

        }

        @Override
        public Destination getJMSDestination() throws JMSException {
            return null;
        }

        @Override
        public void setJMSDestination(Destination destination) throws JMSException {

        }

        @Override
        public int getJMSDeliveryMode() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSDeliveryMode(int i) throws JMSException {

        }

        @Override
        public boolean getJMSRedelivered() throws JMSException {
            return false;
        }

        @Override
        public void setJMSRedelivered(boolean b) throws JMSException {

        }

        @Override
        public String getJMSType() throws JMSException {
            return null;
        }

        @Override
        public void setJMSType(String s) throws JMSException {

        }

        @Override
        public long getJMSExpiration() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSExpiration(long l) throws JMSException {

        }

        @Override
        public long getJMSDeliveryTime() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSDeliveryTime(long l) throws JMSException {

        }

        @Override
        public int getJMSPriority() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSPriority(int i) throws JMSException {

        }

        @Override
        public void clearProperties() throws JMSException {

        }

        @Override
        public boolean propertyExists(String s) throws JMSException {
            return false;
        }

        @Override
        public boolean getBooleanProperty(String s) throws JMSException {
            return false;
        }

        @Override
        public byte getByteProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public short getShortProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public int getIntProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public long getLongProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public float getFloatProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public double getDoubleProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public String getStringProperty(String s) throws JMSException {
            return null;
        }

        @Override
        public Object getObjectProperty(String s) throws JMSException {
            return null;
        }

        @Override
        public Enumeration getPropertyNames() throws JMSException {
            return null;
        }

        @Override
        public void setBooleanProperty(String s, boolean b) throws JMSException {

        }

        @Override
        public void setByteProperty(String s, byte b) throws JMSException {

        }

        @Override
        public void setShortProperty(String s, short i) throws JMSException {

        }

        @Override
        public void setIntProperty(String s, int i) throws JMSException {

        }

        @Override
        public void setLongProperty(String s, long l) throws JMSException {

        }

        @Override
        public void setFloatProperty(String s, float v) throws JMSException {

        }

        @Override
        public void setDoubleProperty(String s, double v) throws JMSException {

        }

        @Override
        public void setStringProperty(String s, String s1) throws JMSException {

        }

        @Override
        public void setObjectProperty(String s, Object o) throws JMSException {

        }

        @Override
        public void acknowledge() throws JMSException {

        }

        @Override
        public void clearBody() throws JMSException {

        }

        @Override
        public <T> T getBody(Class<T> aClass) throws JMSException {
            return null;
        }

        @Override
        public boolean isBodyAssignableTo(Class aClass) throws JMSException {
            return false;
        }
    }
}

下面是一个示例,展示了如何在 Spring 应用程序上下文中使用它:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:jms="http://www.springframework.org/schema/integration/jms"
  xmlns:p="http://www.springframework.org/schema/p"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans     
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
    http://www.springframework.org/schema/integration/jms
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.0.xsd">

<!-- Plug in the BatchMessageListenerContainer in a message-driven-channel-adapter -->
<jms:message-driven-channel-adapter container-class="dk.itealisten.myservice.spring.components.BatchMessageListenerContainer"
  acknowledge="transacted"
  channel="from.mq"
  concurrent-consumers="5"
  max-concurrent-consumers="15"
  connection-factory="jmsConnectionFactory"
  transaction-manager="transactionManager"
  destination="my.mq.queue"
  />

<!-- Flow processing the BatchMessages being posted on the "from.mq" channel -->
<int:chain input-channel="from.mq" output-channel="nullChannel">
  <int:splitter expression="payload.messages" />
  <!-- This is where we deal with conversion to spring messages as the payload is now a single standard javax.jms.Message implementation -->
  <int:transformer ref="smc" method="fromMessage"/>
  <!-- And finally we persist -->
  <int:service-activator ref="jdbcPublisher" method="persist"/>
</int:chain>

<!-- Various supporting beans -->

<!-- A bean to handle the database persistance --> 
<bean id="jdbcPersistor" class="dk.itealisten.myservice.spring.components.JdbcPersistor" p:dataSource-ref="dataSource" />

<!-- A bean to handle the conversion that could not take place in the MessageListener as it don't know how to convert a BatchMessage -->
<bean id="smc" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

<!-- Transaction manager must make sure messages are committed outbound (JDBC) before cleaned up inbound (JMS). -->
<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
  <constructor-arg name="transactionManagers">
    <list>
      <bean class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="jmsConnectionFactory" />
      <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" p:dataSource-ref="dataSource" />
    </list>
  </constructor-arg>
</bean>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何:实现 BatchMessageListenerContainer 以批量使用 JMS 队列 的相关文章

  • 方法不必要地被调用?

    我有一个 BaseActivity 它可以通过其他所有活动进行扩展 问题是 每当用户离开 暂停 活动时 我都会将音乐静音 我也不再接听电话 问题是 onPause每当用户在活动之间切换时就会被调用 这意味着应用程序不必要地静音和停止tele
  • 目的地为必填项

    在 Wildfly 8 1 0 服务器中部署 Ear 时出现以下错误 1 Error Caused by javax resource spi InvalidPropertyException Destination is mandator
  • bool() 和operator.truth() 有什么区别?

    bool https docs python org 3 library functions html bool and operator truth https docs python org 3 library operator htm
  • 为什么 cross_val_predict 比 KNeighborsClassifier 的拟合慢得多?

    在 Jupyter 笔记本上本地运行并使用 MNIST 数据集 28k 条目 每个图像 28x28 像素 以下内容为27秒 from sklearn neighbors import KNeighborsClassifier knn clf
  • FTP 入站通道适配器的 FTP 问题

    我们的项目使用 ftp inbound channel adapter 从 FTP 服务器轮询文件 它工作正常 但在轮询之间不起作用 当我看到 FTP 服务器日志时 我看到 425 无法打开数据连接 现在 当我重新启动或停止并再次启动 ft
  • 超慢的表格布局性能

    我遇到了糟糕的 TableLayout 性能 我在这里读过一些帖子 谈论同样的事情 Android 动态创建表 性能不佳 https stackoverflow com questions 9813427 android dynamical
  • 双端队列与队列速度

    我正在研究 LeetCode 上的一个问题 Here https leetcode com problems moving average from data stream 当我完成这个问题后 我想出了 class MovingAverag
  • 动态 SQL 和 where case 哪个更好?

    我需要创建一个带有 12 个参数的存储过程 并使用这些参数的不同组合来过滤查询 所有 12 个参数都不是强制性的 就好像我传递 3 5 或 12 个参数取决于用户输入的搜索输入一样 我可以通过两种方式创建 即使用动态 SQL 查询或使用 C
  • 是否存在比 SVN 更快的集中版本控制?

    我已经使用 SVN 很长时间了 现在我们正在尝试使用 Git 我在这里谈论的不是中心化 去中心化的争论 我唯一关心的是速度 后一个工具要快得多 但有时 我需要使用一种集中式方法 这种方法比分散式方法更简单 更简单 学习曲线非常快 这节省了大
  • 在 C# 中存储矩阵值的快速且有用的方法

    我需要用 C 为 3D 引擎创建一个 4x4 矩阵类 我见过一些其他引擎将矩阵值存储在单个浮点成员变量 字段中 如下所示 float m11 m12 m13 m14 float m21 m22 m23 m24 float m31 m32 m
  • Emacs 行编号性能

    我试过了linum and nlinum 两者对于超过 100k 行的文件的性能都很糟糕 for x in 1 100000 do echo x done gt 100k txt emacs q 100k txt M x load libr
  • R:使用带有 .Call 和 C/C++ 包装器的 Fortran 子例程而不是 .Fortran 的优点?

    我有一个 R 包 它使用大量 Fortran 子例程来进行递归线性代数计算的嵌套循环 很大程度上依赖于 BLAS 和 LAPACK 例程 作为 Fortran 的接口 我使用 Fortran功能 我刚刚读过乔纳森卡拉汉的博客文章 http
  • C# 中单个 & 符号的第二个含义是什么?

    我在 C 中使用了单个与号 来表示 检查second条件语句即使第一个是false 但以下似乎是不同的意思 of 总而言之 谁能解释一下如何i 1在下面的例子中有效吗 List
  • 为什么 Java 11 中对于空白字符串 String.strip() 比 String.trim() 快 5 倍

    我遇到过一个有趣的场景 因为某些原因strip 针对空白字符串 仅包含空格 明显快于trim 在Java 11中 基准 public class Test public static final String TEST STRING 3 w
  • 优化数据可视化 Web 应用程序的性能

    我正在重写 3 年前编写的数据可视化网络工具 从那时起 浏览器的 JavaScript 引擎变得更快 所以我正在考虑将部分工作从服务器转移到客户端 在页面上 数据在表格和地图 或图表 中可视化 它使用相同的数据 但以不同的方式 因此准备显示
  • 在所有浏览器中启用我的网站的平滑滚动

    我正在开发一个视差滚动网站Stellar http markdalgleish com projects stellar js and Skrollr https github com Prinzhorn skrollr图书馆 该网站在 F
  • 在单个 mongodb 查询中查找并计数

    我的文档看起来像这样 id ObjectId 572c4bffd073dd581edae045 name What s New in PHP 7 description PHP 7 is the first new major versio
  • 如果目标上的消费者已关闭,则通知 ActiveMQ 生产者

    我正在使用 ActiveMQ 消息代理 并且我有一个要求 即生产者应用程序想要知道在特定目标上使用的消费者应用程序是否已启动 我怎样才能实现这个目标 Thanks 你应该结帐咨询信息 http activemq apache org adv
  • 优化 LATERAL join 中的慢速聚合

    在我的 PostgreSQL 9 6 2 数据库中 我有一个查询 该查询根据一些股票数据构建计算字段表 它为表中的每一行计算 1 到 10 年的移动平均窗口 并将其用于周期性调整 具体来说 CAPE CAPB CAPC CAPS 和 CAP
  • 当我使用可变参数而不是常量参数时,为什么我的内联表 UDF 慢得多?

    我有一个表值内联 UDF 我想过滤该 UDF 的结果以获得一个特定值 当我使用常量参数指定过滤器时 一切都很好 并且性能几乎是瞬时的 当我使用可变参数指定过滤器时 它会花费明显更大的时间块 大约是逻辑读取的 500 倍和持续时间的 20 倍

随机推荐