无法让 ActiveMQ 重新发送我的消息

2023-12-19

我有一个用 Java 编写的单线程 ActiveMQ 使用者。我想做的就是从队列中接收()一条消息,尝试将其发送到网络服务,如果成功则确认()它。如果 Web 服务调用失败,我希望消息保留在队列中,并在超时后重新发送。

除了重新发送部分之外,它或多或少都在工作:每次我重新启动消费者时,它都会为仍在队列中的每条消息获取一条消息,但在发送失败后,这些消息永远不会重新发送。

我的代码如下所示:

public boolean init() throws JMSException, FileNotFoundException, IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setInitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true); // ????
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
    destination = session.createQueue(subject); //???

    consumer = session.createConsumer(destination);
    //consumer.setMessageListener(this); // message listener had same behaviour

}

private void process() {
    while(true) {
        System.out.println("Waiting...");
        try {
            Message message = consumer.receive();
            onMessage(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(!client.sendMessage(msg)) {
                System.out.println("Webservice call failed. Keeping message");
                //message.
            } else {
                message.acknowledge();
            }

            if (transacted) {
                if ((messagesReceived % batch) == 0) {
                    System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
                    session.commit();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

我目前没有使用交易(也许我应该使用?)。

我确信我错过了一些简单的事情,很快就会拍打我的额头,但我似乎不知道这是如何运作的。谢谢!


编辑:我自己无法回答这个问题,因为代表不够:

好吧,经过更多的实验,事实证明交易是做到这一点的唯一方法。这是新代码:

public boolean init() throws JMSException, FileNotFoundException, IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setInitialRedeliveryDelay(1000L);
    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true);
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
    destination = session.createQueue(subject);

    consumer = session.createConsumer(destination);
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(client.sendMessage(msg)) {
                if(transacted) {
                    System.out.println("Call succeeded - committing message");
                    session.commit();
                }
                //message.acknowledge();
            } else {
                if(transacted) {
                    System.out.println("Webservice call failed. Rolling back message");
                    session.rollback();
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

现在,按照重新传递策略中的规定,每 1000 毫秒重新发送一次消息。

希望这对其他人有帮助! :)


您不必使用事务,CLIENT_ACK/Session.recover() 也可以工作......

当发生以下任一情况时,消息将重新传送到客户端:

  • 使用事务处理会话并调用 rollback()。
  • 事务会话在调用提交之前关闭。
  • 会话正在使用 CLIENT_ACKNOWLEDGE 并调用 Session.recover()。

see http://activemq.apache.org/message-redelivery-and-dlq-handling.html http://activemq.apache.org/message-redelivery-and-dlq-handling.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法让 ActiveMQ 重新发送我的消息 的相关文章

随机推荐

  • 为什么 hive 不允许使用 CTAS 创建外部表?

    在hive中 通过CTAS创建外部表是语义错误 为什么 CTAS创建的表是原子的 而外部表意味着删除表时数据不会被删除 它们似乎并不冲突 在 Hive 中 当我们创建表 非外部 时 数据将存储在 user hive warehouse 中
  • 如何使用 Qt 禁用 Alt + F4 窗口关闭?

    我使用以下行从对话框中禁用了 Qt 中的 X 按钮 myDialog gt setWindowFlags Qt Dialog Qt Desktop but I couldn t detect Alt F4 using this code v
  • 在 ember js 和 mvc 中使用验证码

    我正在使用创建一个网站ember js and asp net mvc 在我之前的asp net我用过的网站captchaMVC http captchamvc codeplex com http captchamvc codeplex c
  • 无奈地迷失了 openCV 和 HoughCircles

    我正在尝试检测这个黑色圆圈here https i stack imgur com clpR6 png 应该不会太困难 但由于某种原因 我到处都是 0 个圆圈或大约 500 个圆圈 具体取决于参数 但没有中间立场 感觉就像我已经尝试了几个小
  • 以编程方式一步提取 tar.gz(在使用 7-Zip 的 Windows 上)

    Problem 我希望能够一步提取 tar gz 文件 这让我的问题almost与此相同 tar gz 的堆栈溢出问题 https stackoverflow com questions 651018 opening a tar gz fi
  • scala 的鼠标事件有效吗?如何?

    我正在尝试在 scala 中创建类似链接的标签 但没有鼠标事件对我有用 他们应该如何工作 class Hyperlink extends Label text hyperlink reactions case MouseClicked gt
  • 如果我不关闭 StardandInput,C# 重定向其他控制台应用程序 StandardOutput 会失败

    我的控制台应用程序遇到了一个棘手的问题 我试图从中重定向 StandardInput StandardOutput 和 StandardError 我已经为其他控制台应用程序找到了一个可行的解决方案 这对我来说并不是什么新鲜事 但这个应用程
  • 有序列表索引

    有什么方法可以获取a的编号 索引 li有序列表中的标签 我正在尝试获取侧面显示的数字 列表编号 我知道传统的方法是使用存储行号的 id 但这意味着如果在中间添加一行 则必须编辑大量 id 尽管我为此开发了一种算法 但它的效率并不高 我正在寻
  • Java中的2d ArrayList添加数据

    我在家庭作业上几乎不需要帮助 我必须创建一个 10 x 10ArrayList 不是数组 这就是我所拥有的 我只需要有关如何执行 for 循环将日期添加到 2D 的提示ArrayList 顺便说一下 这是为了放置成绩数据 从 100 到 8
  • 提取 Javascript 数字的指数和尾数

    有没有一种相当快速的方法可以从 JavaScript 中的数字中提取指数和尾数 AFAIK 没有办法获取 Javascript 中数字后面的位 这让我觉得我正在研究一个因式分解问题 找到m and n这样2 n m k对于给定的k 由于整数
  • || 到底是什么?意思是?

    return empty neededRole strcasecmp role admin 0 strcasecmp role neededRole 0 到底是什么 在这个声明中是什么意思 有人可以帮我把这个翻译成英文吗 我保证我已经用谷歌
  • 32 位应用程序未更新 64 位注册表项

    C 中提到的路径中的注册表项未更新 string path Software Microsoft Windows NT CurrentVersion Windows RegistryKey myKey Microsoft Win32 Reg
  • 绑定不适用于在 XAML 中创建的 DependencyObject

    我尝试使用自定义类在 XAML 中传递多个 CommandParameters 我创建了一个名为值命令参数继承自依赖对象并有两个依赖属性 我们称它们为Value1 and Value2对于这个例子 应该调用命令并传递该对象的按钮如下所示
  • 核心数据在后台保存对象问题

    简而言之 我想要做的是使用后台队列将从 Web 服务提取的 JSON 对象保存到 Core Data Sqlite3 数据库 保存发生在我通过 GCD 创建的序列化后台队列上 并保存到为该后台队列创建的 NSManagedObjectCon
  • Python:包中的“私有”模块

    我有一个包裹mypack带模块mod a and mod b在里面 我想要包裹本身mod a自由进口 import mypack import mypack mod a 不过 我想保留mod b专供mypack 那是因为它的存在只是为了组织
  • 如何在应用程序中使用 iPhone 铃声

    我想在我的应用程序中使用 iPhone 的铃声 这可能吗 请帮助我解决这一点 Thanks 您无法以编程方式使用或更改可用的铃声 很遗憾地说 苹果公司实在是太保守了
  • 添加 TabBarController 作为视图的子视图

    当我的应用程序启动时 我正在加载启动屏幕 然后我想加载一个TabBarController 它是ViewControllers 但是 我的 TabBarController 窗口无法缩放到屏幕尺寸 底部的 TabBar 的 3 4 可能被切
  • 用Windows Forms数据控件和DataTable实现图库式的显示?

    我有一个datatable看起来像下面这样 Room Cook Waiter BG Image 201 Joe Jim Green png 202 Jack Mary Red png 203 Jet Mark Yellow png 204
  • Google 登录 - “access_token”与“id_token”与“代码”

    在我们的网站中 我们过去在使用 Google Sign In 登录人员时使用 access token 首先 我们将用户重定向到 google 用户将 access token 带给我们 我们验证该令牌以确保该用户是实际的 Google 用
  • 无法让 ActiveMQ 重新发送我的消息

    我有一个用 Java 编写的单线程 ActiveMQ 使用者 我想做的就是从队列中接收 一条消息 尝试将其发送到网络服务 如果成功则确认 它 如果 Web 服务调用失败 我希望消息保留在队列中 并在超时后重新发送 除了重新发送部分之外 它或