Communities

Writing
Writing
Codidact Meta
Codidact Meta
The Great Outdoors
The Great Outdoors
Photography & Video
Photography & Video
Scientific Speculation
Scientific Speculation
Cooking
Cooking
Electrical Engineering
Electrical Engineering
Judaism
Judaism
Languages & Linguistics
Languages & Linguistics
Software Development
Software Development
Mathematics
Mathematics
Christianity
Christianity
Code Golf
Code Golf
Music
Music
Physics
Physics
Linux Systems
Linux Systems
Power Users
Power Users
Tabletop RPGs
Tabletop RPGs
Community Proposals
Community Proposals
tag:snake search within a tag
answers:0 unanswered questions
user:xxxx search by author id
score:0.5 posts with 0.5+ score
"snake oil" exact phrase
votes:4 posts with 4+ votes
created:<1w created < 1 week ago
post_type:xxxx type of post
Search help
Notifications
Mark all as read See all your notifications »
Code Reviews

Welcome to Software Development on Codidact!

Will you help us build our independent community of developers helping developers? We're small and trying to grow. We welcome questions about all aspects of software development, from design to code to QA and more. Got questions? Got answers? Got code you'd like someone to review? Please join us.

Message consumption with priorities and consumer error handling using EasyNetQ library

+1
−0

Context

I was tasked with developing a way to consume RabbitMQ messages that have various priorities (actually different levels of QoS). My restrictions are:

  • high priority messages should have precedence over lower priority messages (must be processed more quickly)
  • low priority messages should not "starve" (i.e. a burst of high-priority messages should not lead to lower priority not being processed at all)

Overview

After researching several Rabbit MQ libraries, I have picked EasyNetQ which provides some useful abstraction over RabbitMQ .NET client. One of its consumption patterns looks like this:

var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
_ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
{
   // this task will be awaited internally
}, ct),
cfg =>
{
	cfg.WithPrefetchCount(Config.PrefetchCount);
});
  • The library automatically ACK the message if no exception is raised
  • If an exception passes the end of this handler, a IConsumerErrorStrategy (default or custom) is called to handle that exception and decide on retrial, send to some dead letter exchange
  • there is no support for explicit ACK or NACK

Consumption with priorities

My case requires handling the priorities and I use another class to deal with this.

public abstract class RabbitMqMessageConsumerBase
{
    protected IBus Bus { get; private set; }
    private IPriorityTaskScheduler _priorityTaskScheduler = default!;

    public RabbitMqConsumerConfig Config { get; set; } = new();

    protected RabbitMqMessageConsumerBase(IBus bus)
    {
        Bus = bus;
    }
	
    public void StartConsumingWithPriorities(IPriorityTaskScheduler priorityTaskScheduler)
    {
        _priorityTaskScheduler = priorityTaskScheduler;

        var queue = Bus.Advanced.QueueDeclare(Config.QueueName);
        _ = Bus.Advanced.Consume(queue, (body, properties, info, ct) => Task.Run(async () =>
        {
            // not copying into a byte[] may lead memory corruption and deserialization failure
            var bodyCopy = new byte[body.Length];
            body.Span.CopyTo(bodyCopy);

            await ScheduleMessageProcessing(bodyCopy, properties, info, ct);
        }, ct),
        cfg =>
        {
            cfg.WithPrefetchCount(Config.PrefetchCount);
        });
    }

    private async Task ScheduleMessageProcessing(byte[] bodyCopy, MessageProperties properties, MessageReceivedInfo receivedInfo, CancellationToken ct)
    {
        byte priorityIndex = (byte)(properties.Priority - 1);

        await _priorityTaskScheduler.ScheduleMessageProcessing(new PriorityTaskSchedulerItem
        {
            PriorityIndex = priorityIndex,
            ProcessingTask = async () =>
            {
                await ProcessMessage(bodyCopy, properties, receivedInfo, ct);
            }
            Ct = ct
        });
    }

The priority task scheduler places the Task (work to be done) in a BlockingCollection based on a computed priority, effectively delaying its execution if placed after other tasks.

Processing works as expected (priorities and throughput), but **a Task will be executed in another context than the client consumption one **(that one in which ProcessTasks runs).

This means that without using TaskCompletionSource and having the ScheduleMessageProcessing awaiting for TaskCompletionSource's Task, any raised exception during message processing will not reach the EasyNetQ handler.

public class PriorityTaskScheduler : IPriorityTaskScheduler
{
    private readonly int _maxPriority;
    private readonly int _maxParallelism;
    private readonly int _maxQueueSize;
    private readonly int _blockingCollectionWaitTimeout;

