C# rabbitmq 消息确认以及持久化

2023-05-16

https://www.cnblogs.com/wyy1234/p/10868416.html

1 消息确认

1 生产者端消息确认(tx机制和Confirm模式)
2 消费者端消息确认(自动确认和显示确认)
2 消息持久化/优先级

1 消息持久化(Persistent)
2 消息优先级(Priority)

消息确认
在一些场合,如转账、付费时每一条消息都必须保证成功的被处理。AMQP是金融级的消息队列协议,有很高的可靠性,这里介绍在使用RabbitMQ时怎么保证消息被成功处理的。消息确认可以分为两种:一种是生产者发送消息到Broke时,Broker给生产者发送确认回执,用于告诉生产者消息已被成功发送到Broker;一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于通知消息已成功被消费者接收。

下边分别介绍生产者端和消费者端的消息确认方法
生产者端消息确认(tx机制和Confirm模式)
  生产者端的消息确认:当生产者将消息发送给Broker,Broker接收到消息给生产者发送确认回执。生产者端的消息确认有两种方式:tx机制和Confirm模式。

1.tx机制
  tx机制可以叫做事务机制,RabbitMQ中有三个与tx机制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用于将当前channel设置成transaction模式, channel.txCommit() 提交事务, channel.txRollback() 回滚事务。使用tx机制,我们首先要通过txSelect方法开启事务,然后发布消息给broker服务器了,如果txCommit提交成功了,则说明消息成功被broker接收了;如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们可以捕获异常,通过txRollback回滚事务。看一个tx机制的简单实现:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "xchen32",//用户名
                Password = "rockwell123@RA"//密码
            };
            //创建连接connection
            using (var connection = factory.CreateConnection())
            {
                //创建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生产者准备就绪....");
                    string message = "";
                    //发送消息
                    //在控制台输入消息,按enter键发送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.QueueDeclare("hello", true, false, false, null);//创建一个名称为hello的消息队列
                        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout,durable:true);
                        //设置消息持久化
                        var props = channel.CreateBasicProperties();
                        props.Persistent = true;
                        try
                        {
                            //开启事务机制
                            channel.TxSelect();
                            //发送消息
                            channel.BasicPublish(exchange: "logs",
                                                 routingKey: "hello",
                                                 basicProperties: props,
                                                 body: body);
                            //事务提交
                            channel.TxCommit();
                            Console.WriteLine($"【{message}】发送到Broke成功!");
                        }
                        catch (Exception)
                        {
                            Console.WriteLine($"【{message}】发送到Broker失败!");
                            channel.TxRollback();
                        }
                    }
                }
            }
            Console.ReadKey();
        }

2 Confirm模式
  C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示开启Confirm模式; channel.WaitForConfirms() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms作用类型,也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个OperationInterrupedException类型异常。看一个Confirm模式的简单实现:

static void Main(string[] args)
{
    var factory = new ConnectionFactory()
    {
        //rabbitmq-server所在设备ip,这里就是本机
        HostName = "127.0.0.1",
        UserName = "wyy",//用户名
        Password = "123321"//密码
    };
    //创建连接connection
    using (var connection = factory.CreateConnection())
    {
        //创建通道channel
        using (var channel = connection.CreateModel())
        {
            Console.WriteLine("生产者准备就绪....");
            string message = "";
            //在控制台输入消息,按enter键发送消息
            while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
            {
                message = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(message);

                //开启Confirm模式
                channel.ConfirmSelect();
                //发送消息
                channel.BasicPublish(exchange: "myexchange",
                                     routingKey: "mykey",
                                     basicProperties: null,
                                     body: body);
                //WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
                if (channel.WaitForConfirms())
                {
                    Console.WriteLine($"【{message}】发送到Broke成功!");
                }
            }
        }
    }
    Console.ReadKey();
}

2 消费者端消息确认(自动确认和显示确认)
  从Broke发送到消费者时,RabbitMQ提供了两种消息确认的方式:自动确认和显示确认。

1 自动确认
  自动确认:当RabbbitMQ将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可,我们前边的例子都是使用的自动确认,这里不再详细演示,如下:

 channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);

注意:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了。

2 显示确认
  我们知道自动确认可能会出现消息丢失的问题,我们不免会想到:Broker收到回执后才删除消息,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执,这样就保证消费端不会丢失消息了!这正是显式确认的思路。使用显示确认也比较简单,首先将Resume方法的参数autoAck设置为false,然后在消费端使用代码 channel.BasicAck()/BasicReject()等方法 来确认和拒绝消息。看一个栗子:

