如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递?

2024-03-16

在下面的测试中,我尝试模拟以下场景:

  1. 消息队列启动。
  2. 启动设计为在消息处理期间失败的消费者。
  3. 产生一条消息。
  4. 消费者开始处理消息。
  5. 在处理过程中抛出异常来模拟消息处理失败。失败的消费者被停止。
  6. 另一个消费者启动的目的是接收重新传递的消息。

但我的测试失败了,消息没有重新传递给新的消费者。我将不胜感激任何对此的提示。

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest  extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void 
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);

            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;

        @Autowired
        public MockTaskProducer mockTaskProducer;

        @Autowired
        public FailingWorker failingWorker;

        @Autowired
        public SucceedingWorker succeedingWorker;

        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);

            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);

            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }

    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }

}

任务监听器.java

public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }

}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

        taskCounter++;

        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}

显然是我昨天查看的文档来源创建健壮的 JMS 应用程序 http://docs.oracle.com/javaee/1.4/tutorial/doc/JMS6.html以某种方式误导我(或者我可能理解错误)。尤其是那段摘录:

在 JMS 消息被确认之前,它不被认为是 成功消耗。消息消费成功 通常分三个阶段进行。

  1. 客户端收到消息。
  2. 客户端处理消息。
  3. 该消息已被确认。确认由 JMS 提供者或客户端发起,具体取决于会话 确认模式。

我以为自动确认正是这样做的 - 在侦听器方法返回结果后确认消息。但根据 JMS 规范,它有点不同,Spring 监听器容器正如预期的那样不会尝试改变 JMS 规范的行为。这就是javadoc的内容抽象消息监听容器不得不说——我已经强调了重要的句子:

侦听器容器提供以下消息确认 选项:

  • “sessionAcknowledgeMode”设置为“AUTO_ACKNOWLEDGE”(默认):监听器执行前自动消息确认;抛出异常时不会重新投递。
  • “sessionAcknowledgeMode”设置为“CLIENT_ACKNOWLEDGE”:监听器执行成功后自动消息确认;不 抛出异常时重新投递。
  • “sessionAcknowledgeMode”设置为“DUPS_OK_ACKNOWLEDGE”:侦听器执行期间或之后延迟消息确认;潜在的 抛出异常时重新投递。
  • “sessionTransacted”设置为“true”:侦听器成功执行后的事务确认;在抛出异常的情况下保证重新交付。

所以我的解决方案的关键是listenerContainer.setSessionTransacted(true);

我面临的另一个问题是,JMS 提供程序不断将失败的消息重新传递给在消息处理过程中失败的同一个使用者。我不知道 JMS 规范是否给出了提供者在这种情况下应该做什么的规定,但对我有用的是使用listenerContainer.shutdown();为了断开失败的消费者的连接并允许提供者重新传递消息并为另一个消费者提供机会。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递? 的相关文章

