Message consumption with priorities and consumer error handling using EasyNetQ library
Context
I was tasked with developing a way to consume RabbitMQ messages that have various priorities (actually different levels of QoS). My restrictions are:
- high priority messages should have precedence over lower priority messages (must be processed more quickly)
- low priority messages should not "starve" (i.e. a burst of high-priority messages should not lead to lower priority not being processed at all)
Overview
After researching several RabbitMQ libraries, I picked EasyNetQ, which provides some useful abstractions over the RabbitMQ .NET client. One of its consumption patterns looks like this:
var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
_ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
    {
        // this task will be awaited internally
    }, ct),
    cfg =>
    {
        cfg.WithPrefetchCount(Config.PrefetchCount);
    });
- The library automatically ACKs the message if no exception is raised
- If an exception escapes this handler, an IConsumerErrorStrategy (default or custom) is called to handle that exception and decide what happens to the message (retry it, send it to a dead letter exchange); there is no support for explicit ACK or NACK
Consumption with priorities
My case requires handling the priorities and I use another class to deal with this.
public abstract class RabbitMqMessageConsumerBase
{
    protected IBus Bus { get; private set; }
    private IPriorityTaskScheduler _priorityTaskScheduler = default!;
    public RabbitMqConsumerConfig Config { get; set; } = new();

    protected RabbitMqMessageConsumerBase(IBus bus)
    {
        Bus = bus;
    }

    public void StartConsumingWithPriorities(IPriorityTaskScheduler priorityTaskScheduler)
    {
        _priorityTaskScheduler = priorityTaskScheduler;
        var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
        _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
            {
                // not copying into a byte[] may lead to memory corruption and deserialization failure
                var bodyCopy = new byte[body.Length];
                body.Span.CopyTo(bodyCopy);
                await ScheduleMessageProcessing(bodyCopy, properties, info, ct);
            }, ct),
            cfg =>
            {
                cfg.WithPrefetchCount(Config.PrefetchCount);
            });
    }

    private async Task ScheduleMessageProcessing(byte[] bodyCopy, MessageProperties properties, MessageReceivedInfo receivedInfo, CancellationToken ct)
    {
        // message priorities start at 1, scheduler indexes start at 0
        byte priorityIndex = (byte)(properties.Priority - 1);
        await _priorityTaskScheduler.ScheduleMessageProcessing(new PriorityTaskSchedulerItem
        {
            PriorityIndex = priorityIndex,
            ProcessingTask = async () =>
            {
                await ProcessMessage(bodyCopy, properties, receivedInfo, ct);
            },
            Ct = ct
        });
    }

    // rest of the class (ProcessMessage and other members) not shown
}
The priority task scheduler places the Task (work to be done) in a BlockingCollection based on a computed priority, effectively delaying its execution if placed after other tasks.
Processing works as expected (priorities and throughput), but **a Task is executed in a different context from the client consumption one** (namely the one in which ProcessTasks runs). This means that unless ScheduleMessageProcessing awaits the Task of a TaskCompletionSource, an exception raised during message processing will not reach the EasyNetQ handler.
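To show that point in isolation, here is a minimal sketch of the mechanism. It is not the scheduler from this post: WorkItem, TcsDemo and RunAsync are hypothetical names made up for the illustration, and only base class library types are used. The worker completes or faults a TaskCompletionSource, and the caller that awaits its Task sees the exception rethrown on its own side.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, simplified work item (not the PriorityTaskSchedulerItem from the post).
public sealed class WorkItem
{
    public Func<Task> Work { get; set; } = () => Task.CompletedTask;

    // RunContinuationsAsynchronously keeps the awaiting caller's continuation
    // from running inline on the worker that completes the source.
    public TaskCompletionSource<bool> Completion { get; } =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public static class TcsDemo
{
    // Enqueues one item, lets a separate worker run it, and awaits its completion.
    // Returns "ok" on success, or the message of the exception raised by the work.
    public static async Task<string> RunAsync(bool fail)
    {
        using var queue = new BlockingCollection<WorkItem>(boundedCapacity: 10);

        // Worker side: runs items in its own context, similar in spirit to ProcessTasks.
        var worker = Task.Run(async () =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                try
                {
                    await item.Work();
                    item.Completion.TrySetResult(true);
                }
                catch (Exception ex)
                {
                    // Without this call the caller would never learn about the failure.
                    item.Completion.TrySetException(ex);
                }
            }
        });

        var workItem = new WorkItem
        {
            Work = () => fail
                ? Task.FromException(new InvalidOperationException("processing failed"))
                : Task.CompletedTask
        };
        queue.Add(workItem);

        string outcome;
        try
        {
            // Caller side (the consumer handler in the post): awaiting the
            // TaskCompletionSource's Task rethrows the worker's exception here.
            await workItem.Completion.Task;
            outcome = "ok";
        }
        catch (InvalidOperationException ex)
        {
            outcome = ex.Message;
        }

        queue.CompleteAdding();
        await worker;
        return outcome;
    }
}
```

In this sketch, awaiting TcsDemo.RunAsync(true) yields the exception message because the await rethrows on the caller's side. In the real consumer the exception would instead be left to escape the handler so that the error strategy gets to see it.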
public class PriorityTaskScheduler : IPriorityTaskScheduler
{
    private readonly int _maxPriority;
    private readonly int _maxParallelism;
    private readonly int _maxQueueSize;
    private readonly int _blockingCollectionWaitTimeout;
    private BlockingCollection<PriorityTaskSchedulerItem>[] _taskQueues = default!;
    private readonly SemaphoreSlim _semaphore = new(1, 1);
    private int _totalInProcess = 0;
    private int _maxTotalInProcess = 0;

    /// <summary>
    /// specifies the range, in % of the total slots, that is searchable for the current priority
    /// if all slots have some load on them
    /// </summary>
    /// <remarks>index 0 corresponds to priority 1</remarks>
    private readonly int[] _prioritySearchableSlotRangePercs;

    /// <summary>
    /// specifies the range, in number of slots, that is searchable for the current priority
    /// if all slots have some load on them
    /// </summary>
    private int[] _prioritySearchableSlotRange = default!;

    /// <summary>
    /// this should be set only for local dev testing
    /// </summary>
    public bool DiagnosticMode { get; set; } = false;

    public PriorityTaskScheduler(PriorityTaskSchedulerConfig config)
    {
        _maxPriority = config.MaxPriority;
        _maxParallelism = config.MaxParallelism;
        _maxQueueSize = config.MaxQueueSize;
        _blockingCollectionWaitTimeout = config.BlockingCollectionWaitTimeout;
        _prioritySearchableSlotRangePercs = config.PrioritySearchableSlotRangePercs;
        Init();
    }

    private void Init()
    {
        _taskQueues = new BlockingCollection<PriorityTaskSchedulerItem>[_maxParallelism];
        _prioritySearchableSlotRange = new int[_maxPriority];
        for (int i = 0; i < _maxPriority; i++)
        {
            // percentage of the slots, truncated to a whole number of slots
            _prioritySearchableSlotRange[i] = (int)(_maxParallelism * _prioritySearchableSlotRangePercs[i] / 100.0);
        }
        for (int i = 0; i < _maxParallelism; i++)
        {
            _taskQueues[i] = new BlockingCollection<PriorityTaskSchedulerItem>(_maxQueueSize);
        }
    }

    // this is called on application startup
    public void Start()
    {
        for (int i = 0; i < _maxParallelism; i++)
        {
            int queueIndex = i; // local copy for closure
            Task.Run(async () => await ProcessTasks(queueIndex));
        }
    }

    public async Task ScheduleMessageProcessing(PriorityTaskSchedulerItem item)
    {
        bool addedOk = false;
        while (!addedOk)
        {
            // retry (possibly on another queue) until a queue accepts the item within the timeout
            var queue = GetQueueToUse(item.PriorityIndex);
            addedOk = queue.TryAdd(item, _blockingCollectionWaitTimeout);
        }
        // completes (or faults) when ProcessTasks has finished running the item
        await item.TaskCompletionSource.Task;
    }

    private BlockingCollection<PriorityTaskSchedulerItem> GetQueueToUse(byte priorityIndex)
    {
        // compute queue index to use based on priority and current queues load
    }

    private async Task ProcessTasks(int queueIndex)
    {
        foreach (var schedulerItem in _taskQueues[queueIndex].GetConsumingEnumerable())
        {
            try
            {
                // use the value returned by Increment instead of re-reading the shared field
                int inProcess = Interlocked.Increment(ref _totalInProcess);
                // diagnostic high-water mark; this update is not atomic, so the value is approximate
                if (inProcess > _maxTotalInProcess)
                    _maxTotalInProcess = inProcess;
                if (DiagnosticMode)
                    LogSchedulerInfo();
                var task = schedulerItem.ProcessingTask();
                if (task != null)
                    await task;
                schedulerItem.TaskCompletionSource.TrySetResult(null);
            }
            catch (Exception ex)
            {
                // hand the exception back to the caller awaiting in ScheduleMessageProcessing
                schedulerItem.TaskCompletionSource.TrySetException(ex);
                if (schedulerItem.ExceptionTask != null)
                    await schedulerItem.ExceptionTask(ex);
            }
            finally
            {
                Interlocked.Decrement(ref _totalInProcess);
                if (DiagnosticMode)
                    LogSchedulerInfo();
            }
        }
    }
}
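As a small worked example of the slot range computation done in Init, here is the same formula extracted into a standalone helper. SlotRangeDemo and ComputeRanges are hypothetical names, and the parallelism and percentage values in the note below are made up for illustration; they are not my actual configuration.

```csharp
using System;

public static class SlotRangeDemo
{
    // Same formula as in Init: a percentage of the slot count, truncated to an int.
    public static int[] ComputeRanges(int maxParallelism, int[] percentages)
    {
        var ranges = new int[percentages.Length];
        for (int i = 0; i < percentages.Length; i++)
        {
            ranges[i] = (int)(maxParallelism * percentages[i] / 100.0);
        }
        return ranges;
    }
}
```

With 8 slots and percentages of 30, 60 and 100, ComputeRanges(8, new[] { 30, 60, 100 }) gives ranges of 2, 4 and 8 slots (2.4 and 4.8 are truncated). With few slots and a small percentage the truncation can produce a range of 0, for example 20% of 4 slots.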
The code is functional, but the solution (blocking Tasks, with the scheduling call waiting for each Task to complete) seems clunky.
Any suggestions for improving this?