Rabbitmq 入门概念
首先来介绍下Rabbitmq的一些概念:
- Producer:生产者,生产者负责发送信息(messages)
- Queue:队列,队列是RabbitMQ中的信箱,唯一区别是信箱里的是信件,而队列中的是数据
- Consuming:消费者,消费者负责接受消息。
这三个概念就足以支撑MQ的一个简单模式了。请看如下代码:
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
这里的Connection 对象负责和mq联络,因此我们需要告诉创建Connection对象的工厂mq的地址(Host)。
Cannel是大多数完成工作用到的API所在的位置。
queueDeclare 负责声明一个队列,队列的概念上面已经解释过了。
如果我们想发送消息,只需要调用basicPublish方法即可:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
工作队列模式:
Work Queue
当实现了简单的发送和接受后,我们来看工作队列方式:
我们的消费者可能不止一个,这种时候mq将消息分给消费者。
分的方式就有说法了,默认情况下,mq采用Round-robin方式分发消息。
比如有1 2 3 4 5 6 7 8条消息,两个消费者,那么1 3 5 7 会分给消费者1,而2 4 6 8分给消费者2.
这是一种很平均的方式,但也会有问题,比如1 3 5 7这几个任务每个都很耗时,而2 4 6 8不耗时,这就导致了消费者2处于一种很闲而消费者1很忙的情况。
好在有一个参数可以调节mq的工作方式,这个参数用于worker中,worker通过channel调用basicQos方法,
该方法可以指定我这个工作者每次取几个消息,例如设置为1,那么当我还有消息没有处理完的时候,就不能再往我这里发送消息。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
mq怎么知道你是否处理完了?
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
autoAck置为true是,表示消费者一收到消息就立马返回ack,mq就会认为你已经处理完了。
如果你不喜欢这种方式,想要求当我真正处理完了,觉得OK了才真正的告诉mq我处理完了。这需要将autoAck置为false。
具体如下:
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
注意basicAck这个方法,这里就是自定义位置来回应mq,告诉mq我已经处理完了。
忘记回应ack是很危险的!
忘记回复ACK很可能造成内存泄漏,而且mq默认的工作方式就是autoAck = false的。意味着你不回复ack,mq就不会删除这个消息,而且mq的消息是没有超时限制的,所以会一直保存。
后面会继续介绍发布订阅模式~
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)