生产者代码如下:

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory()
        {
            //rabbitmq-server所在设备ip,这里就是本机
            HostName = "127.0.0.1",
            UserName = "xchen32",//用户名
            Password = "rockwell123@RA"//密码
        };
        //创建连接connection
        using (var connection = factory.CreateConnection())
        {
            //创建通道channel
            using (var channel = connection.CreateModel())
            {
                Console.WriteLine("生产者准备就绪....");
                string message = "";
                //发送消息
                //在控制台输入消息,按enter键发送消息
                while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                {
                    message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.QueueDeclare("hello", true, false, false, null);//创建一个名称为hello的消息队列
                    channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout,durable:true);
                    //设置消息持久化
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    try
                    {
                        //开启事务机制
                        channel.TxSelect();
                        //发送消息
                        channel.BasicPublish(exchange: "logs",
                                             routingKey: "hello",
                                             basicProperties: props,
                                             body: body);
                        //事务提交
                        channel.TxCommit();
                        Console.WriteLine($"【{message}】发送到Broke成功!");
                    }
                    catch (Exception)
                    {
                        Console.WriteLine($"【{message}】发送到Broker失败!");
                        channel.TxRollback();
                    }
                }
            }
        }
        Console.ReadKey();
    }

消费者代码如下:

var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "xchen32",//用户名
                Password = "rockwell123@RA"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {

                    channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout,durable:true);

                    //var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: "hello",
                                      exchange: "logs",
                                      routingKey: "");

                    Console.WriteLine(" [*] Waiting for logs.");

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                    {
                        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"recive 【{message}】");
                        //以news开头表示是新闻类型,处理完成后确认消息
                        if (message.StartsWith("news"))
                        {
                            //这里处理消息balabala
                            Console.WriteLine($"【{message}】news");
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        //不以news开头表示不是新闻类型,不进行处理,把消息退回到queue中
                        else
                        {
                            Console.WriteLine($"【{message}】reject");
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                    };
                    Console.WriteLine("ready ....");
                    //第五步:处理消息
                    channel.BasicConsume(queue: "hello",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                }
            }

介绍一下两个方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用于确认消息,deliveryTag参数是分发的标记,multiple表示是否确认多条。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用于拒绝消息,deliveryTag也是指分发的标记,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。

