https://www.cnblogs.com/wyy1234/p/10868416.html
1 消息确认
1 生产者端消息确认(tx机制和Confirm模式)
2 消费者端消息确认(自动确认和显示确认)
2 消息持久化/优先级
1 消息持久化(Persistent)
2 消息优先级(Priority)
消息确认
在一些场合,如转账、付费时每一条消息都必须保证成功的被处理。AMQP是金融级的消息队列协议,有很高的可靠性,这里介绍在使用RabbitMQ时怎么保证消息被成功处理的。消息确认可以分为两种:一种是生产者发送消息到Broke时,Broker给生产者发送确认回执,用于告诉生产者消息已被成功发送到Broker;一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于通知消息已成功被消费者接收。
下边分别介绍生产者端和消费者端的消息确认方法
生产者端消息确认(tx机制和Confirm模式)
生产者端的消息确认:当生产者将消息发送给Broker,Broker接收到消息给生产者发送确认回执。生产者端的消息确认有两种方式:tx机制和Confirm模式。
1.tx机制
tx机制可以叫做事务机制,RabbitMQ中有三个与tx机制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用于将当前channel设置成transaction模式, channel.txCommit() 提交事务, channel.txRollback() 回滚事务。使用tx机制,我们首先要通过txSelect方法开启事务,然后发布消息给broker服务器了,如果txCommit提交成功了,则说明消息成功被broker接收了;如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们可以捕获异常,通过txRollback回滚事务。看一个tx机制的简单实现:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "xchen32",
Password = "rockwell123@RA"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine("生产者准备就绪....");
string message = "";
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.QueueDeclare("hello", true, false, false, null);
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout,durable:true);
var props = channel.CreateBasicProperties();
props.Persistent = true;
try
{
channel.TxSelect();
channel.BasicPublish(exchange: "logs",
routingKey: "hello",
basicProperties: props,
body: body);
channel.TxCommit();
Console.WriteLine($"【{message}】发送到Broke成功!");
}
catch (Exception)
{
Console.WriteLine($"【{message}】发送到Broker失败!");
channel.TxRollback();
}
}
}
}
Console.ReadKey();
}
2 Confirm模式
C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示开启Confirm模式; channel.WaitForConfirms() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms作用类型,也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个OperationInterrupedException类型异常。看一个Confirm模式的简单实现:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "wyy",
Password = "123321"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine("生产者准备就绪....");
string message = "";
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.ConfirmSelect();
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: null,
body: body);
if (channel.WaitForConfirms())
{
Console.WriteLine($"【{message}】发送到Broke成功!");
}
}
}
}
Console.ReadKey();
}
2 消费者端消息确认(自动确认和显示确认)
从Broke发送到消费者时,RabbitMQ提供了两种消息确认的方式:自动确认和显示确认。
1 自动确认
自动确认:当RabbbitMQ将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可,我们前边的例子都是使用的自动确认,这里不再详细演示,如下:
channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);
注意:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了。
2 显示确认
我们知道自动确认可能会出现消息丢失的问题,我们不免会想到:Broker收到回执后才删除消息,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执,这样就保证消费端不会丢失消息了!这正是显式确认的思路。使用显示确认也比较简单,首先将Resume方法的参数autoAck设置为false,然后在消费端使用代码 channel.BasicAck()/BasicReject()等方法 来确认和拒绝消息。看一个栗子:
生产者代码如下:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "xchen32",
Password = "rockwell123@RA"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine("生产者准备就绪....");
string message = "";
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.QueueDeclare("hello", true, false, false, null);
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout,durable:true);
var props = channel.CreateBasicProperties();
props.Persistent = true;
try
{
channel.TxSelect();
channel.BasicPublish(exchange: "logs",
routingKey: "hello",
basicProperties: props,
body: body);
channel.TxCommit();
Console.WriteLine($"【{message}】发送到Broke成功!");
}
catch (Exception)
{
Console.WriteLine($"【{message}】发送到Broker失败!");
channel.TxRollback();
}
}
}
}
Console.ReadKey();
}
消费者代码如下:
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "xchen32",
Password = "rockwell123@RA"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout,durable:true);
channel.QueueBind(queue: "hello",
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"recive 【{message}】");
if (message.StartsWith("news"))
{
Console.WriteLine($"【{message}】news");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
Console.WriteLine($"【{message}】reject");
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
}
};
Console.WriteLine("ready ....");
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
Console.ReadKey();
}
}
介绍一下两个方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用于确认消息,deliveryTag参数是分发的标记,multiple表示是否确认多条。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用于拒绝消息,deliveryTag也是指分发的标记,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。
2 消息持久化/优先级
1 消息持久化(Persistent)
在前边已经介绍了exchange和queue的持久化,把exchange和queue的durable属性设置为true,重启rabbitmq服务时( 重启命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也会恢复。我们需要注意的是:如果queue设置durable=true,rabbitmq服务重启后队列虽然会存在,但是队列内的消息会丢全部丢失。那么怎么实现消息的持久化呢?实现的方法很简单:将exchange和queue都设置durable=true,然后在消息发布的时候设置persistent=true即可。
消息优先级(Priority)
我们知道queue是先进先出的,即先发送的消息,先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级等级即可。为了演示方便,约定所有vip客户的信息都以vip开头,看一下代码实现:
生产者代码:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApp2
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "xchen32",
Password = "rockwell123@RA"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "myexchange",
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: "myqueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>() {
{"x-max-priority",10 }
});
channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");
Console.WriteLine("生产者准备就绪....");
string[] msgs = { "vip1", "hello2", "world3", "common4", "vip5" };
var props = channel.CreateBasicProperties();
foreach (string msg in msgs)
{
if (msg.StartsWith("vip"))
{
props.Priority = 9;
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: props,
body: Encoding.UTF8.GetBytes(msg));
}
else
{
props.Priority = 1;
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: props,
body: Encoding.UTF8.GetBytes(msg));
}
}
}
}
Console.ReadKey();
}
static void test()
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "xchen32",
Password = "rockwell123@RA"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"recive 【{message}】");
if (message.StartsWith("news"))
{
Console.WriteLine($"【{message}】news");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
Console.WriteLine($"【{message}】reject");
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
}
};
Console.WriteLine("ready ....");
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
Console.ReadKey();
}
}
}
}
}
消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApp2
{
class Program
{
static void Main(string[] args)
{
test();
}
static void test()
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "xchen32",
Password = "rockwell123@RA"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueBind(queue: "myqueue",
exchange: "myexchange",
routingKey: "mykey");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"recive 【{message}】");
if (message.StartsWith("news"))
{
Console.WriteLine($"【{message}】news");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
Console.WriteLine($"【{message}】reject");
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
}
};
Console.WriteLine("ready ....");
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
Console.ReadKey();
}
}
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)