rocketMq消息队列原生api使用以及rocketMq整合springboot

2023-11-11

rocketMq消息队列

一、RocketMQ原生API使用

使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。

1、测试环境搭建

首先创建一个基于Maven的SpringBoot工程,引入如下依赖:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
</dependency>

RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较老,但是还是都可以运行的。以官网上的顺序进行学习。

2、RocketMQ的编程模型

然后RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,掌握这个固定的步骤,对于我们学习源码以及以后使用都是很有帮助的。

  • 消息发送者的固定步骤

    1.创建消息生产者producer,并制定生产者组名

    2.指定Nameserver地址

    3.启动producer

    4.创建消息对象,指定主题Topic、Tag和消息体

    5.发送消息

    6.关闭生产者producer

  • 消息消费者的固定步骤

    1.创建消费者Consumer,制定消费者组名

    2.指定Nameserver地址

    3.订阅主题Topic和Tag

    4.设置回调函数,处理消息

    5.启动消费者consumer

3、RocketMQ的消息样例

RocketMQ都支持类型的消息:

3.1 基本样例

同步发送:能够实时知道消息是否从生产者推送到broke

异步发送:能够异步回调的方式知道消息是否从生产者推送的broke

单向发送:消息发送出去就不管了,无法感知消息是否从生产者推送到broke的状态

基本样例部分我们使用消息生产者分别通过三种方式发送消息同步发送、异步发送以及单向发送

然后使用消费者来消费这些消息。

1、同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer

2、异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer

等待消息返回后再继续进行下面的操作。

这个示例有个比较有趣的地方就是引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。

3、单向发送消息的样例:

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。

4、使用消费者消费消息
消费者消费消息有两种模式

一种是消费者主动去Broker上拉取消息的拉模式

另一种是消费者等待Broker把消息推送过来的推模式

拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer

推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer

通常情况下,用推模式比较简单。

实际上RocketMQ的推模式也是由拉模式封装出来的。

4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。

3.2 顺序消息

顺序消息生产者样例见:org.apache.rocketmq.example.order.Producer

顺序消息消费者样例见:org.apache.rocketmq.example.order.Consumer

验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。
不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。

RocketMQ保证的是消息的局部有序,而不是全局有序。

mqs是什么(一个topic中对应的多个队列)。
再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。

首先在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。

而Broker中一个队列内的消息是可以保证有序的。

然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。

3.3 广播消息

广播消息的消息生产者样例见:org.apache.rocketmq.example.broadcast.PushConsumer

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。

3.4 延迟消息

延迟消息的生产者案例

public class ScheduledMessageProducer {
   
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
   
        // Shutdown producer after use.
        producer.shutdown();
    }
       
}

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。

那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);

开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。

那这么好用的延迟消息是怎么实现的?这18个延迟级别除了在延迟消息中用,还有什么地方用到了?别急,我们会在后面部分进行详细讲解。

3.5 批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer

相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB

实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

3.6 过滤消息

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer

使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer

主要是看消息消费者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 这句只订阅TagA和TagC的消息。

TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。

但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。 这时候,可以使用SQL表达式来对消息进行过滤。

SQL过滤的消息生产者案例见:org.apache.rocketmq.example.filter.SqlFilterProducer

SQL过滤的消息消费者案例见:org.apache.rocketmq.example.filter.SqlFilterConsumer

,>=,<,<=,BETWEEN,=;**
* 字符比较,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑符号 AND,OR,NOT;

常量支持类型为:

* 数值,比如:123,3.1415;
* 字符,比如:‘abc’,必须用单引号包裹起来;
* NULL,特殊的常量
* 布尔值,TRUEFALSE

使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的?

3.7 事务消息

这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。

首先,我们了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。

事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer

事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例

public class TransactionListenerImpl implements TransactionListener {
  //在提交完事务消息后执行。
  //返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
  //返回ROLLBACK_MESSAGE状态的消息会被丢弃。
  //返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        //TagA的消息会立即被消费者消费到
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagB的消息会被丢弃
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //其他消息会等待Broker进行事务状态回查。
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
  //在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String tags = msg.getTags();
        //TagC的消息过一段时间会被消费者消费到
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagD的消息也会在状态回查时被丢弃掉
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //剩下TagE的消息会在多次状态回查后最终丢弃
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}

然后,我们要了解下事务消息的使用限制:

1、事务消息不支持延迟消息和批量消息。

2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