2 消息持久化/优先级
1 消息持久化(Persistent)
  在前边已经介绍了exchange和queue的持久化,把exchange和queue的durable属性设置为true,重启rabbitmq服务时( 重启命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也会恢复。我们需要注意的是:如果queue设置durable=true,rabbitmq服务重启后队列虽然会存在,但是队列内的消息会丢全部丢失。那么怎么实现消息的持久化呢?实现的方法很简单:将exchange和queue都设置durable=true,然后在消息发布的时候设置persistent=true即可

消息优先级(Priority)
  我们知道queue是先进先出的,即先发送的消息,先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级等级即可。为了演示方便,约定所有vip客户的信息都以vip开头,看一下代码实现:

生产者代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "xchen32",//用户名
                Password = "rockwell123@RA"//密码
            };
            //创建连接connection
            using (var connection = factory.CreateConnection())
            {
                //创建通道channel

                using (var channel = connection.CreateModel())
                {
                    //声明交换机exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //声明队列queue
                    channel.QueueDeclare(queue: "myqueue",
                                       durable: true,
                                       exclusive: false,
                                       autoDelete: false,
                                       arguments: new Dictionary<string, object>() {
                                           //队列优先级最高为10,不加x-max-priority的话,计算发布时设置了消息的优先级也不会生效
                                             {"x-max-priority",10 }
                                       });
                    //绑定exchange和queue
                    channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");
                    Console.WriteLine("生产者准备就绪....");
                    //一些待发送的消息
                    string[] msgs = { "vip1", "hello2", "world3", "common4", "vip5" };
                    //设置消息优先级
                    var props = channel.CreateBasicProperties();
                    foreach (string msg in msgs)
                    {
                        //vip开头的消息,优先级设置为9
                        if (msg.StartsWith("vip"))
                        {
                            props.Priority = 9;
                            channel.BasicPublish(exchange: "myexchange",
                                                 routingKey: "mykey",
                                                 basicProperties: props,
                                                 body: Encoding.UTF8.GetBytes(msg));
                        }
                        //其他消息的优先级为1
                        else
                        {
                            props.Priority = 1;
                            channel.BasicPublish(exchange: "myexchange",
                                                 routingKey: "mykey",
                                                 basicProperties: props,
                                                 body: Encoding.UTF8.GetBytes(msg));
                        }
                    }
                }
                        
            }
            Console.ReadKey();
        }


        static void test()
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "xchen32",//用户名
                Password = "rockwell123@RA"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //定义消费者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"recive 【{message}】");
                        //以news开头表示是新闻类型,处理完成后确认消息
                        if (message.StartsWith("news"))
                        {
                            //这里处理消息balabala
                            Console.WriteLine($"【{message}】news");
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        //不以news开头表示不是新闻类型,不进行处理,把消息退回到queue中
                        else
                        {
                            Console.WriteLine($"【{message}】reject");
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                    };
                    Console.WriteLine("ready ....");
                    //第五步:处理消息
                    channel.BasicConsume(queue: "hello",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

消费者

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        static void Main(string[] args)
        {
            test();
           
        }


        static void test()
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "xchen32",//用户名
                Password = "rockwell123@RA"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueBind(queue: "myqueue",
                                      exchange: "myexchange",
                                      routingKey: "mykey");

                    Console.WriteLine(" [*] Waiting for logs.");

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                    {
                        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"recive 【{message}】");
                        //以news开头表示是新闻类型,处理完成后确认消息
                        if (message.StartsWith("news"))
                        {
                            //这里处理消息balabala
                            Console.WriteLine($"【{message}】news");
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        //不以news开头表示不是新闻类型,不进行处理,把消息退回到queue中
                        else
                        {
                            Console.WriteLine($"【{message}】reject");
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                    };
                    Console.WriteLine("ready ....");
                    //第五步:处理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C# rabbitmq 消息确认以及持久化 的相关文章

  • 使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

    我在 RHEL 5 3 上使用 Java 客户端使用 RabbitMQ 我有 2 个节点 机器 Node1 正在使用 Java 帮助器类 QueueingConsumer 消费 Node2 上队列中的消息 QueueingConsumer
  • 如何优雅地结束 spring @Schedule 任务?

    我正在尝试让 Spring Boot 服务优雅地结束 它有一个方法 Scheduled注解 该服务使用 spring data 作为数据库 使用 spring cloud stream 作为 RabbitMQ 在计划的方法结束之前 数据库和
  • RabbitMQ 用户在预先创建的队列上发布/订阅的权限

    我有一个用例 我需要创建一个用户并授予他仅在现有队列上发布 订阅的权限 这是一个示例 虚拟主机 mainvhost 对于所有用户都相同 在虚拟主机内 我有 A foo 和 Q bar 队列 用户 foo 只能发布 订阅到 Q foo 用户
  • 使用spring-amqp和rabbitmq实现带退避的非阻塞重试

    我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法 但要求是侦听器不应被阻止 因此可以自由地处理其他消息 我在这里看到了类似的问题 但它不包括 后退 的解决方案 RabbitMQ 和 Sprin
  • 在 django 中使用 pika 的 Rabbitmq 监听器

    我有一个 django 应用程序 我想使用来自rabbit mq 的消息 我希望监听器在启动 django 服务器时开始使用 我正在使用 pika 库连接到rabbitmq 提供一些代码示例确实会有帮助 首先 您需要在 django 项目开
  • 没有连接的 AMQP/RabbitMQ 通道什么时候会死亡?

    我有一个简单的 RabbitMQ 测试程序 随机将消息排队 另一个读取它们 所有这些都使用 Spring AMQP 如果消费者死亡 例如 在没有机会关闭其连接或通道的情况下终止进程 则它尚未确认的任何消息似乎将永远保持未确认状态 我看过很多
  • 如何在 celery 内为每个用户生成队列?

    因此 我尝试将 Web 请求中的阻塞内容移至后台任务并利用队列 我对消息传递和发布 订阅也很陌生 用户将数据推送到那里并进行处理 稍后用户会收到相关通知 我为此做了一个 celery 设置 发现它不能满足我为每个用户分配自己的任务的专用队列
  • 与 RabbitMQ 相比,Amazon SQS 的性能较慢

    我想在我的 Web 应用程序中集成消息队列中间层 我测试了 Rabbitmq 和 Amazon SQS 但发现 Amazon SQS 速度很慢 我在 Amazon SQS 中每秒收到 80 个请求 而在 Rabbitmq 中每秒收到 200
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • 列出与rabbitmq java客户端API交换的绑定

    我似乎在文档中找不到任何信息 所以我想知道是否可以通过某种方式使用 java RabbitMQ API 获取与交换相关的所有绑定 我在查询 api bindings 时正在寻找类似 http api 结果的内容 api definition
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • 无法从 docker 将 RabbitMQ 连接到我的应用程序 [重复]

    这个问题在这里已经有答案了 我目前被这个问题困扰了大约一周 确实找不到合适的解决方案 问题是 当我尝试连接到 dockerized RabbitMQ 时 它每次都会给出相同的错误 wordofthedayapp wordofthedayap
  • RabbitMQ 中多个消费者如何订阅同一主题并获取同一消息

    首先 我知道类似问题已经有答案了here https stackoverflow com questions 10620976 rabbitmq amqp single queue multiple consumers for same m
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • 多个队列在一个通道中消耗

    我使用rabbitMq 来管理和使用队列 我有多个队列 它们的数量并不具体 我使用直接交换来发布消息 我怎样才能仅使用一个队列来消费每个队列的所有消息 基于routing key 渠道 此时我假设我有 5 个队列 我使用了 for 循环并为
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins

随机推荐