RocketMQ第五篇 RocketMQ API基本使用

2023-11-13

目录

生产者Product

 消费者Consumer


前面已经学习了Rocket的基本知识,以及搭建MQ单机版和集群环境,下面开始进行实际开发,根据前面下载的RocketMQ源码,开展讲解RocketMQ 的基本使用:

生产者Product

在RocketMq的源码中的example子项目中,有关系rocketMQ API的使用:

使用Maven开发需要引入rocketMQ依赖:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
</dependency>

发送消息主要有三种形式:

  1. 同步发送消息
  2. 异步发送消息
  3. 单向发送消息 
  • 同步发送消息
        DefaultMQProducer producer = new DefaultMQProducer("poc-mq-product");
        //集群环境
        // producer.setNamesrvAddr("192.168.36.132:9876;192.168.36.131:9876");
        producer.setNamesrvAddr("192.168.36.131:9876");
        //启动生产者
        producer.start();

        for (int i = 0; i < 2; i++) {
            try {
                User user = new User();
                user.setAddress("河南"+i);
                user.setId(i);
                user.setName("小米"+i);
                //创建一个消息实例,指定主题,标签和消息正文。
                Message msg = new Message("pocTopic" ,
                    "TagA" ,"keys"+i,
                        user.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                //时间延迟级别
                msg.setDelayTimeLevel(2);
                //呼叫发送消息以将消息传递给其中一个brokers。同步发送
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        //一旦不再使用生产者实例,请关闭
        producer.shutdown();
  • 异步发送消息
       DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        //由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //回调 SendCallback
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
  • 单向发送消息

       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();

 消费者Consumer

消费者消费消息有两种模式:

一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。

  • push推模式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("poc-mq-consumer");
        //consumer.setNamesrvAddr("192.168.36.132:9876;192.168.36.131:9876");
        consumer.setNamesrvAddr("192.168.36.131:9876");
        //如果指定的消费群体是一个全新的消费群体,请指定从何处开始。
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("pocTopic", "*");
        //注册监听器 集群消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
  • pull拉模式

通常情况下,用推模式比较简单。实际上RocketMQ的推模式也是由拉模式封装出来的。4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。

1:DefaultLitePullConsumerImpl:

   DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr("localhost:9876");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }

    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
    litePullConsumer.setAutoCommit(false);
    litePullConsumer.start();

    Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
    List<MessageQueue> list = new ArrayList<>(mqSet);
    List<MessageQueue> assignList = new ArrayList<>();
    for (int i = 0; i < list.size() / 2; i++) {
        assignList.add(list.get(i));
    }
    litePullConsumer.assign(assignList);
    litePullConsumer.seek(assignList.get(0), 10);
    try {
        while (running) {
            List<MessageExt> messageExts = litePullConsumer.poll();
            System.out.printf("%s %n", messageExts);
            litePullConsumer.commitSync();
        }
    } finally {
        litePullConsumer.shutdown();
    }

2:DefaultMQPullConsumer:

private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null){
            return offset;
        }

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

在实际开发中会遇到顺序发送消息,顺序消费消息等情况

  1. 顺序生产消息
            MQProducer producer = new DefaultMQProducer("orderProduce");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
  1. 顺序消费消息
   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");

验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。RocketMQ保证的是消息的局部有序,而不是全局有序。

先从控制台上看下List mqs是什么。再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。

首先在发送者端:在默认情况下,消息发送者会采取Round Robin 轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。

而Broker中一个队列内的消息是可以保证有序的。然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。

  • 广播消费
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。

  • 延迟消息
   // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
    
         // Shutdown producer after use.
         producer.shutdown();

          延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。那这么好用的延迟消息是怎么实现的?这18个延迟级别除了在延迟消息中用,还有什么地方用到了?别急,我们会在后面部分进行详细讲解。

  • 批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer

    DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
org.apache.rocketmq.example.batch.SplitBatchProducer;
public class SplitBatchProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        //split the large batch into small ones:
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
    }

}

