RabbitMQ在重置OnMessageReceived后不检索消息

rqmkfv5c  于 8个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(86)

我正在使用RabbitMQ.Client(C#)来使用RabbitMQ。删除并重新添加消息接收事件处理程序后,从队列中检索消息时遇到问题。consumer.Received -= OnMessageRecieved;我有一个复杂的系统,其中一个windows服务订阅RabbitMQ队列并处理消息。有多个线程运行的各种事情-定时器调用推送API,另一个定时器做API认证等。如果API身份验证失败,我们不想处理队列中的消息。我们希望将消息保持在就绪状态。只有当API认证成功时,我们才想处理消息。因此,在失败事件中,我们删除事件处理程序,在成功事件中,我们将其添加回来。当我们这样做时,事件处理程序被成功添加,但是现在消息没有从队列中检索。
为了模拟这个,我创建了一个控制台应用程序。我在不到一个小时的时间里写了这段代码,我知道这段代码非常原始和肮脏-请原谅我。sub.StopReceiveMessages();包含删除处理程序consumer.Received -= OnMessageRecieved的代码。而且,sub.StartReceiveMessages();具有删除处理程序consumer.Received += OnMessageRecieved的代码。当你把它加回去的时候,我以为它会正常工作。但是,它不再击中MessageReceived()。是不是我使用同一个消费者,但我们必须再次调用BasicCon?任何帮助将不胜感激。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client = RabbitMQ.Client;

namespace RabbitMQTest
{
    class Program
    {

        static void Main(string[] args)
        {
            MessageBusSubscription sub = new MessageBusSubscription();

            sub.Subscription("EmployeeDataChanged", "HR", "CompanyA", 5, 5000);
            sub.MessagesReceived += MessageReceived;
            Console.WriteLine("Press ESC to exit");
            while (!(Console.KeyAvailable && Console.ReadKey(true).Key == ConsoleKey.Escape))
            {
                // Simulating an event where we have to stop pulling the messages from the queue
                sub.StopReceiveMessages();

                Thread.Sleep(2000);

                // After some time, the issue is resolved and now we resume reading the messages from the queue
                sub.StartReceiveMessages();
            }
            sub.Dispose();
            Environment.Exit(0);
        }

        private static bool MessageReceived(string topic, string subscription, List<MessageContainer> messages)
        {
            List<MessageContainer> data = null;
            data = messages as List<MessageContainer>;

            foreach (var messageContainer in data)
            {
                // Do something with the message
                // Ack or Reject based on some logic
            }

            return true;
        }
    }
    public class MessageBusSubscription : IDisposable
    {
        #region variables
        Client.Events.EventingBasicConsumer consumer;
        Client.ConnectionFactory factory;        
        private System.Timers.Timer _timer = null;
        private Client.IModel _channel = null;
        private string _topic = string.Empty;
        private string _subscription = string.Empty;
        int batchCounter = 0;
        int batchSize = 0;
        ManualResetEvent _waitHandle = new ManualResetEvent(false);
        bool _disposing = false;
        bool _isSubscribed = false;
        List<MessageContainer> messages = new List<MessageContainer>();
        private object _processMessageLocker = new object();
        public event Func<string, string, List<MessageContainer>, bool> MessagesReceived;
        #endregion
        public MessageBusSubscription()
        {
            Client.IConnection conn = GetConnection();
            _channel = conn.CreateModel();
        }
        public void Subscription(string exchangeName, string queueName, string routingKey, int batchSize, double batchInterval)
        {
            _topic = exchangeName;
            _subscription = queueName;
            DeclareExchangeAndQueue(exchangeName, queueName, routingKey);

            if (batchInterval > 0 && batchSize > 1)
            {
                _timer = new System.Timers.Timer(batchInterval);

                _timer.Elapsed += (o, e) => {
                    ProcessMessagesReceived(exchangeName, queueName, true);
                };
            }
            Subscribe(routingKey, exchangeName, queueName, batchSize, batchInterval);
        }
        public Task Subscribe(string routingKey, string topic, string subscription, int _batchSize, double batchInterval)
        {
            try
            {
                consumer = new Client.Events.EventingBasicConsumer(_channel);

                batchCounter = 0;
                batchSize = _batchSize;

                consumer.Received += OnMessageRecieved;
                _isSubscribed = true;

                //RabbitMQ PUSH implementation using RabbitMQ.Client library
                var t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        if (_timer != null)
                        {
                            _timer.Start();
                        }

                        var queueName = string.Join(".", routingKey, topic, subscription);

                        if (!_disposing)
                        {
                            _channel.BasicConsume(queueName, false, consumer);
                            _waitHandle.WaitOne();
                        }

                        if (_timer != null)
                        {
                            _timer.Stop();
                            _timer.Dispose();
                        }

                        if (_channel != null)
                        {
                            if (_channel.IsOpen)
                                _channel.Close();
                            _channel.Dispose();
                        }
                    }
                    catch (Exception ex)
                    {

                    }

                });

                return t;
            }
            catch (Exception ex)
            {
                var exTask = new Task(() => { throw new AggregateException(ex); });
                exTask.RunSynchronously();
                return exTask;
            }
        }
        public void OnMessageRecieved(Client.IBasicConsumer sender, Client.Events.BasicDeliverEventArgs e)
        {
            try
            {
                string sourceExchange = string.Empty;
                string sourceQueue = string.Empty;

                string body = Encoding.ASCII.GetString(e.Body);
                string routingKey = e.RoutingKey;
                ulong deliveryTag = e.DeliveryTag;
                sourceExchange = "";
                sourceQueue = "";
                MessageContainer msgContainer = new MessageContainer();
                msgContainer.Message = body;

                batchCounter++;
                msgContainer.DeliveryTag = deliveryTag;

                lock (_processMessageLocker)
                {
                    messages.Add(msgContainer);
                    ProcessMessagesReceived(_topic, _subscription, false);
                }
            }
            catch (Exception ex)
            {

            }

        }
        public void ProcessMessagesReceived(string topic, string subscription, bool hasTimerElapsed)
        {
            try
            {
                // if it's the last message in the batch, or the interval has elapsed
                if ((batchCounter % batchSize == 0 && messages.Count > 0) || (hasTimerElapsed && messages.Count > 0))
                {
                    if (_timer != null)
                    {
                        _timer.Stop();
                    }

                    lock (_processMessageLocker)
                    {
                        // process the message
                        if (!MessagesReceived(topic, subscription, messages))
                        {
                            throw new Exception("Message processing exception - look in the default error queue (broker)");
                        }
                        messages.Clear();
                    }

                    batchCounter = 0;
                    if (_timer != null)
                    {
                        _timer.Start();
                    }
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        public Client.IConnection GetConnection()
        {
            factory = new Client.ConnectionFactory();

            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "localhost";
            factory.Protocol = Client.Protocols.AMQP_0_9_1;
            factory.Port = 5673;

            return factory.CreateConnection();
        }
        public void DeclareExchangeAndQueue(string exchangeName, string queueName, string routingKey)
        {
            using (var exchangeConn = factory.CreateConnection())
            using (Client.IModel channel = exchangeConn.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, Client.ExchangeType.Direct);

                var queue = String.Join(".", routingKey, exchangeName, queueName);
                channel.QueueDeclare(queue, false, false, false, null);
                channel.QueueBind(queue, exchangeName, routingKey, null);
            }
        }
        public void StartReceiveMessages()
        {
            if (_timer != null && !_isSubscribed)
            {
                _timer.Start();
            }

            if (consumer != null && !_isSubscribed)
            {
                consumer.Received += OnMessageRecieved;
                _isSubscribed = true;
            }            
        }
        public void StopReceiveMessages()
        {
            if (_timer != null)
            {
                _timer.Stop();
            }

            if (consumer != null)
            {
                consumer.Received -= OnMessageRecieved;
                _isSubscribed = false;
            }                        
        }
        public void Dispose()
        {
            _disposing = true;
            _waitHandle.Set();
            _waitHandle?.Dispose();
            _waitHandle = null;

            if (_timer != null)
            {
                _timer.Stop();
                _timer.Dispose();
            }
        }
    }
    public class MessageContainer
    {
        public ulong DeliveryTag { get; set; }
        public string Message { get; set; }
    }
}
3xiyfsfu

3xiyfsfu1#

不要取消订阅Received事件,而是使用BasicCancel方法停止消费消息,然后再次使用BasicConsume开始消费。
许多线程同步代码以及在Task中运行消费者并不是真正的最佳实践。如果你想进一步了解这段代码,可以将其保存到git仓库或gist中,并在官方邮件列表中跟进。

相关问题