在使用 FromEventPattern 订阅之前捕获事件

2024-02-27

我正在使用 Rx 框架编写消息监听器。

我面临的问题是,我正在使用的库使用一个消费者,每当消息到达时就会发布事件。

我已经设法通过以下方式消费传入的消息Observable.FromEventPattern但我对服务器中已有的消息有疑问。

目前我有以下命令链

  1. 创建消费者
  2. 创建一个可观察序列FromEventPattern并应用所需的转换
  3. 告诉消费者开始
  4. 订阅序列

最简单的解决方案是交换步骤 3. 和 4.,但由于它们发生在系统的不同组件中,因此我很难这样做。

理想情况下,我想在第 4 步发生时执行第 3 步(就像OnSubscribe方法)。

感谢您的帮助 :)

PS:要添加更多详细信息,事件来自 RabbitMQ 队列,我正在使用EventingBasicConsumer在 RabbitMQ.Client 包中找到的类。

Here https://github.com/Kralizek/Nybus/tree/v1你可以找到我正在开发的图书馆。具体来说,this https://github.com/Kralizek/Nybus/blob/v1/src/Nybus.RabbitMQ/RabbitMQBusEngine.cs#L28是课程/方法给我带来了问题。

Edit

这是有问题的代码的精简版本

void Main()
{
    var engine = new Engine();

    var messages = engine.Start();

    messages.Subscribe(m => m.Dump());

    Console.ReadLine();

    engine.Stop();
}

public class Engine
{
    IConnection _connection;
    IModel _channel;

    public IObservable<Message> Start()
    {
        var connectionFactory = new ConnectionFactory();

        _connection = connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();

        EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);

        var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
                                        a => consumer.Received += a, 
                                        a => consumer.Received -= a)
                                    .Select(e => e.EventArgs);

        _channel.BasicConsume("a_queue", false, consumer);

        return observable.Select(Transform);
    }

    private Message Transform(BasicDeliverEventArgs args) => new Message();

    public void Stop()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}

public class Message { }

我遇到的症状是,由于我在订阅序列之前调用 BasicConsume,因此会获取 RabbitMQ 队列中的任何消息,但不会沿着管道传递。

由于我没有打开“autoack”,因此一旦程序停止,消息就会返回到队列。


正如一些人在评论中指出的那样,正如您在问题中指出的那样,问题是由于您使用 RabbitMQ 客户端的方式造成的。

为了解决其中一些问题,我实际上所做的是创建一个 ObservableConsumer 类。这是当前使用的 EventingBasicConsumer 的替代方案。我这样做的一个原因是为了处理问题中描述的问题,但这样做的另一件事是允许您在单个连接/通道实例之外重新使用此消费者对象。这样做的好处是,无论瞬态连接/通道特性如何,您的下游反应代码都可以保持连线。

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using RabbitMQ.Client;

namespace com.rabbitmq.consumers
{
    public sealed class ObservableConsumer : IBasicConsumer
    {
        private readonly List<string> _consumerTags = new List<string>();
        private readonly object _consumerTagsLock = new object();
        private readonly Subject<Message> _subject = new Subject<Message>();

        public ushort PrefetchCount { get; set; }
        public IEnumerable<string> ConsumerTags { get { return new List<string>(_consumerTags); } }

        /// <summary>
        /// Registers this consumer on the given queue. 
        /// </summary>
        /// <returns>The consumer tag assigned.</returns>
        public string ConsumeFrom(IModel channel, string queueName)
        {
            Model = channel;
            return Model.BasicConsume(queueName, false, this);
        }

        /// <summary>
        /// Contains an observable of the incoming messages where messages are processed on a thread pool thread.
        /// </summary>
        public IObservable<Message> IncomingMessages
        {
            get { return _subject.ObserveOn(Scheduler.ThreadPool); }
        }

        ///<summary>Retrieve the IModel instance this consumer is
        ///registered with.</summary>
        public IModel Model { get; private set; }

        ///<summary>Returns true while the consumer is registered and
        ///expecting deliveries from the broker.</summary>
        public bool IsRunning
        {
            get { return _consumerTags.Count > 0; }
        }

        /// <summary>
        /// Run after a consumer is cancelled.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerCanceled(string consumerTag)
        {

        }

        /// <summary>
        /// Run after a consumer is added.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerAdded(string consumerTag)
        {

        }

        public void HandleBasicConsumeOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (!_consumerTags.Contains(consumerTag))
                    _consumerTags.Add(consumerTag);
            }
        }

        public void HandleBasicCancelOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }

        public void HandleBasicCancel(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }

        public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
        {
            //Don't need to do anything.
        }

        public void HandleBasicDeliver(string consumerTag,
                                       ulong deliveryTag,
                                       bool redelivered,
                                       string exchange,
                                       string routingKey,
                                       IBasicProperties properties,
                                       byte[] body)
        {
            //Hack - prevents the broker from sending too many messages.
            //if (PrefetchCount > 0 && _unackedMessages.Count > PrefetchCount) {
            //    Model.BasicReject(deliveryTag, true);
            //    return;
            //}

            var message = new Message(properties.HeaderFromBasicProperties()) { Content = body };
            var deliveryData = new MessageDeliveryData()
            {
                ConsumerTag = consumerTag,
                DeliveryTag = deliveryTag,
                Redelivered = redelivered,
            };

            message.Tag = deliveryData;

            if (AckMode != AcknowledgeMode.AckWhenReceived) {
                message.Acknowledged += messageAcknowledged;
                message.Failed += messageFailed;
            }

            _subject.OnNext(message);
        }

        void messageFailed(Message message, Exception ex, bool requeue)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;

                if (message.Tag is MessageDeliveryData) {
                    Model.BasicNack((message.Tag as MessageDeliveryData).DeliveryTag, false, requeue);
                }
            }
            catch {}
        }

        void messageAcknowledged(Message message)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;

                if (message.Tag is MessageDeliveryData) {
                    var ackMultiple = AckMode == AcknowledgeMode.AckAfterAny;
                    Model.BasicAck((message.Tag as MessageDeliveryData).DeliveryTag, ackMultiple);
                }
            }
            catch {}
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在使用 FromEventPattern 订阅之前捕获事件 的相关文章

随机推荐