Welcome to Software Development on Codidact!
Will you help us build our independent community of developers helping developers? We're small and trying to grow. We welcome questions about all aspects of software development, from design to code to QA and more. Got questions? Got answers? Got code you'd like someone to review? Please join us.
Post History
Context I was tasked with developing a way to consume RabbitMQ messages that have various priorities (actually different levels of QoS). My restrictions are: high priority messages should have ...
#2: Post edited
- ## Context
- I was tasked with developing a way to consume RabbitMQ messages that have various priorities (actually different levels of QoS). My restrictions are:
- - high priority messages should have precedence over lower priority messages (must be processed more quickly)
- - low priority messages should not "starve" (i.e. a burst of high-priority messages should not lead to lower priority not being processed at all)
- ## Overview
- After researching several Rabbit MQ libraries, I have picked EasyNetQ which provides some useful abstraction over RabbitMQ .NET client. One of its consumption patterns looks like this:
- ```c#
- var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
- _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
- {
- // this task will be awaited internally
- }, ct),
- cfg =>
- {
- cfg.WithPrefetchCount(Config.PrefetchCount);
- });
- ```
- - The library automatically ACK the message if no exception is raised
- - If an exception passes the end of this handler, a `IConsumerErrorStrategy` (default or custom) is called to handle that exception and decide on retrial, send to some dead letter exchange
- - there is no support for explicit ACK or NACK
- ## Consumption with priorities
- My case requires handling the priorities and I use another class to deal with this.
- ```c#
- public abstract class RabbitMqMessageConsumerBase
- {
- protected IBus Bus { get; private set; }
- private IPriorityTaskScheduler _priorityTaskScheduler = default!;
- public RabbitMqConsumerConfig Config { get; set; } = new();
- protected RabbitMqMessageConsumerBase(IBus bus)
- {
- Bus = bus;
- }
- public void StartConsumingWithPriorities(IPriorityTaskScheduler priorityTaskScheduler)
- {
- _priorityTaskScheduler = priorityTaskScheduler;
- var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
- _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
- {
- // not copying into a byte[] may lead memory corruption and deserialization failure
- var bodyCopy = new byte[body.Length];
- body.Span.CopyTo(bodyCopy);
- await ScheduleMessageProcessing(bodyCopy, properties, info, ct);
- }, ct),
- cfg =>
- {
- cfg.WithPrefetchCount(Config.PrefetchCount);
- });
- }
- private async Task ScheduleMessageProcessing(byte[] bodyCopy, MessageProperties properties, MessageReceivedInfo receivedInfo, CancellationToken ct)
- {
- byte priorityIndex = (byte)(properties.Priority - 1);
- await _priorityTaskScheduler.ScheduleMessageProcessing(new PriorityTaskSchedulerItem
- {
- PriorityIndex = priorityIndex,
- ProcessingTask = async () =>
- {
- await ProcessMessage(bodyCopy, properties, receivedInfo, ct);
},ExceptionTask = async (ex) =>{await ProcessMessageException(ex, bodyCopy, properties, receivedInfo, ct);},- Ct = ct
- });
- }
- ```
- The priority task scheduler places the Task (work to be done) in a BlockingCollection based on a computed priority, effectively delaying its execution if placed after other tasks.
- Processing works as expected (priorities and throughput), but **a Task will be executed in another context than the client consumption one **(that one in which `ProcessTasks` runs).
- This means that without using `TaskCompletionSource` and having the `ScheduleMessageProcessing` awaiting for `TaskCompletionSource's` Task, any raised exception during message processing will not reach the EasyNetQ handler.
- ```c#
- public class PriorityTaskScheduler : IPriorityTaskScheduler
- {
- private readonly int _maxPriority;
- private readonly int _maxParallelism;
- private readonly int _maxQueueSize;
- private readonly int _blockingCollectionWaitTimeout;
- private BlockingCollection<PriorityTaskSchedulerItem>[] _taskQueues = default!;
- private readonly SemaphoreSlim _semaphore = new(1, 1);
- private int _totalInProcess = 0;
- private int _maxTotalInProcess = 0;
- /// <summary>
- /// specifies range in % of the total slots are searchable for current priority
- /// if all slots have some load on them
- /// </summary>
- /// <remarks>index-0 correspond to priority=1</remarks>
- private readonly int[] _prioritySearchableSlotRangePercs;
- /// <summary>
- /// specifies range in number of slots that are searchable for current priority
- /// if all slots have some load on them
- /// </summary>
- private int[] _prioritySearchableSlotRange = default!;
- /// <summary>
- /// this should be set only for local dev testing
- /// </summary>
- public bool DiagnosticMode { get; set; } = false;
- public PriorityTaskScheduler(PriorityTaskSchedulerConfig config)
- {
- _maxPriority = config.MaxPriority;
- _maxParallelism = config.MaxParallelism;
- _maxQueueSize = config.MaxQueueSize;
- _blockingCollectionWaitTimeout = config.BlockingCollectionWaitTimeout;
- _prioritySearchableSlotRangePercs = config.PrioritySearchableSlotRangePercs;
- Init();
- }
- private void Init()
- {
- _taskQueues = new BlockingCollection<PriorityTaskSchedulerItem>[_maxParallelism];
- _prioritySearchableSlotRange = new int[_maxPriority];
- for (int i = 0; i < _maxPriority; i++)
- {
- _prioritySearchableSlotRange[i] = (int)(_maxParallelism * _prioritySearchableSlotRangePercs[i] / 100.0);
- }
- for (int i = 0; i < _maxParallelism; i++)
- {
- _taskQueues[i] = new BlockingCollection<PriorityTaskSchedulerItem>(_maxQueueSize);
- }
- }
- // this is called on application startup
- public void Start()
- {
- for (int i = 0; i < _maxParallelism; i++)
- {
- int queueIndex = i; // local copy for closure
- Task.Run(async () => await ProcessTasks(queueIndex));
- }
- }
- public async Task ScheduleMessageProcessing(PriorityTaskSchedulerItem item)
- {
- bool addedOk = false;
- while (!addedOk)
- {
- var queue = GetQueueToUse(item.PriorityIndex);
- addedOk = queue.TryAdd(item, _blockingCollectionWaitTimeout);
- }
- await item.TaskCompletionSource.Task;
- }
- private BlockingCollection<PriorityTaskSchedulerItem> GetQueueToUse(byte priorityIndex)
- {
- // compute queue index to use based on priority and current queues load
- }
- private async Task ProcessTasks(int queueIndex)
- {
- foreach (var schedulerItem in _taskQueues[queueIndex].GetConsumingEnumerable())
- {
- try
- {
- Interlocked.Increment(ref _totalInProcess);
- if (_totalInProcess > _maxTotalInProcess)
- _maxTotalInProcess = _totalInProcess;
- if (DiagnosticMode)
- LogSchedulerInfo();
- var task = schedulerItem.ProcessingTask();
- if (task != null)
- await task;
- schedulerItem.TaskCompletionSource.TrySetResult(null);
- }
- catch (Exception ex)
- {
- schedulerItem.TaskCompletionSource.TrySetException(ex);
- if (schedulerItem.ExceptionTask != null)
- await schedulerItem.ExceptionTask(ex);
- }
- finally
- {
- Interlocked.Decrement(ref _totalInProcess);
- if (DiagnosticMode)
- LogSchedulerInfo();
- }
- }
- }
- }
- ```
- The code is functional, but the solution (blocking Tasks, but having the scheduling waiting for the Task to be completed) seems clunky.
- Any suggestions for improving this?
- ## Context
- I was tasked with developing a way to consume RabbitMQ messages that have various priorities (actually different levels of QoS). My restrictions are:
- - high priority messages should have precedence over lower priority messages (must be processed more quickly)
- - low priority messages should not "starve" (i.e. a burst of high-priority messages should not lead to lower priority not being processed at all)
- ## Overview
- After researching several Rabbit MQ libraries, I have picked EasyNetQ which provides some useful abstraction over RabbitMQ .NET client. One of its consumption patterns looks like this:
- ```c#
- var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
- _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
- {
- // this task will be awaited internally
- }, ct),
- cfg =>
- {
- cfg.WithPrefetchCount(Config.PrefetchCount);
- });
- ```
- - The library automatically ACK the message if no exception is raised
- - If an exception passes the end of this handler, a `IConsumerErrorStrategy` (default or custom) is called to handle that exception and decide on retrial, send to some dead letter exchange
- - there is no support for explicit ACK or NACK
- ## Consumption with priorities
- My case requires handling the priorities and I use another class to deal with this.
- ```c#
- public abstract class RabbitMqMessageConsumerBase
- {
- protected IBus Bus { get; private set; }
- private IPriorityTaskScheduler _priorityTaskScheduler = default!;
- public RabbitMqConsumerConfig Config { get; set; } = new();
- protected RabbitMqMessageConsumerBase(IBus bus)
- {
- Bus = bus;
- }
- public void StartConsumingWithPriorities(IPriorityTaskScheduler priorityTaskScheduler)
- {
- _priorityTaskScheduler = priorityTaskScheduler;
- var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
- _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
- {
- // not copying into a byte[] may lead memory corruption and deserialization failure
- var bodyCopy = new byte[body.Length];
- body.Span.CopyTo(bodyCopy);
- await ScheduleMessageProcessing(bodyCopy, properties, info, ct);
- }, ct),
- cfg =>
- {
- cfg.WithPrefetchCount(Config.PrefetchCount);
- });
- }
- private async Task ScheduleMessageProcessing(byte[] bodyCopy, MessageProperties properties, MessageReceivedInfo receivedInfo, CancellationToken ct)
- {
- byte priorityIndex = (byte)(properties.Priority - 1);
- await _priorityTaskScheduler.ScheduleMessageProcessing(new PriorityTaskSchedulerItem
- {
- PriorityIndex = priorityIndex,
- ProcessingTask = async () =>
- {
- await ProcessMessage(bodyCopy, properties, receivedInfo, ct);
- }
- Ct = ct
- });
- }
- ```
- The priority task scheduler places the Task (work to be done) in a BlockingCollection based on a computed priority, effectively delaying its execution if placed after other tasks.
- Processing works as expected (priorities and throughput), but **a Task will be executed in another context than the client consumption one **(that one in which `ProcessTasks` runs).
- This means that without using `TaskCompletionSource` and having the `ScheduleMessageProcessing` awaiting for `TaskCompletionSource's` Task, any raised exception during message processing will not reach the EasyNetQ handler.
- ```c#
- public class PriorityTaskScheduler : IPriorityTaskScheduler
- {
- private readonly int _maxPriority;
- private readonly int _maxParallelism;
- private readonly int _maxQueueSize;
- private readonly int _blockingCollectionWaitTimeout;
- private BlockingCollection<PriorityTaskSchedulerItem>[] _taskQueues = default!;
- private readonly SemaphoreSlim _semaphore = new(1, 1);
- private int _totalInProcess = 0;
- private int _maxTotalInProcess = 0;
- /// <summary>
- /// specifies range in % of the total slots are searchable for current priority
- /// if all slots have some load on them
- /// </summary>
- /// <remarks>index-0 correspond to priority=1</remarks>
- private readonly int[] _prioritySearchableSlotRangePercs;
- /// <summary>
- /// specifies range in number of slots that are searchable for current priority
- /// if all slots have some load on them
- /// </summary>
- private int[] _prioritySearchableSlotRange = default!;
- /// <summary>
- /// this should be set only for local dev testing
- /// </summary>
- public bool DiagnosticMode { get; set; } = false;
- public PriorityTaskScheduler(PriorityTaskSchedulerConfig config)
- {
- _maxPriority = config.MaxPriority;
- _maxParallelism = config.MaxParallelism;
- _maxQueueSize = config.MaxQueueSize;
- _blockingCollectionWaitTimeout = config.BlockingCollectionWaitTimeout;
- _prioritySearchableSlotRangePercs = config.PrioritySearchableSlotRangePercs;
- Init();
- }
- private void Init()
- {
- _taskQueues = new BlockingCollection<PriorityTaskSchedulerItem>[_maxParallelism];
- _prioritySearchableSlotRange = new int[_maxPriority];
- for (int i = 0; i < _maxPriority; i++)
- {
- _prioritySearchableSlotRange[i] = (int)(_maxParallelism * _prioritySearchableSlotRangePercs[i] / 100.0);
- }
- for (int i = 0; i < _maxParallelism; i++)
- {
- _taskQueues[i] = new BlockingCollection<PriorityTaskSchedulerItem>(_maxQueueSize);
- }
- }
- // this is called on application startup
- public void Start()
- {
- for (int i = 0; i < _maxParallelism; i++)
- {
- int queueIndex = i; // local copy for closure
- Task.Run(async () => await ProcessTasks(queueIndex));
- }
- }
- public async Task ScheduleMessageProcessing(PriorityTaskSchedulerItem item)
- {
- bool addedOk = false;
- while (!addedOk)
- {
- var queue = GetQueueToUse(item.PriorityIndex);
- addedOk = queue.TryAdd(item, _blockingCollectionWaitTimeout);
- }
- await item.TaskCompletionSource.Task;
- }
- private BlockingCollection<PriorityTaskSchedulerItem> GetQueueToUse(byte priorityIndex)
- {
- // compute queue index to use based on priority and current queues load
- }
- private async Task ProcessTasks(int queueIndex)
- {
- foreach (var schedulerItem in _taskQueues[queueIndex].GetConsumingEnumerable())
- {
- try
- {
- Interlocked.Increment(ref _totalInProcess);
- if (_totalInProcess > _maxTotalInProcess)
- _maxTotalInProcess = _totalInProcess;
- if (DiagnosticMode)
- LogSchedulerInfo();
- var task = schedulerItem.ProcessingTask();
- if (task != null)
- await task;
- schedulerItem.TaskCompletionSource.TrySetResult(null);
- }
- catch (Exception ex)
- {
- schedulerItem.TaskCompletionSource.TrySetException(ex);
- if (schedulerItem.ExceptionTask != null)
- await schedulerItem.ExceptionTask(ex);
- }
- finally
- {
- Interlocked.Decrement(ref _totalInProcess);
- if (DiagnosticMode)
- LogSchedulerInfo();
- }
- }
- }
- }
- ```
- The code is functional, but the solution (blocking Tasks, but having the scheduling waiting for the Task to be completed) seems clunky.
- Any suggestions for improving this?
#1: Initial revision
Message consumption with priorities and consumer error handling using EasyNetQ library
## Context I was tasked with developing a way to consume RabbitMQ messages that have various priorities (actually different levels of QoS). My restrictions are: - high priority messages should have precedence over lower priority messages (must be processed more quickly) - low priority messages should not "starve" (i.e. a burst of high-priority messages should not lead to lower priority not being processed at all) ## Overview After researching several Rabbit MQ libraries, I have picked EasyNetQ which provides some useful abstraction over RabbitMQ .NET client. One of its consumption patterns looks like this: ```c# var queue = Bus.Advanced.QueueDeclare(Config.QueueName); _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () => { // this task will be awaited internally }, ct), cfg => { cfg.WithPrefetchCount(Config.PrefetchCount); }); ``` - The library automatically ACK the message if no exception is raised - If an exception passes the end of this handler, a `IConsumerErrorStrategy` (default or custom) is called to handle that exception and decide on retrial, send to some dead letter exchange - there is no support for explicit ACK or NACK ## Consumption with priorities My case requires handling the priorities and I use another class to deal with this. ```c# public abstract class RabbitMqMessageConsumerBase { protected IBus Bus { get; private set; } private IPriorityTaskScheduler _priorityTaskScheduler = default!; public RabbitMqConsumerConfig Config { get; set; } = new(); protected RabbitMqMessageConsumerBase(IBus bus) { Bus = bus; } public void StartConsumingWithPriorities(IPriorityTaskScheduler priorityTaskScheduler) { _priorityTaskScheduler = priorityTaskScheduler; var queue = Bus.Advanced.QueueDeclare(Config.QueueName); _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () => { // not copying into a byte[] may lead memory corruption and deserialization failure var bodyCopy = new byte[body.Length]; body.Span.CopyTo(bodyCopy); await ScheduleMessageProcessing(bodyCopy, properties, info, ct); }, ct), cfg => { cfg.WithPrefetchCount(Config.PrefetchCount); }); } private async Task ScheduleMessageProcessing(byte[] bodyCopy, MessageProperties properties, MessageReceivedInfo receivedInfo, CancellationToken ct) { byte priorityIndex = (byte)(properties.Priority - 1); await _priorityTaskScheduler.ScheduleMessageProcessing(new PriorityTaskSchedulerItem { PriorityIndex = priorityIndex, ProcessingTask = async () => { await ProcessMessage(bodyCopy, properties, receivedInfo, ct); }, ExceptionTask = async (ex) => { await ProcessMessageException(ex, bodyCopy, properties, receivedInfo, ct); }, Ct = ct }); } ``` The priority task scheduler places the Task (work to be done) in a BlockingCollection based on a computed priority, effectively delaying its execution if placed after other tasks. Processing works as expected (priorities and throughput), but **a Task will be executed in another context than the client consumption one **(that one in which `ProcessTasks` runs). This means that without using `TaskCompletionSource` and having the `ScheduleMessageProcessing` awaiting for `TaskCompletionSource's` Task, any raised exception during message processing will not reach the EasyNetQ handler. ```c# public class PriorityTaskScheduler : IPriorityTaskScheduler { private readonly int _maxPriority; private readonly int _maxParallelism; private readonly int _maxQueueSize; private readonly int _blockingCollectionWaitTimeout; private BlockingCollection<PriorityTaskSchedulerItem>[] _taskQueues = default!; private readonly SemaphoreSlim _semaphore = new(1, 1); private int _totalInProcess = 0; private int _maxTotalInProcess = 0; /// <summary> /// specifies range in % of the total slots are searchable for current priority /// if all slots have some load on them /// </summary> /// <remarks>index-0 correspond to priority=1</remarks> private readonly int[] _prioritySearchableSlotRangePercs; /// <summary> /// specifies range in number of slots that are searchable for current priority /// if all slots have some load on them /// </summary> private int[] _prioritySearchableSlotRange = default!; /// <summary> /// this should be set only for local dev testing /// </summary> public bool DiagnosticMode { get; set; } = false; public PriorityTaskScheduler(PriorityTaskSchedulerConfig config) { _maxPriority = config.MaxPriority; _maxParallelism = config.MaxParallelism; _maxQueueSize = config.MaxQueueSize; _blockingCollectionWaitTimeout = config.BlockingCollectionWaitTimeout; _prioritySearchableSlotRangePercs = config.PrioritySearchableSlotRangePercs; Init(); } private void Init() { _taskQueues = new BlockingCollection<PriorityTaskSchedulerItem>[_maxParallelism]; _prioritySearchableSlotRange = new int[_maxPriority]; for (int i = 0; i < _maxPriority; i++) { _prioritySearchableSlotRange[i] = (int)(_maxParallelism * _prioritySearchableSlotRangePercs[i] / 100.0); } for (int i = 0; i < _maxParallelism; i++) { _taskQueues[i] = new BlockingCollection<PriorityTaskSchedulerItem>(_maxQueueSize); } } // this is called on application startup public void Start() { for (int i = 0; i < _maxParallelism; i++) { int queueIndex = i; // local copy for closure Task.Run(async () => await ProcessTasks(queueIndex)); } } public async Task ScheduleMessageProcessing(PriorityTaskSchedulerItem item) { bool addedOk = false; while (!addedOk) { var queue = GetQueueToUse(item.PriorityIndex); addedOk = queue.TryAdd(item, _blockingCollectionWaitTimeout); } await item.TaskCompletionSource.Task; } private BlockingCollection<PriorityTaskSchedulerItem> GetQueueToUse(byte priorityIndex) { // compute queue index to use based on priority and current queues load } private async Task ProcessTasks(int queueIndex) { foreach (var schedulerItem in _taskQueues[queueIndex].GetConsumingEnumerable()) { try { Interlocked.Increment(ref _totalInProcess); if (_totalInProcess > _maxTotalInProcess) _maxTotalInProcess = _totalInProcess; if (DiagnosticMode) LogSchedulerInfo(); var task = schedulerItem.ProcessingTask(); if (task != null) await task; schedulerItem.TaskCompletionSource.TrySetResult(null); } catch (Exception ex) { schedulerItem.TaskCompletionSource.TrySetException(ex); if (schedulerItem.ExceptionTask != null) await schedulerItem.ExceptionTask(ex); } finally { Interlocked.Decrement(ref _totalInProcess); if (DiagnosticMode) LogSchedulerInfo(); } } } } ``` The code is functional, but the solution (blocking Tasks, but having the scheduling waiting for the Task to be completed) seems clunky. Any suggestions for improving this?