回查次数是由BrokerConfig.transactionCheckMax这个参数来配置的,默认15次,可以在broker.conf中覆盖。
然后实际的检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。这个属性值大于transactionCheckMax,就会丢弃。 这个用户属性值会按回查次数递增,也可以在Producer中自行覆盖这个属性。

​ 3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。

由BrokerConfig.transactionTimeOut这个参数来配置。默认6秒,可以在broker.conf中进行修改。
另外,也可以给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 这样就是10秒。

4、事务性消息可能不止一次被检查或消费。

5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

接下来,我们还要了解下事务消息的实现机制,参见下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E53Lro88-1688484594136)(image/image_C0hrAMgJzB.png)]

事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

最后,我们还需要思考下事务消息的作用。

大家想一下这个事务消息跟分布式事务有什么关系?为什么扯到了分布式事务相关的两阶段提交上了?事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案

3.8 ACL权限控制

权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。

注意,如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>

​ 而Broker端具体的配置信息可以参见源码包下docs/cn/acl/user_guide.md。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。我们来简单分析下源码中的plan_acl.yml的配置:

#全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
#第一个账户
- accessKey: 自定义1
  secretKey: 自定义1
  whiteRemoteAddress:
  admin: false 
  defaultTopicPerm: DENY #默认Topic访问策略是拒绝
  defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
  topicPerms:
  - topicA=DENY #topicA拒绝
  - topicB=PUB|SUB #topicB允许发布和订阅消息
  - topicC=SUB #topicC只允许订阅
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
#第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: 自定义2
  secretKey: 自定义2
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

二、SpringBoot整合RocketMQ

1、快速实战

这部分我们看下SpringBoot如何快速集成RocketMQ。

在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。

我们创建一个maven工程,引入关键依赖:

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,这里把SpringBoot的依赖包升级了一下。

然后我们以SpringBoot的方式,快速创建一个简单的Demo

启动类:

@SpringBootApplication
public class RocketMQScApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQScApplication.class,args);
    }
}

配置文件 application.properties

#NameServer地址
rocketmq.name-server=192.168.232.128:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup

消息生产者

package com.roy.rocket.basic;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
  //发送普通消息的示例
    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
    }
  //发送事务消息的示例
    public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination =topic+":"+tags[i % tags.length];
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        }
    }
}

消息消费者

package com.roy.rocket.basic;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:

例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制

消息有序消费还是并发消费则由consumeMode属性定制。

消费者是集群部署还是广播部署由messageModel属性定制。

然后关于事务消息,还需要配置一个事务消息监听器:

package com.roy.rocket.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author :
 * @date :Created in 2020/11/5
 * @description:
 **/

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object id = msg.getHeaders().get("id");
        String destination = arg.toString();
        localTrans.put(id,destination);
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
        String tags = message.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //SpringBoot的消息对象中,并没有transactionId这个属性。跟原生API不一样。
//        String destination = localTrans.get(msg.getTransactionId());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

这样我们启动应用后,就能够通过访问 http://localhost:8080/MQTest/sendMessage?message=123 接口来发送一条简单消息。并在SpringConsumer中消费到。

也可以通过访问http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,来发送一条事务消息。

这里可以看到,对事务消息,SpringBoot进行封装时,就缺少了transactionId,这在事务控制中是非常关键的。

2、其他更多消息类型:

对于其他的消息类型,文档中就不一一记录了。具体可以参见源码中的junit测试案例。

3、总结:

  • SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
  • SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。
  • 最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。apache有一个官方的rocketmq-spring示例,rocketmq-spring.git 以后如果版本更新了,可以参考下这个示例代码。

三、SpringCloudStream整合RocketMQ

SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

1、快速实战

创建Maven工程,引入依赖:

<dependencies>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-acl</artifactId>
      <version>4.7.1</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
      <version>2.2.3.RELEASE</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-acl</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>2.3.3.RELEASE</version>
    </dependency>
  </dependencies>

应用启动类:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class,args);
    }
}

注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的Binder配置。

然后增加配置文件application.properties

#ScStream通用的配置以spring.cloud.stream开头
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

SpringCloudStream中,一个binding对应一个消息通道。这其中配置的input,是在Sink.class中定义的,对应一个消息消费者。而output,是在Source.class中定义的,对应一个消息生产者。

然后就可以增加消息消费者:

package com.roy.scrocket.basic;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class ScConsumer {

    @StreamListener(Sink.INPUT)
    public void onMessage(String messsage){
        System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);
    }
}

消息生产者:

