1、客户端配置
相对于RocketMQ的Broker集群,生产者和消费者都是客户端。
2、客户端寻址方式
RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。
- 代码中指定Name Server地址,多个namesrv地址之间用分号分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:
192.168.0.1:9876;192.168.0.2:9876
客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:
10.232.22.67 jmenv.tbsite.net
推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群可以热升级。
3、客户端配置
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他参数同理。
3.1 客户端的公共配置
参数名 |
默认值 |
说明 |
namesrvAddr |
|
Name Server地址列表,多个NameServer地址用分号隔开 |
clientIP |
本机IP |
客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName |
DEFAULT |
客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
clientCallbackExecutorThreads |
4 |
通信层异步回调线程数 |
pollNameServerInteval |
30000 |
轮询Name Server间隔时间,单位毫秒 |
heartbeatBrokerInterval |
30000 |
向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval |
5000 |
持久化Consumer消费进度间隔时间,单位毫秒 |
3.2 Producer配置
参数名 |
默认值 |
说明 |
producerGroup |
DEFAULT_PRODUCER |
Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
createTopicKey |
TBW102 |
在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 |
defaultTopicQueueNums |
4 |
在发送消息,自动创建服务器不存在的topic时,默认创建的队列数 |
sendMsgTimeout |
3000 |
发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch |
4096 |
消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK |
FALSE |
如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
retryTimesWhenSendFailed |
2 |
如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用 |
maxMessageSize |
4MB |
客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用。 |
transactionCheckListener |
|
事务消息回查监听器,如果发送事务消息,必须设置 |
checkThreadPoolMinSize |
1 |
Broker回查Producer事务状态时,线程池最小线程数 |
checkThreadPoolMaxSize |
1 |
Broker回查Producer事务状态时,线程池最大线程数 |
checkRequestHoldMax |
2000 |
Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
RPCHook |
null |
该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。 |
3.3 PushConsumer配置
参数名 |
默认值 |
说明 |
consumerGroup |
DEFAULT_CONSUMER |
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
messageModel |
CLUSTERING |
消费模型支持集群消费和广播消费两种 |
consumeFromWhere |
CONSUME_FROM_LAST_OFFSET |
Consumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费 |
consumeTimestamp |
半个小时前 |
只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作用。 |
allocateMessageQueueStrategy |
AllocateMessageQueueAveragely |
Rebalance算法实现策略 |
subscription |
|
订阅关系 |
messageListener |
|
消息监听器 |
offsetStore |
|
消费进度存储 |
consumeThreadMin |
20 |
消费线程池最小线程数 |
consumeThreadMax |
20 |
消费线程池最大线程数 |
consumeConcurrentlyMaxSpan |
2000 |
单队列并行消费允许的最大跨度 |
pullThresholdForQueue |
1000 |
拉消息本地队列缓存消息最大数 |
pullInterval |
0 |
拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize |
1 |
批量消费,一次消费多少条消息 |
pullBatchSize |
32 |
批量拉消息,一次最多拉多少条 |
3.4 PullConsumer配置
参数名 |
默认值 |
说明 |
consumerGroup |
DEFAULT_CONSUMER |
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
brokerSuspendMaxTimeMillis |
20000 |
长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend |
30000 |
长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
consumerPullTimeoutMillis |
10000 |
非长轮询,拉消息超时时间,单位毫秒 |
messageModel |
BROADCASTING |
消息支持两种模式:集群消费和广播消费 |
messageQueueListener |
|
监听队列变化 |
offsetStore |
|
消费进度存储 |
registerTopics |
|
注册的topic集合 |
allocateMessageQueueStrategy |
AllocateMessageQueueAveragely |
Rebalance算法实现策略 |
3.5 Message数据结构
字段名 |
默认值 |
说明 |
Topic |
null |
必填,消息所属topic的名称 |
Body |
null |
必填,消息体 |
Tags |
null |
选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag |
Keys |
null |
选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。 |
Flag |
0 |
选填,完全由应用来设置,RocketMQ不做干预 |
DelayTimeLevel |
0 |
选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
WaitStoreMsgOK |
TRUE |
选填,表示消息是否在服务器落盘后才返回应答。 |
参考:
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md