    private BlockingCollection<PriorityTaskSchedulerItem>[] _taskQueues = default!;
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    private int _totalInProcess = 0;
    private int _maxTotalInProcess = 0;


    /// <summary>
    /// specifies range in % of the total slots are searchable for current priority
    /// if all slots have some load on them
    /// </summary>
    /// <remarks>index-0 correspond to priority=1</remarks>
    private readonly int[] _prioritySearchableSlotRangePercs;

    /// <summary>
    /// specifies range in number of slots that are searchable for current priority
    /// if all slots have some load on them
    /// </summary>
    private int[] _prioritySearchableSlotRange = default!;

    /// <summary>
    /// this should be set only for local dev testing
    /// </summary>
    public bool DiagnosticMode { get; set; } = false;

    public PriorityTaskScheduler(PriorityTaskSchedulerConfig config)
    {
        _maxPriority = config.MaxPriority;
        _maxParallelism = config.MaxParallelism;
        _maxQueueSize = config.MaxQueueSize;
        _blockingCollectionWaitTimeout = config.BlockingCollectionWaitTimeout;
        _prioritySearchableSlotRangePercs = config.PrioritySearchableSlotRangePercs;

        Init();
    }

    private void Init()
    {
        _taskQueues = new BlockingCollection<PriorityTaskSchedulerItem>[_maxParallelism];
        _prioritySearchableSlotRange = new int[_maxPriority];

        for (int i = 0; i < _maxPriority; i++)
        {
            _prioritySearchableSlotRange[i] = (int)(_maxParallelism * _prioritySearchableSlotRangePercs[i] / 100.0);
        }

        for (int i = 0; i < _maxParallelism; i++)
        {
            _taskQueues[i] = new BlockingCollection<PriorityTaskSchedulerItem>(_maxQueueSize);
        }
    }

    // this is called on application startup
    public void Start()
    {
        for (int i = 0; i < _maxParallelism; i++)
        {
            int queueIndex = i; // local copy for closure
            Task.Run(async () => await ProcessTasks(queueIndex));
        }
    }

    public async Task ScheduleMessageProcessing(PriorityTaskSchedulerItem item)
    {
        bool addedOk = false;
        while (!addedOk)
        {
            var queue = GetQueueToUse(item.PriorityIndex);
            addedOk = queue.TryAdd(item, _blockingCollectionWaitTimeout);
        }

        await item.TaskCompletionSource.Task;
    }

    private BlockingCollection<PriorityTaskSchedulerItem> GetQueueToUse(byte priorityIndex)
    {
         // compute queue index to use based on priority and current queues load
    }

     private async Task ProcessTasks(int queueIndex)
    {
        foreach (var schedulerItem in _taskQueues[queueIndex].GetConsumingEnumerable())
        {
            try
            {
                Interlocked.Increment(ref _totalInProcess);
                if (_totalInProcess > _maxTotalInProcess)
                    _maxTotalInProcess = _totalInProcess;

                if (DiagnosticMode)
                    LogSchedulerInfo();

                var task = schedulerItem.ProcessingTask();
                if (task != null)
                    await task;

                schedulerItem.TaskCompletionSource.TrySetResult(null);
            }
            catch (Exception ex)
            {
                schedulerItem.TaskCompletionSource.TrySetException(ex);

                if (schedulerItem.ExceptionTask != null)
                    await schedulerItem.ExceptionTask(ex);
            }
            finally
            {
                Interlocked.Decrement(ref _totalInProcess);

                if (DiagnosticMode)
                    LogSchedulerInfo();
            }
        }
    }
}

The code is functional, but the solution (blocking Tasks, but having the scheduling waiting for the Task to be completed) seems clunky.

Any suggestions for improving this?

History
Why does this post require moderator attention?
You might want to add some details to your flag.
Why should this post be closed?

0 comment threads

0 answers

Sign up to answer this question »