RabbitMq工作模型
- Mq基础
- RbbitMq工作模型
- RabbitMq基本使用
- 原生api
- Spring集成
- Springboot集成
- RabbitMq进阶知识
-
- 总结
Mq基础
message queue 消息队列
特点:
1、独立部署,解耦
2、数据结构是队列,FIFO
3、具有发布订阅模型
为什么使用MQ:
1、异步
2、解耦
3、削峰
4、能广播通信
带来的问题:
1、增加运维成本
2、系统可用性降低
3、系统复杂性提高
AMQP
AMQP协议,所有的MQ都遵循这个协议
RbbitMq工作模型
Broker
服务器
Connection
生产者和消费者都和Broker建立连接,这是一个TCP长链接
Channel
消息通道,为了减少TCP长链接的创建和释放
Queue
消息存放的地方,队列其实有自己的数据库
Consumer
消费者消费有两种模式:
pull 消费者主动拉取,消息存放咋服务端
push 推送给消费者,消息存放在消费端
RabbitMq两种都实现了,kafka和RocketMq只实现了pull
消费者-队列 多对多,一般使用时一个消费者只取一个队列的消息
Vhost
虚拟主机,不同的系统可以使用不同的vhost,建自己的用户,交换机和队列
rabitmq安装时有默认的vhost,名字是 "/"
Exchange 交换机
负责分发消息,交换机跟队列有绑定关系
交换机有三种
1、direct 直联,类似于配置写死 basicPublish("交换机名","binding-key",message)
2、topic主题,模糊匹配,#和*,#是不限制,*是代表一个单词
3、fanout广播,不需要binding-key,给所有队列发
RabbitMq基本使用
原生api
交换机有三种类型,channel.exchangeDeclare 时可以创建三种交换机
消费者
配置ip
端口:5672
虚拟机:VHost
设置用户密码:
建立链接:
创建消息通道
创建交换机:可以创建三种
声明队列
绑定交换机和队列
创建消费者
获取消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Waiting for message....");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"gupao.best");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
System.out.println("consumerTag : " + consumerTag );
System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
生产者
配置ip
端口:5672
虚拟机:VHost
设置用户密码:
建立链接:
创建消费通道
发送消息:basicPublish(交换机,bindling-key,消息)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String msg = "Hello world, Rabbit MQ,111";
channel.basicPublish(EXCHANGE_NAME, "gupao.best", null, msg.getBytes());
channel.close();
conn.close();
Spring集成
依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
基于xml配置rabbitMq的依赖关系
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" password="guest" host="127.0.0.1" port="5672" />
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" />
<bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
</rabbit:listener-container>
消费者
public class ThirdConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(ThirdConsumer.class);
public void onMessage(Message message) {
logger.info("The third cosumer received message : " + message);
}
}
发送消息
@Autowired
@Qualifier("amqpTemplate")
private AmqpTemplate amqpTemplate;
.....
amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);
Springboot集成
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
@Configuration
public class RabbitConfig {
@Bean("topicExchange")
public TopicExchange getTopicExchange(){
return new TopicExchange("TOPIC_EXCHANGE");
}
@Bean("fanoutExchange")
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("FANOUT_EXCHANGE");
}
@Bean("firstQueue")
public Queue getFirstQueue(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl",6000);
Queue queue = new Queue("FIRST_QUEUE", false, false, true, args);
return queue;
}
@Bean("secondQueue")
public Queue getSecondQueue(){
return new Queue("SECOND_QUEUE");
}
@Bean("thirdQueue")
public Queue getThirdQueue(){
return new Queue("THIRD_QUEUE");
}
@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue,@Qualifier("topicExchange") TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("#.gupao.#");
}
@Bean
public Binding bindThird(@Qualifier("thirdQueue") Queue queue,@Qualifier("fanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
消费者
@Component
@RabbitListener(queues = "FIRST_QUEUE")
public class FirstConsumer {
@RabbitHandler
public void process(String msg){
System.out.println(" first queue received msg : " + msg);
}
}
生产消息
@Component
public class MyProvider {
@Autowired
AmqpTemplate amqpTemplate;
public void send(){
amqpTemplate.convertAndSend("","FIRST_QUEUE","-------- a direct msg");
amqpTemplate.convertAndSend("TOPIC_EXCHANGE","shanghai.gupao.teacher","-------- a topic msg : shanghai.gupao.teacher");
amqpTemplate.convertAndSend("TOPIC_EXCHANGE","changsha.gupao.student","-------- a topic msg : changsha.gupao.student");
amqpTemplate.convertAndSend("FANOUT_EXCHANGE","","-------- a fanout msg");
}
}
RabbitMq进阶知识
订单延迟关闭
1、入库,定时扫描,太low
2、利用rabbitMq的死信队列 TTL
下单的时候,同时发一条消息到mq,30分钟后消费这条消息
1、消息设置过期时间,死性交换机,消息过期了,把消息转到死性交换机-死性队列上,一个队列只有一个死性交换机
2、队列设置过期时间,如果队列跟消息过期时间都有,取小的一个
3、延迟投递交换机
流程:生产者-原交换机-原队列(超时之后)-----死信交换机----死信队列----最终消费者
死信交换机缺点
1、如果用队列设置死信时间,存在不同时间的消息,需要很多的交换机和队列
2、如果设置消息的TTL,可能会造成阻塞,前一条是10秒,后一条是3秒,第二条无法投递
3、存在时间偏差
使用插件时间延迟队列功能,声明x-delayed-message的类型,所以交换机可以有4种
队列满了
队列有俩属性控制长度
x-max-length:存储最大消息数,超过数量,消息被丢弃
x-max-length-bytes:存储最大消息容量,超过容量,消息丢弃
内存控制
检测物理内存数值,超过设置比例后,报警
磁盘超过设置容量后,触发流控措施
消费端限流
basciQos(2)超过2条消息没有发送ACK,不再接受消息
总结
支持多客户端,主流语言都支持客户端实现
灵活的路由:通过交换机实现消息的灵活路由
权限管理:用户和虚拟机
支持插件拓展
与spring集成
高可靠性
集群与拓展性
高可用队列
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)