class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > sizeLimit) {
                //it is unexpected that single message exceeds the sizeLimit
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

       相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

  • 过滤消息

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer

使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer

public class TagFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",  tags[i % tags.length],
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
public class TagFilterConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.subscribe("TagFilterTest", "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。 这时候,可以使用SQL表达式来对消息进行过滤。

SQL过滤的消息生产者案例见:org.apache.rocketmq.example.filter.SqlFilterProducer

SQL过滤的消息消费者案例见:org.apache.rocketmq.example.filter.SqlFilterConsumer

这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。SQL92语法:RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。数值比较,比如:>,>=,字符比较,比如:=,<>,IN;IS NULL 或者 IS NOT NULL;逻辑符号 AND,OR,NOT;常量支持类型为:数值,比如:123,3.1415;字符,比如:'abc',必须用单引号包裹起来;NULL,特殊的常量布尔值,TRUE 或 FALSE使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的?


public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
  • 事务消息

       这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。​ 首先,我们了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

​      其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer

      事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例。

public class TransactionListenerImpl implements TransactionListener {
	//在提交完事务消息后执行。
	//返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
	//返回ROLLBACK_MESSAGE状态的消息会被丢弃。
	//返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        //TagA的消息会立即被消费者消费到
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagB的消息会被丢弃
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //其他消息会等待Broker进行事务状态回查。
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
	//在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		String tags = msg.getTags();
        //TagC的消息过一段时间会被消费者消费到
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagD的消息也会在状态回查时被丢弃掉
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //剩下TagE的消息会在多次状态回查后最终丢弃
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        //定义了一个线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

然后,我们要了解下事务消息的使用限制:

​ 1、事务消息不支持延迟消息和批量消息。

​ 2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax )则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。回查次数是由BrokerConfig.transactionCheckMax这个参数来配置的,默认15次,可以在broker.conf中覆盖。 然后实际的检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。这个属性值大于transactionCheckMax,就会丢弃。 这个用户属性值会按回查次数递增,也可以在Producer中自行覆盖这个属性。

​ 3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。

     由BrokerConfig.transactionTimeOut这个参数来配置。默认6秒,可以在broker.conf中进行修改。 另外,也可以给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间。      msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10000"); 这样就是10秒。

​ 4、事务性消息可能不止一次被检查或消费。

​ 5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

​ 6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

事务消息实现机制原理图如下:

​ 事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

事务消息方案:

  1. 同步发送+多次重试 最通用的方案
  2. RocketMQ提供的事务消息机制

​ 最后,我们还需要思考下事务消息的作用。

​ 大家想一下这个事务消息跟分布式事务有什么关系?为什么扯到了分布式事务相关的两阶段提交上了?事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案。

  •  ACL权限控制

     ​ 权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。

注意,如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-acl</artifactId>
	<version>4.7.1</version>
</dependency>

​ 而Broker端具体的配置信息可以参见源码包下docs/cn/acl/user_guide.md。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。我们来简单分析下源码中的plan_acl.yml的配置:

#全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
#第一个账户
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false 
  defaultTopicPerm: DENY #默认Topic访问策略是拒绝
  defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
  topicPerms:
  - topicA=DENY #topicA拒绝
  - topicB=PUB|SUB #topicB允许发布和订阅消息
  - topicC=SUB #topicC只允许订阅
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
#第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

上面主要讲解了RocketMQ API开发主要使用的方法,以及Acl权限控制的讲解和配置。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ第五篇 RocketMQ API基本使用 的相关文章

  • 消息中间件 RocketMQ 源码解析:Message拉取&消费(上)

    摘要 原创出处 http www iocoder cn RocketMQ message pull and consume first 芋道源码 欢迎转载 保留摘要 谢谢 本文主要基于 RocketMQ 4 0 x 正式版 1 概述 2 C
  • centos安装rocketmq

    centos安装rocketmq 1 下载rocketmq二进制包 2 解压二进制包 3 修改broker conf 4 修改runbroker sh和runserver sh的JVM参数 5 启动NameServer和Broker 6 安
  • 使用RabbitMQ实现延时队列

    之前公司是一个类电商公司 会有用户下单后未支付取消订单的场景 解决方案是使用RabbitMQ的死信队列来实现一个延时队列 下单时 将订单丢进消息队列 设置过期时间 订单失效时间 然后到时候检查订单状态 如果未支付则取消订单 1 什么是死信
  • OSAL

    OSAL为 Operating System Abstraction Layer 即操作系统抽象层 支持多任务运行 它并不是一个传统意义上的操作系统 但是实现了部分类似操作系统的功能 OSAL概念是由TI公司在ZIGBEE协议栈引入 他的意
  • 非阻塞的connect使用方式

    connect 函数的调用涉及到3次握手 默认connect函数为阻塞连接状态 通常connect 会阻塞到三次握手的完成和失败 而这个connect阻塞超时时间会依赖于系统 一般为75s到几分钟时间 一种方式可以通过该系统配置 proc
  • rocketmq顺序发送消息

    1 概念 严格顺序消息模式下 消费者收到的所有消息均是有顺序的 消息有序指的是可以按照消息的发送顺序来消费 FIFO RocketMQ可以严格的保证消息有序 可以分为分区有序或者全局有序 顺序消费的原理解析 在默认的情况下消息发送会采取Ro
  • 报错:ImportError: rocketmq dynamic library not found解决方法

    目录 一 ImportError rocketmq dynamic library not found 二 OSError librocketmq so cannot open shared object file No such file
  • RocketMQ第五篇 RocketMQ API基本使用

    目录 生产者Product 消费者Consumer 前面已经学习了Rocket的基本知识 以及搭建MQ单机版和集群环境 下面开始进行实际开发 根据前面下载的RocketMQ源码 开展讲解RocketMQ 的基本使用 生产者Product 在
  • 代码技巧——如何关闭订单?延迟任务的实现方案【建议收藏】

    先思考个问题 为什么要关闭订单 业务上 1 提供待付款时间 而不是简单的 一次付款机会 提高业务指标之一的成单率 成单率 成功下单的人数 发起支付的人数 2 下单成功意味着这个商品被当前订单占用 库存已经预扣减 如果迟迟不支付则需要回收库存
  • Kafka实战——简单易懂的生产者消费者demo

    单线程版本适合本地调试 多线程版本适合做压测 1 引入maven依赖
  • RocketMQ占用内存过大的解决方法

    目录 一 问题描述 二 解决方法 1 runserver sh 修改 2 runbroker sh 修改 一 问题描述 RocketMQ 启动后 一下子把内存撑爆了 二 解决方法 修改启动参数 分别对 bin 目录下的 runserver
  • 【Python】记录一次 Linux + Python + RocketMQ 辛酸历程

    文章目录 安装Python 准备环境 编译安装 遇到问题 安装openssl 重新编译 安装依赖库 准备代码 验证 这是记录一次辛酸的Linux Python RocketMQ使用历程 需求背景是需要验证线上一个RocketMQ服务和里面的
  • 5分钟学会RocketMQ

    RocketMQ 简介 RocketMQ 是一个队列模型的消息中间件 具有高性能 高可用 高实时等特性 它并不支持JMS java消息服务 规范 但参考了JMS规范和kafak等的思想 Producer Consumer 队列都可以分布式
  • docker安装rocketmq4.6.1(精简版)

    一 创建文件 mkdir p usr local rocketmq server logs usr local rocketmq server store usr local rocketmq broker logs usr local r
  • Apache RocketMQ 远程代码执行漏洞(CVE-2023-33246)

    漏洞简介 RocketMQ 5 1 0及以下版本 在一定条件下 存在远程命令执行风险 RocketMQ的NameServer Broker Controller等多个组件外网泄露 缺乏权限验证 攻击者可以利用该漏洞利用更新配置功能以Rock
  • springboot-rocketmq日志rocketmq_client.log问题

    问题描述 springboot配置rocketmq后 会写入日志到rocketmqlogs目录下的rocketmq client log文件中 且日志过于庞大 解决 1 启动类增加代码 System setProperty ClientLo
  • RocketMQ源码(26)—DefaultMQPushConsumer事务消息源码【一万字】

    事务消息是RocketMQ的一大特性 其被用来实现分布式事务 关于RocketMQ的事务消息的相关原理的介绍见这篇博客 RocketMQ的分布式事务机制 事务消息 关于事务消息的基本案例看这里 消息事务样例 本文主要介绍RocketMQ的事
  • RT-Thread记录(七、IPC机制之邮箱、消息队列)

    讲完了线程同步的机制 我们要开始线程通讯的学习 线程通讯中的邮箱消息队列也属于 RT Thread 的IPC机制 目录 前言 一 邮箱 1 1 邮箱控制块 1 2 邮箱操作 1 2 1 创建和删除 1 2 2 初始化和脱离 1 2 3 发送
  • 高可用:如何实现消息队列的 HA?

    管理学上有一个木桶理论 一只水桶能装多少水取决于它最短的那块木板 这个理论推广到分布式系统的可用性上 就是系统整体的可用性取决于系统中最容易出现故障 或者性能最低的组件 系统中的各个组件都要进行高可用设计 防止单点故障 消息队列也不例外 本
  • 为什么Thread.sleep(0)可以阻止rocketmq中的gc?

    最近我阅读了RocketMQ的源代码 但我无法理解这段代码 为什么这段代码可以阻止gc呢 https github com apache rocketmq blob master store src main java org apache

随机推荐

  • nodejs HelloWorld

    nodejs 服务器端 HelloWorld 程序 a hello js d02 hollo js var http require http http createServer function request response 请求对象
  • C++&Qt 各种数据类型转换

    1 uint64转QString QString strfilerename QString 1 arg nFileID nFileID为uint64类型 QString number nFileID 2 QString转超长数字串 QSt
  • 计算机图形学GAMES101(三)变换(模型、视图、投影)

    补充内容 R 是逆时针方向旋转的矩阵 R 是顺时针方向旋转的矩阵 可以发现R T R 1 像这样的矩阵叫做正交矩阵 以后如果要求往相反的方向旋转相同角度的变换 R 只需要求正向旋转的矩阵然后转置就可以了 本节涉及内容 仿射变换 线性变换 平
  • LeetCode-Python-389. 找不同

    给定两个字符串 s 和 t 它们只包含小写字母 字符串 t 由字符串 s 随机重排 然后在随机位置添加一个字母 请找出在 t 中被添加的字母 示例 输入 s abcd t abcde 输出 e 解释 e 是那个被添加的字母 第一种思路 转成
  • Java笔记:泛型、限定通配符与非限定通配符

    目录 1 泛型 2 限定通配符与非限定通配符 2 1 限定通配符 2 2 非限定通配符 3 PECS Producer Extends Consumer Super 原则 3 1 Producer Extends 3 2 Consumer
  • jar文件怎么打开 查看jar文件内容操作方法

    jar文件怎么打开 查看jar文件内容操作方法 jar文件是java项目生成的一个小的文件项目 也可以描述为一个java压缩包 里面封装了 许多java类以及方法 变量 很多用户想要查看jar文件内容 可是却不知道jar文件怎么打开 下面小
  • TorchServe环境构建+模型更新+新模型注册

    目录 1 背景 2 torchserve环境搭建 2 1jdk环境搭建 2 2 python 环境搭建 2 3 启动服务 2 3 1 注册模型 2 3 2 模型查看 2 3 3 接口调用 3 进阶功能 3 1 模型多版本管理 3 2 新模型
  • NLP神器Gensim库(一):入门操作

    Gensim是一款开源的第三方Python工具包 用于从原始的非结构化的文本中 无监督地学习到文本隐层的主题向量表达 它支持包括TF IDF LSA LDA 和word2vec在内的多种主题模型算法 支持流式训练 并提供了诸如相似度计算 信
  • 【值得收藏的种子搜索引擎】

    种子搜索引擎和磁力搜索引擎是用于搜索和下载种子文件和磁力链接的工具 本文将介绍五个值得收藏的子搜索引擎和磁力搜索引擎 并提供两个示例说明 BT Kitty BT Kitty是一个功能强大的子搜索引 可以搜索各种类型的种子文件和磁力链接 它的
  • nextjs开发 + vercel 部署 ssr ssg

    前言 最近想实践下ssr 就打算用nextjs 做一个人博客 vercel 部署 提供免费域名 来学习实践下ssr ssg nextjs 一个轻量级的react服务端渲染框架 vercel 由 Next js 的创建者制作 支持nextjs
  • FlinkCDC-自定义序列化器

    package com lcy app customer import com alibaba fastjson JSONObject import com alibaba ververica cdc debezium DebeziumDe
  • 【前端】React使用react-markdown+antd实现引入渲染markdown文件

    项目中遇见一个需求 要求直接在浏览器打开markdown文件进行预览 初次使用遇见一些坎坷 以下记录实现过程 将其封装成了一个组件 1 下载依赖 yarn add react markdown 其余样式插件 yarn add remark
  • centos7安装mysql5.7

    一 下载mysql5 7 1 下载地址 点击跳转 2 然后上传到服务器上面 解压命令 tar xvf mysql 5 7 36 1 el7 x86 64 rpm bundle tar 3 解压后得到以下的rpm包 4 依次安装所需要的rpm
  • 因为错误消息指示这是由于上一个问题导致的错误,没有写入 apport 报告。

    依赖关系问题 仍未被配置dpkg 依赖关系问题使得 smbclient 的配置工作不能继续 smbclient 依赖于 samba common 2 3 5 8 dfsg 1ubuntu2 3 然而 软件包 samba common 尚未配
  • ubuntu 安装 多版本 cuda 11.4 11.8

    显卡 rtx3060 笔记本已经安装了 cuda 11 4 和 对应的cudnn 现在想要安装 cuda 11 8 和 cudnn 8 8 原理 新的 driver 可以 兼容 旧的 cuda sdk 旧的 driver 不能 兼容 新的c
  • matlab神经网络工具箱的使用

    单变量 单变量取数据 data load ex1data1 txt X data 1 y data 2 多变量取数据 data load ex1data2 txt X data 1 2 y data 3 运行train后弹出 对应的图 比如
  • gradle 添加构建依赖项

    gradle 添加构建依赖项 参考 添加构建依赖项 利用 Android Studio 中的 Gradle 构建系统 您可以轻松地将外部二进制文件或其他库模块作为依赖项添加到您的 build 中 这些依赖项可位于您的计算机上或远程代码库中
  • prometheus监控示例

    prometheus架构图 prometheus 各组件介绍 Prometheus Server 使用pull方式采集监控数据 在该组件上配置监控数据的采集和告警规则 Client Library 客户端库 为需要监控的服务生成相应的 me
  • 安装Redhat

    1 新建虚拟机 选典型 2 下一步 选择稍后安装操作系统 3 下一步 选择Linux 版本选择Red Hat Enterprise 8 版本是什么就选择什么 4 下一步 设置虚拟机名称以及位置 5 下一步 设置虚拟机磁盘容量 6 下一步 点
  • RocketMQ第五篇 RocketMQ API基本使用

    目录 生产者Product 消费者Consumer 前面已经学习了Rocket的基本知识 以及搭建MQ单机版和集群环境 下面开始进行实际开发 根据前面下载的RocketMQ源码 开展讲解RocketMQ 的基本使用 生产者Product 在