package com.roy.scrocket.basic;

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class ScProducer {

    @Resource
    private Source source;

    public void sendMessage(String msg){
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
        this.source.output().send(message);
    }
}

最后增加一个Controller类用于测试:

package com.roy.scrocket.controller;

import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author :
 * @date :Created in 2020/10/27
 * @description:
 **/
@RestController
@RequestMapping("/MQTest")
public class MQTestController {

    @Resource
    private ScProducer producer;
    @RequestMapping("/sendMessage")
    public String sendMessage(String message){
        producer.sendMessage(message);
        return "消息发送完成";
    }
}

启动应用后,就可以访问http://localhost:8080/MQTest/sendMessage?message=123,给RocketMQ发送一条消息到TestTopic,并在ScConsumer中消费到了。

2、总结

  • 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
  • SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的, 也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
  • 总之,对于RocketMQ来说SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rocketMq消息队列原生api使用以及rocketMq整合springboot 的相关文章

  • 学习笔记(5):MySQL数据库从入门到实战应用-数据完整性

    立即学习 https edu csdn net course play 27328 362521 utm source blogtoedu 实体完整性 要求每张表都有唯一标识符 每张表主键字段不为空且不能重复 唯一性约束 主键约束 标识列
  • MVC框架增删改查

    mvc对表单内容的增删改查 1 首先把所需的包导入项目内 2 连接数据库的帮助类DBAccess package com Liuyujian Dao import java io InputStream import java sql Co
  • 邮件服务器-postfix服务器

    Postfix 是一种电子邮件服务器 它是由任职于IBM华生研究中心 T J Watson Research Center 的荷兰籍研究员Wietse Venema为了改良sendmail邮件服务器而产生的 最早在1990年代晚期出现 是一
  • Java设计模式之策略模式+工厂模式(反射和注解)

    现在我们有一个需求 我们通常的实现方式是这样的 假设有3种会员 分别为会员 超级会员以及金牌会员和普通顾客 针对不同类别的会员 有不同的打折方式 并且一个顾客每消费10000就增加一个级别 以上四种级别分别采用原价 普通顾客 九折 会员 八
  • [Android] Toast问题深度剖析(二)

    欢迎大家前往云 社区 获取更多腾讯海量技术实践干货哦 作者 QQ音乐技术团队 题记 Toast 作为 Android 系统中最常用的类之一 由于其方便的api设计和简洁的交互体验 被我们所广泛采用 但是 伴随着我们开发的深入 Toast 的
  • ORA-00936: missing expression

    关注微信公共号 小程在线 关注CSDN博客 程志伟的博客 造成这个错误的原因是 选取的最后一个字段与from之间有逗号 解决方法 将字段与from之间的逗号去掉