随机推荐

  • 如何使用 Backbone 处理单页应用程序的 UI 状态

    请耐心等待 因为我来自传统的 Web 开发背景 使用 ASP Net 甚至服务器端 MVC 我正在尝试使用 Backbone js 构建一个高度交互的单页应用程序 以帮助组织我的 javascript 代码并构建 UI 我在构建 UI 组件
  • JAVA_HOME 未正确定义 编译时出错

    我正在 cocoa 中开发一个使用一些 java 类的应用程序 我收到错误 JAVA HOME 未正确定义我们无法执行 System Library Frameworks JavaVM framework Home bin java 我不知
  • Java,默认编码[重复]

    这个问题在这里已经有答案了 可能的重复 jvm默认编码是什么 https stackoverflow com questions 1006276 what is the default encoding of jvm Hello 当用于处理
  • 函数式反应式编程的“信号”表示是否正确?

    我一直在研究 FRP 并发现了许多不同的实现 我见过的一种模型我将其称为 信号 表示 这一本质将事件和行为结合成一个实体 首先 信号是一个对象 其值是一种行为 其次 信号具有一个事件 流 可以将其视为标准数据结构并对其进行操作 您可以在信号
  • 为什么控制器对路线更新没有响应?

    我想保留控制器的实例而不重新加载 我设置reloadOnSearch为 false 我在控制器中管理路由更改 这是代码 这是我的链接的示例 a href products page 2 next a 我定义了以下模块 angular mod
  • jquery mobile 一页具有不同的元视口设置

    我尝试过调整我的元数据viewportjQM 上的标签beforepageshow等等它根本不起作用 我想这是很明显的原因 我有一个页面上有一个 highcharts 图表 我的视口标签内没有任何内容 通常有width device wid
  • Python 中的元组声明

    在 python 中 可以用括号显式声明一个元组 如下所示 gt gt gt x 0 25 0 25 0 25 0 25 gt gt gt x 0 25 0 25 0 25 0 25 gt gt gt type x
  • 有没有办法对 Neo4j 数据进行分片和复制?

    我正在考虑为我正在从事的一些新项目选择 Neo4j 对于给定的数据需求 本质上基于图形 neo4j 非常适合 并且快速原型为我提供了良好的响应时间 我想了解的是如何扩展 neo4j 部署 具体来说 如何跨 neo4j 部署对数据进行分片 由
  • 我可以从 BitBucket 克隆 git 存储库吗?

    我正在尝试使用 BitBucket git 存储库 但我个人更喜欢使用 hg 这有效 git clone https email protected cdn cgi l email protection projectuser projec
  • Flask-SQLAlchemy – 您可以在模型中进行查询吗?

    我正在构建一个使用 Flask SQLAlchemy 的 Flask Web 应用程序 并且我还在考虑使用 Flask Login 来处理会话并保护某些视图 Flask Login 需要某些方法 我认为这些方法对应用程序的各个部分都很有用
  • MySQL标记问题:如何选择已标记为X、Y和Z的项目?

    我正在处理一个数据库 其中的项目被 标记 一定次数 item 100k 行 id name 其他的东西 tag 10k 行 id name item2tag 1 000 000 行 item id tag id count 我正在寻找最快的
  • C# 中集合的 XML 序列化

    我有两个课程如下 public class Info XmlAttribute public string language public int version public Book book public Info public In
  • Swagger-Web Api 文档(Swashbuckle 中缺少 Bootstrapper)

    我正在尝试使用 swagger 作为我的 Web api 文档 为此我已经从 nuget 包安装了 Swashbuckle 但我无法在 swaggerconfig cs 类中获取 Bootstrapper 包 那么是否有其他替代方法可以在
  • 如何在 Windows 7 的 git bash 终端中不使用鼠标选择文本?

    我使用的是 Windows 7 安装 Windows 版 Git 后可以使用 Git Bash I found mouse inconvenient to select copy paste in terminal window What
  • C++CLI。本机部分是用纯 C++ 编写的,但在 CLI 中编译的速度与纯本机 C++ 一样快吗?

    我想将音频计算委托给 C 层 但通过 WPF GUI 处理和编辑音频内容 我简要了解了 C CLI 我想知道 我应该使用 C CLI 作为 C GUI 和 C 音频管理之间的中间层吗 或者我应该简单地将代码放入 C CLI 中并期望它以相同
  • 如何在 if 语句中使用 UIActionSheet? [关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我有一个if声明工作正常 但我需要添加第二个if其中的声明 我似乎不知道如何正确执行 这是我的代码 IBAction xButto
  • 在运行时指定多个谓词

    STL中有一些运算符类 如less equal to greater equal等 如何轻松地将它们组合起来与remove if函数一起使用 例如我想删除向量中大于 0 且小于 3 且不等于 2 的元素 那么它会是这样的 remove if
  • Java 6 的 WatchService [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 Java 7 推出WatchService用于连续监控文件系统 Java 6 有向后移植吗 是否有具有
  • JUnit 测试:通过模拟抑制枚举构造函数?

    我知道可以模拟单个枚举 使用如何使用 Mockito Powermock 模拟枚举单例类 https stackoverflow com questions 15939023 how to mock an enum singleton cl
  • 如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递?

    在下面的测试中 我尝试模拟以下场景 消息队列启动 启动设计为在消息处理期间失败的消费者 产生一条消息 消费者开始处理消息 在处理过程中抛出异常来模拟消息处理失败 失败的消费者被停止 另一个消费者启动的目的是接收重新传递的消息 但我的测试失败