simple模型是RabbitMQ队列模型中最简单的一个模型。如图:
“P”是我们的生产者(producer),“C”是我们的消费者(consumer)。中间的红色框是队列(Queue),代表消费者的消息缓冲区即是存放消息的地方。
生产者负责发送消息到队列当中,而消费者则负责从队列中获取消息。这就是三者间的关系。
1.1 新建项目,并导入RabbitMQ相关jar包。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
1.2 生产者如何发送消息到队列当中?
1.2.1 与RabbitMQ服务器获取连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
1.2.2 通过Connection获取通道(Channel)
Channel channel = conn.createChannel();
1.2.3 通过通道声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
参数1为队列的名称,可自行定义,其余参数后期再做描述
1.2.4 此时即可向队列中发送消息
String message = "Hello World......";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
参数2为定义的队列的名称。最后一个参数为需要发送的消息的二进制数据
1.2.5 关闭通道和连接
channel.close();
conn.close();
源码:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author YKun
* @date 2017年4月23日 上午12:32:11
* @describe
*/
public class Send {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World......";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
conn.close();
}
}
结果如下:
点击该队列查看详情可获取发送的消息文本,如图:
1.3 消费者如何获取消息
前三个步骤与发送者一样
1.3.1 与RabbitMQ服务器获取连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
1.3.2 通过Connection获取通道(Channel)
Channel channel = conn.createChannel();
1.3.3 通过通道声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
1.3.4 定义一个默认的消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
覆写掉handleDelivery方法,方法体可自定义,上面方法体里面的代码即为接受消息的代码,可参照。
1.3.5 对队列实施监控
channel.basicConsume(QUEUE_NAME, true, consumer);
源码:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* @author YKun
* @date 2017年4月23日 上午12:37:51
* @describe
*/
public class Recv {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
结果如图:
以上为simple模型的简单演示。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)