随机推荐

  • Spring MVC使用JSON的过程与步骤

    活动地址 CSDN21天学习挑战赛 目录 JSON数据交互 RESTful支持 JSON数据交互 1 用eclipse创建一个动态web项目 将项目依赖的jar包放到lib目录下 2 在WEB INF目录下创建web xml 对Spring
  • 关于JPEG的那点事儿:JPEG原理篇

    前言 本文其实于差不多正好1年前写成 是关于JPEG的那点事儿的补充 但是由于实战篇一直烂尾 拖到现在 前几天看到Google发了个JPEG新算法 说是可以将JPEG的体积同质量情况下再压缩35 突然想起了这文了 为了说清楚Google为什
  • python题目55:单词接龙

    单词接龙的规则是 可用于接龙的单词首字母必须要与前一个单词的尾字母相同 当存在多个首字母相同的单词时 取长度最长的单词 如果长度也相等则取词典序最小的单词 已经参与接龙的单词不能重复使用 现给定一组全部由小写字母组成的单词数组 并指定其中的
  • 勿以专家自居

    对于权威 我心存芥蒂 我在 StrongOpinions Weakly Held 观点鲜明 但不固执己见 一文中曾经说过 当我了解到别人把我视为专家或者权威 而不是像伙伴一样的志趣相投者时 我就会觉得非常困扰 如果非要说我在迄今为止的职业生
  • PCL学习之点云可视化:坐标字段、随机、单一颜色、法向量

    pcl中几种常见的点云渲染方式 1 颜色区别深度 此方法在PointCloudColorHandlerGenericField类中实现 该将不同的深度值显示为不同的颜色 实现以颜色区分深度的目的 PointCloudColorHandler
  • TCP/IP校验和计算算法

    ICMP IP UDP TCP报头部分都有checksum 检验和 字段 ICMP和IP报头校验和的计算都很简单 过程如下 1 把校验和字段置为0 2 对IP头部中的每16bit进行二进制求和 3 如果和的高16bit不为0 则将和的高16
  • ubuntu16.04\18.04安装Azure Kinect SDK+配置ros版 超全详细踩坑记录

    一些参考 1 官网教程Azure Kinect Sensor SDK 官网教程Azure Kinect ROS Driver 2 Azure Kinect SDK Ubuntu 16 04 18 04安装配置方法 3 ubuntu16 04
  • 无监督学习分类

    把输入数据看成一个行 m 为特征 列 N 为样本的矩阵 则从数据角度 可以将无监督学习分为三类 将数据按列划分 即将相似的样本聚到同类 即对数据进行聚类 代表算法k means 层次聚类 将数据按行划分 把高维空间的向量转化到低维空间的向量
  • 《吃透 MQ 系列》之Kafka精妙的高性能设计(下篇)

    在 上一篇文章 中 指出了高性能设计的两个关键维度 计算和 IO 可以将它们理解成 道 同时给出了 Kafka 高性能设计的全景图 可以理解成 术 图 1 Kafka 高性能设计的全景图 这篇文章将继续对存储消息和消费消息的 8 条高性能设
  • 基于C语言的栈

    基于王道数据结构 include
  • 开源静态代码检测工具Splint

    如果想用一个有效的工具察看C C 源代码中的错误 遗漏 不确定的构建过程 以及移植问题等等 你应该来看看Lint 可以把Lint当成一个编译器 除了不产生代码之外 对于错误和警告的报告来说已经非常足够了 通常 一个C C 的编译器假设程序是
  • Java实现人脸登录、注册等功能【完整版】

    推荐 前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 前言 这段时间由于学校实行静态化管理 寝室门和校门都是用了人脸识别的装置 每次经过都会激发我的好奇心 也想自己搞一个人脸识别玩玩 随着开
  • python机器学习 transform,fit_transform

    首先使用transfer StandardScaler 来实例化一个转换器 我们要对训练集和测试集进行相同的归一化 标准化处理 先处理训练集 x train transfer fit transform x train fit transf
  • 【纯干货】学python的,这些能快速月入过万的兼职途径,你不会还不知道吧

    我想辞职 在这个疫情当下的时代 许多打工人都有过这么一个想法 或许是因为工作待遇 亦或许是其他原因 但是却仍然屹立在工位上 有的甚至天天喊辞职 月月拿满勤 这是为什么呢 因为他们虽然无数次筹谋辞职 却也无数次的担心裸辞之后的压力 而作为平平
  • Hyper Terminal 配置体验分享

    Hyper Terminal 简介 Hyper is an Electron based terminal Built on HTML CSS JS Fully extensible 以上内容来自Hyper Terminal官网对该终端的介
  • 基于卷积神经网络-门控循环单元(CNN-GRU)多输入多输出预测,CNN-GRU回归预测。

    清空环境变量 warning off 关闭报警信息 close all 关闭开启的图窗 clear 清空变量 clc 清空命令行 导入数据 res xlsread 数据 xlsx 数据分析 num size 0 8 训练集占数据集比例 ou
  • vue解决弹出图片显示在弹框下方

    弹出的图片显示在弹框下面怎么办 问题来源 问题分析 解决方法 问题来源 在写前端vue项目时 在用到ele的 el image 这个组件时 有时会出现图片显示在弹框即dialog下面 后面发现是因为el image组件 默认的z index
  • 【ffmpeg基础】ffmpeg的下载安装

    一 ffmpeg的下载 1 ffmpeg github下载路径 https github com FFmpeg FFmpeg git 在ffmpeg的github上可以下载任意版本的源码 比如最新的matser上的源码 以及各个分支上 如f
  • unity 屏幕虚拟键盘

    工作上碰到许多程序需要用到键盘输入功能 调用的电脑自带键盘使用也不方便 自己写的一个键盘工具 功能 键盘大小写状态监测 设置了输入法提示词位置的定位 定位根据屏幕分辨率设置 故编辑器模式下位置有偏移 可自行调整 工具连接 https dow
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    rocketMq消息队列 文章目录 rocketMq消息队列 一 RocketMQ原生API使用 1 测试环境搭建 2 RocketMQ的编程模型 3 RocketMQ的消息样例 3 1 基本样例 3 2 顺序消息 3 3 广播消息 3 4