Spring Boot集成RocketMQ全部种类消息实现+生产者和消费者配置信息介绍 内含5.x新增可自定义时间的定时/延时消息
- 前言
- 添加POM依赖
- 添加application.yml配置信息
- 创建公共示例对象(只看demo可忽略)
- 消费者相关介绍
- ACK机制介绍
- @RocketMQMessageListener介绍
-
- RocketMQListener<T>接口介绍
- 泛型问题
- 使用MessageExt(可获取完整消息对象:消息体、消息ID、topic、queueId等)
- 使用UserDTO(不需要完整消息对象直接使用消息体类型)
- 发送单向消息
-
- 发送同步消息(响应值为void)
-
- 发送同步消息(响应值为SendResult可以获取消息ID等信息)
-
- 发送异步消息
-
- 发送使用延时等级的延时消息(这种是4.x的延时消息5.x也能使用)
-
- 发送定时消息可指定时间(5.x新增)
-
- 发送顺序消息
- 使用RocketMQTemplate直接发送
- 使用原生DefaultMQProducer发送
- 公共消费者
- 发送带有Tag的消息
-
- 发送事务消息
-
- 消息消费重试问题
-
- 核心参数介绍
-
前言
这里不对RocketMQ做介绍是完全的功能实现没有废话,这里使用@RocketMQMessageListener注解+RocketMQListener接口实现,会对@RocketMQMessageListener注解中的参数做详细介绍。
需要RocketMQ安装教程可以参考:
Docker部署RocketMQ5.x
docker-compose部署RocketMQ5.x
添加POM依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--要使用RocketMQ5.x的自定义时间延时消息必须要使用2.2.3及以上的版本-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.5</version>
</dependency>
</dependencies>
添加application.yml配置信息
server:
port: 8888
rocketmq:
# name-server服务地址多个用;隔开 例如127.0.0.1:9876;127.0.0.1:9877
name-server: 192.168.10.220:9876
producer: # 生产者配置
group: group1 # 生产者分组
send-message-timeout: 3000 # 消费者发送消息超时时间单位毫秒
创建公共示例对象(只看demo可忽略)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserDTO {
private String id;
private String nickname;
private Integer age;
private LocalDateTime sendTime;
}
消费者相关介绍
因为这里用的rocketmq-spring-boot-starter包,那么消费者的核心就在于@RocketMQMessageListener注解和RocketMQListener接口,这里会对这这两个部分做一些使用上的说明。
ACK机制介绍
- RocketMQ提供了ack机制(默认是手动ack),以保证消息能够被正常消费。为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功,然后删除消息。中途断电,抛出异常等都不会认为成功。
- 这里使用的rocketmq-spring-boot-starter包算是自动ACK了吧,使用@RocketMQMessageListener注解标记一个实现RocketMQListener接口的消费者类并且实现onMessage方法,只要这个onMessage方法执行不抛出异常则由代理类自动进行ACK处理,如果抛出异常则确认失败消息会继续重试消费。
@RocketMQMessageListener介绍
参数介绍
RocketMQListener接口介绍
泛型问题
使用MessageExt(可获取完整消息对象:消息体、消息ID、topic、queueId等)
如果需要获取获取除了消息体以外的信息可以使用MessageExt作为RocketMQListener的泛型,在做消息幂等时可能会需要使用到消息ID。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "message-ext-group-01", topic = "message-ext-topic-01")
public class MessageExtComsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
UserDTO userDTO = JSON.parseObject(body, UserDTO.class);
log.info("收到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), userDTO);
}
}
使用UserDTO(不需要完整消息对象直接使用消息体类型)
直接使用发送消息时的body传入类型对象会自动序列化成对应类型,不需要完整消息对象直接使用消息体类型是一个很不错的选择,下面例子中大部分都是使用这种方式。
发送单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
生产者
@Slf4j
@Component
public class OnewayMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}", userVo);
rocketMQTemplate.sendOneWay("simple-sync-topic-01", MessageBuilder.withPayload(userVo).build());
log.info("发送消息成功 userVo = {}",userVo);
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group-01", topic = "one-way-topic-01")
public class OnewayMessageConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO message) {
log.info("收到消息: {}", message);
}
}
发送同步消息(响应值为void)
简单同步消息响应参数为void,如果发送成功则会打印发送消息成功日志,如果发送失败会抛出特定异常
生产者
@Slf4j
@Component
public class SimpleSyncMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+ges, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}",userVo);
rocketMQTemplate.send("simple-sync-topic-01", MessageBuilder.withPayload(userVo).build());
log.info("发送消息成功 userVo = {}",userVo);
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "simple-sync-group-01", topic = "simple-sync-topic-01")
public class SimpleSyncMessageConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO message) {
log.info("收到消息: {}", message);
}
}
发送同步消息(响应值为SendResult可以获取消息ID等信息)
生产者
@Slf4j
@Component
public class SimpleSyncMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}",userVo);
SendResult sendResult = rocketMQTemplate.syncSend("simple-sync-topic-01", MessageBuilder.withPayload(userVo).build());
log.info("发送消息成功 userVo = {}",userVo);
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "simple-sync-group-01", topic = "simple-sync-topic-01")
public class SimpleSyncMessageConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO message) {
log.info("收到消息: {}", message);
}
}
发送异步消息
生产者
@Slf4j
@Component
public class SimpleSyncMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}",userVo);
rocketMQTemplate.asyncSend("simple-async-topic-01", MessageBuilder.withPayload(userVo).build(),new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息成功 sendResult = {}",sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("发送消息失败 errorMessage = {}",throwable.getMessage());
throwable.printStackTrace();
}
});
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "simple-async-group-01", topic = "simple-async-topic-01")
public class SimpleAsyncMessageConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO message) {
log.info("收到消息: {}", message);
}
}
发送使用延时等级的延时消息(这种是4.x的延时消息5.x也能使用)
该延时消息使用延时等级来控制延时消息发送时间,默认18个等级使用是等级从1开始,可以在borker修改延时等级配置messageDelayLevel ,在前言的安装文档中有borker配置参数介绍,不过不建议修改因为消息重试机制也是使用的messageDelayLevel作为重试间隔时间。
生产者
@Slf4j
@Component
public class DelayLevelMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}", userVo);
rocketMQTemplate.syncSend( "delay-level-topic-01",
MessageBuilder.withPayload(userVo).build(),
3000,
3 );
log.info("发送消息成功 userVo = {}",userVo);
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "delay-level-group-01", topic = "delay-level-topic-01")
public class DelayLevelMessageConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO message) {
log.info("收到消息: {}", message);
}
}
发送定时消息可指定时间(5.x新增)
延时消息存在着一些不足:
- 1.延时级别只有 18 个,并不能满足所有场景;
- 2.如果通过修改 messageDelayLevel 配置来自定义延时级别,并不灵活,比如一个在大规模的平台上,延时级别成百上千,而且随时可能增加新的延时时间;
- 3.延时时间不准确,后台的定时线程可能会因为处理消息量大导致延时误差大。
为了弥补延时消息的不足,RocketMQ 5.0 引入了定时消息,可指定消息消费时间。
生产者
@Slf4j
@Component
public class DeliverTimeMillsMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}", userVo);
rocketMQTemplate.syncSendDeliverTimeMills( "deliver-time-mills-topic-01",
MessageBuilder.withPayload(userVo).build(),
System.currentTimeMillis() + 3000);
log.info("发送消息成功 userVo = {}",userVo);
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "deliver-time-mills-group-01", topic = "deliver-time-mills-topic-01")
public class DeliverTimeMillsMessageConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO message) {
log.info("收到消息: {}", message);
}
}
发送顺序消息
- 概念:RocketMQ的Topic在创建时默认会生成4个队列,只能保证在一个队列中保证顺序消息,通过消息的id hash取模获取这条消息应该储存在那个队列中,并且必须使用同步消息,消费者采用ConsumeMode.ORDERLY(同一个队列单线程消费)模式消费,对于指定的一个Topic,每个队列的消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
使用RocketMQTemplate直接发送
RocketMQTemplate提供的syncSendOrderly方法第三个参数会作为计算存储在那一个队列的条件,会根据第三个参数hash取模获取对应队列下标,自己测试时可以将id值固定查看是否都投递到同一个队列中。
@Slf4j
@Component
public class OrderlyMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}", userVo);
rocketMQTemplate.syncSendOrderly("orderly-topic-01",userVo,id);
log.info("发送消息成功 userVo = {}",userVo);
}
}
使用原生DefaultMQProducer发送
- 使用该方法投递消息可以自定义投递算法,也可以使用这种方式发送普通消息自定义消息投递算法。
@Slf4j
@Component
public class OrderlyMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
@SneakyThrows
public void send() {
DefaultMQProducer producer = rocketMQTemplate.getProducer();
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id, "kerwin" + age, age, LocalDateTime.now());
Message message = new Message("orderly-topic-01", JSON.toJSONBytes(userVo));
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
int queueIndex = Math.abs(o.hashCode() % list.size());
return list.get(queueIndex);
}
}, id);
}
}
公共消费者
参数介绍:
- consumeMode:消费模型 默认是consumeMode.CONCURRENTLY每个队列都是多线程消费,这里设置为consumeMode = ConsumeMode.ORDERLY:topic中每个队列单线程消费 但是topic中每一个队列可以有一个线程同时消费,需要当前消息被消费完成ACK后才会消费下一条消息。
- consumeThreadNumber(5.x新参数):消费线程数 默认值为20,如果是单机消费建议配置成和队列数相同数量一个topic默认是4个队列这里配置成4个线程,如果是两个消费者建议配置成2,每台消费者消费两个队列。
- consumeThreadMax(4.x参数):最大线程数 默认值为64,配置消费线程数量和consumeThreadNumber配置一致,在5.x官方已经不推荐使用,该消费线程池使用LinkedBlockingQueue作为任务队列默认容量有限为(Integer.MAX_VALUE)。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "orderly-group-01", topic = "orderly-topic-01", consumeThreadNumber = 4, consumeMode = ConsumeMode.ORDERLY)
public class OrderlyMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
UserDTO userDTO = JSON.parseObject(body, UserDTO.class);
log.info("收到消息: threadName={} msgId={} topic={} queueId={} body={}", Thread.currentThread().getName(), messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), userDTO);
}
}
发送带有Tag的消息
- Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
生产者
@Slf4j
@Component
public class TagMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String id = UUID.randomUUID().toString();
int age = RandomUtil.randomInt(100);
UserDTO userVo = new UserDTO(id , "kerwin"+age, age, LocalDateTime.now());
log.info("开始发送消息 userVo = {}",userVo);
rocketMQTemplate.send("tag-topic-01"+":"+"tag-01", MessageBuilder.withPayload(userVo).build());
rocketMQTemplate.send("tag-topic-01"+":"+"tag-02", MessageBuilder.withPayload(userVo).build());
rocketMQTemplate.send("tag-topic-01"+":"+"tag-03", MessageBuilder.withPayload(userVo).build());
log.info("发送消息成功 userVo = {}",userVo);
}
}
消费者
selectorType:控制如何选择消息 默认SelectorType.TAG,使用标记选择
selectorExpression :控制可以选择的消息 默认是*,消费全部的消息,也可以指定多个tag以||隔开
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tag-group-01", topic = "tag-topic-01",selectorType = SelectorType.TAG,selectorExpression = "tag-01||tag-03")
public class TagMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String tags = messageExt.getTags();
switch (tags) {
case "tag-01":
log.info("tag-01 天下无敌");
break;
case "tag-02":
log.info("tag-02 一人之下");
break;
case "tag-03":
log.info("tag-03 三楼C");
break;
default:
log.info("tags不存在溜了溜了");
break;
}
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
UserDTO userDTO = JSON.parseObject(body, UserDTO.class);
log.info("收到消息: tags={} msgId={} topic={} queueId={} body={}", tags, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), userDTO);
}
}
发送事务消息
这里的事务消息只做简单示例不做过多介绍,实际结合业务场景使用和这个演示有本质的区别,需要查看实际业务场景代码实现可跳转下面博客:
https://blog.csdn.net/weixin_44606481/article/details/129903032
生产者
@Slf4j
@Component
public class TransationMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String msg = "发送事务消息"+RandomUtil.randomInt(100);
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transation-topic-01", MessageBuilder.withPayload(userVo).build(), null);
String transactionId = result.getTransactionId();
String status = result.getSendStatus().name();
log.info("发送消息成功 userVo = {} transactionId={} status={} ",userVo,transactionId,status);
}
}
生产者消息监听器
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
int index=2;
switch (index){
case 1:
String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
log.info("本地事务回滚,回滚消息,"+jsonStr);
return RocketMQLocalTransactionState.ROLLBACK;
case 2:
log.info("需要等待Broker进行事务状态回查");
return RocketMQLocalTransactionState.UNKNOWN;
default:
log.info("事务提交,消息正常处理");
return RocketMQLocalTransactionState.COMMIT;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("检查本地事务状态,transactionId:{}", transactionId);
return RocketMQLocalTransactionState.COMMIT;
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transation-group-01", topic = "transation-topic-01")
public class TransationMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
UserDTO userDTO = JSON.parseObject(body, UserDTO.class);
log.info("收到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), userDTO);
}
}
消息消费重试问题
- 消费重试前提条件
- 只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的
- 消费失败/消费超时(默认15分钟)或者直接响应ConsumeConcurrentlyStatus.RECONSUME_LATER
- 重试机制和限制
- 如果消费者在消费时抛出异常或者返回ConsumeConcurrentlyStatus.RECONSUME_LATER,会将这个消息放入一个重试队列%RETRY%topic,重试间隔时间会使用RocketMQ Broker中的messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h参数,默认重试16次如果没有成功会将这个消息放入死信队列%DLQ%topic中。
消费者代码演示
messageModel:默认值也是MessageModel.CLUSTERING可以不用额外配置
maxReconsumeTimes:重试次数默认-1,在MessageModel.CLUSTERING模式中,-1表示16,消费失败后会重试16次,在MessageModel.BROADCASTING模式中,-1表示整数Integer.MAX_VALUE
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "reconsume-group-01", topic = "reconsume-topic-01",messageModel = MessageModel.CLUSTERING,maxReconsumeTimes = 2)
public class ReconsumeMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
int reconsumeTimes = messageExt.getReconsumeTimes();
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
UserDTO userDTO = JSON.parseObject(body, UserDTO.class);
log.info("收到消息: reconsumeTimes={} msgId={} topic={} queueId={} body={}",reconsumeTimes, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), userDTO);
if(1==1){
throw new RuntimeException();
}
}
}
消费重试次数用完会投递到死信队列中,topic名称规则为%DLQ%+consumerGroup名称
死信队列消费代码演示
- 死信特性
- 死信消息具有以下特性:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列,一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "DLQ-reconsume-group-01", topic = "%DLQ%reconsume-group-01")
public class DLQMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
UserDTO userDTO = JSON.parseObject(body, UserDTO.class);
log.info("收到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), userDTO);
}
}
核心参数介绍
生产者
rocketmq:
# name-server服务地址多个用;隔开
name-server: 127.0.0.1:9876;127.0.0.1:9877
producer:
#生产者组名,规定在一个应用里面必须唯一
group: group1
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为4M
max-message-size: 4194304
#同步消息发送失败重试次数默认为3
retry-times-when-send-failed: 3
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 3
消费者
因为这里用的rocketmq-spring-boot-starter 2.2.3包,那么消费者的核心就在于@RocketMQMessageListener注解,所有可配置参数都在这个注解中,这里对@RocketMQMessageListener注解中配置信息做详细介绍
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
String consumerGroup();
String topic();
SelectorType selectorType() default SelectorType.TAG;
String selectorExpression() default "*";
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
MessageModel messageModel() default MessageModel.CLUSTERING;
@Deprecated
int consumeThreadMax() default 64;
int consumeThreadNumber() default 20;
int maxReconsumeTimes() default -1;
long consumeTimeout() default 15L;
int replyTimeout() default 3000;
String accessKey() default ACCESS_KEY_PLACEHOLDER;
String secretKey() default SECRET_KEY_PLACEHOLDER;
boolean enableMsgTrace() default false;
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
String nameServer() default NAME_SERVER_PLACEHOLDER;
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
String tlsEnable() default "false";
String namespace() default "";
int delayLevelWhenNextConsume() default 0;
int suspendCurrentQueueTimeMillis() default 1000;
int awaitTerminationMillisWhenShutdown() default 1000;
String instanceName() default "DEFAULT";
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)