Communities

Writing
Writing
Codidact Meta
Codidact Meta
The Great Outdoors
The Great Outdoors
Photography & Video
Photography & Video
Scientific Speculation
Scientific Speculation
Cooking
Cooking
Electrical Engineering
Electrical Engineering
Judaism
Judaism
Languages & Linguistics
Languages & Linguistics
Software Development
Software Development
Mathematics
Mathematics
Christianity
Christianity
Code Golf
Code Golf
Music
Music
Physics
Physics
Linux Systems
Linux Systems
Power Users
Power Users
Tabletop RPGs
Tabletop RPGs
Community Proposals
Community Proposals
tag:snake search within a tag
answers:0 unanswered questions
user:xxxx search by author id
score:0.5 posts with 0.5+ score
"snake oil" exact phrase
votes:4 posts with 4+ votes
created:<1w created < 1 week ago
post_type:xxxx type of post
Search help
Notifications
Mark all as read See all your notifications »
Code Reviews

Welcome to Software Development on Codidact!

Will you help us build our independent community of developers helping developers? We're small and trying to grow. We welcome questions about all aspects of software development, from design to code to QA and more. Got questions? Got answers? Got code you'd like someone to review? Please join us.

Azure Service Bus queue message consumption in an ASP.Net Core 6 application

+0
−0

My team is introducing Azure Service Bus consumption into the solution and we have developed an implementation that we want to act as a model for other services in the future.

It is split in two parts:

  • a generic part that is supposed to be pushed into a core layer (a NuGet used by all the services
  • a specific part that consumes events with a certain structure, but that allows multiple event types

The generic part

public interface IQueueConsumer : IAsyncDisposable 
{
    Task RegisterOnMessageHandlerAndReceiveMessages(CancellationToken stoppingToken);
}
public interface IQueueProcessor
{
    Task<ProcessedQueueMessage> Process(ServiceBusReceivedMessage receivedMessage);
}
public interface IQueueSettingsBase
{
    public string QueueName { get; }
    public int MaxConcurrentCalls { get; }
    public int PrefetchCount { get; }
}

QueueWorkerBase is the base for setting up a background worker to host the events processing.

public class QueueWorkerBase<TConsumer> : BackgroundService, IAsyncDisposable
    where TConsumer : notnull, IQueueConsumer
{
    private readonly IServiceProvider _serviceProvider;
    private IQueueConsumer _queueConsumer = null!;

    public QueueWorkerBase(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var scope = _serviceProvider.CreateScope();
        var sp = scope.ServiceProvider;

        _queueConsumer = sp.GetRequiredService<TConsumer>();

        await _queueConsumer.RegisterOnMessageHandlerAndReceiveMessages(stoppingToken).ConfigureAwait(false);
    }

    public async ValueTask DisposeAsync()
    {
        if (_queueConsumer != null)
            await _queueConsumer.DisposeAsync();

        GC.SuppressFinalize(this);
    }
}

QueueConsumerBase offers a generic functionality for consuming from an Azure Service Bus, using a specified processor type (which gets the received event) and ensures that the processing logic is able to log in the current scope (to reduce logged event count; this is outside of this review area).

public abstract class QueueConsumerBase<TSettings, TQueueProcessor> : IQueueConsumer
    where TSettings: class, IQueueSettingsBase
    where TQueueProcessor: class, IQueueProcessor
{
    public abstract string QueueClientName { get; }

    private readonly ServiceBusClient _client;
    private readonly IOptions<TSettings> _settings;
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ILogger<TQueueProcessor> _logger;

    protected QueueConsumerBase(IServiceBusFactory serviceBusFactory,
        IOptions<TSettings> settings,
        IServiceScopeFactory serviceScopeFactory,
        ILogger<TQueueProcessor> logger)
    {
        _client = serviceBusFactory.GetClient(QueueClientName);
        _settings = settings;
        _serviceScopeFactory = serviceScopeFactory;
        _logger = logger;
    }

    public async Task RegisterOnMessageHandlerAndReceiveMessages(CancellationToken stoppingToken)
    {
        var processor = _client.CreateProcessor(_settings.Value.QueueName, new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = _settings.Value.MaxConcurrentCalls,
            AutoCompleteMessages = false,
            PrefetchCount = _settings.Value.PrefetchCount,
        });
        processor.ProcessMessageAsync += ProcessMessagesAsync;
        processor.ProcessErrorAsync += ProcessErrorAsync;
        await processor.StartProcessingAsync(stoppingToken);
    }

    private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
    {
        ProcessedQueueMessage processedQueueMessage = new()
        { 
            State = QueueMessageState.DeadLetter
        };

        try
        {
            processedQueueMessage = await ProcessEntity(args);
        }
        catch (Exception)
        {
            // already logged in ProcessEntity, but do not allow to reach the thread boundary
        }
        finally
        {
            await FinalizeQueueMessage(args, processedQueueMessage).ConfigureAwait(false);
        }
    }

    private async Task<ProcessedQueueMessage> ProcessEntity(ProcessMessageEventArgs args)
    {
        using var scope = _serviceScopeFactory.CreateAsyncScope();
        var sp = scope.ServiceProvider;

        var loggingContextHelper = sp.GetRequiredService<ILoggingContextHelper>();
        var inboundQueueProcessor = sp.GetRequiredService<TQueueProcessor>();

        ProcessedQueueMessage processedQueueMessage = new() { State = QueueMessageState.DeadLetter };

        // this gathers all the "scoped" or context information and performs the logging at the end
        await loggingContextHelper.Log(async () => 
        {
            processedQueueMessage = await inboundQueueProcessor.Process(args.Message);
        }, null);

        return processedQueueMessage;
    }

    private static Task FinalizeQueueMessage(
        ProcessMessageEventArgs args,
        ProcessedQueueMessage processedQueueMessage)
    {
        return processedQueueMessage.State switch
        {
            QueueMessageState.Abandon =>
                args.AbandonMessageAsync(args.Message),
            QueueMessageState.DeadLetter =>
                args.DeadLetterMessageAsync(args.Message, processedQueueMessage.DeadLetterReason, processedQueueMessage.DeadLetterDescription),
            _ =>
                args.CompleteMessageAsync(args.Message)
        };
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs arg)
    {
        _logger.LogError(arg.Exception, "Message handler encountered an exception");
        return Task.CompletedTask;
    }


    public async ValueTask DisposeAsync()
    {
        if (_client != null)
            await _client.DisposeAsync();

        GC.SuppressFinalize(this);
    }
}

The ServiceBusClient is obtained using a factory:

public interface IServiceBusFactory
{
    ServiceBusClient GetClient(string name);
}

public class ServiceBusFactory : IServiceBusFactory
{
    private readonly IAzureClientFactory<ServiceBusClient> _azureClientFactory;
    public ServiceBusFactory(IAzureClientFactory<ServiceBusClient> azureClientFactory)
    {
        _azureClientFactory = azureClientFactory;
    }

    public ServiceBusClient GetClient(string name)
    {
        return _azureClientFactory.CreateClient(name);
    }
}

The specific part

The specific worker only need to specify the consumer type:

public class InboundQueueWorker : QueueWorkerBase<IInboundQueueConsumer>
{
    public InboundQueueWorker(IServiceProvider serviceProvider) : base(serviceProvider)
    {
    }
}

The consumer specifies the queue settings and the processor type

public class IncomingQueueSettings : IQueueSettingsBase
{
    public const string SectionName = "Queues:InboundQueue";

    public string QueueName { get; set; } = string.Empty;
    public string QueueConnectionString { get; set; } = string.Empty;
    public int MaxConcurrentCalls { get; set; } = 10;
    public int PrefetchCount { get; set; } = 10;
}
public class InboundQueueConsumer : QueueConsumerBase<IncomingQueueSettings, IInboundQueueProcessor>, IInboundQueueConsumer
{
    public override string QueueClientName => "inboundSb";

    public InboundQueueConsumer(
            IServiceBusFactory serviceBusFactory,
            IOptions<IncomingQueueSettings> settings,
            IServiceScopeFactory serviceScopeFactory,
            ILogger<InboundQueueProcessor> logger) :
        base(serviceBusFactory, settings, serviceScopeFactory, logger)
    {
    }
}

The processor gets the service bus queue message and relays to the business layer:

public interface IInboundQueueProcessor : IQueueProcessor
{
}

public class InboundQueueProcessor : IInboundQueueProcessor
{
    private readonly ILoggingContextHelper _loggingHelper;
    private readonly IInboundMsgMapper _inboundMsgMapper;
    private readonly IInboundMsgProcessingService _inboundMsgProcessingService;

    public InboundQueueProcessor(ILoggingContextHelper loggingHelper, IInboundMsgMapper inboundMsgMapper, IInboundMsgProcessingService inboundMsgProcessingService)
    {
        _loggingHelper = loggingHelper;
        _inboundMsgMapper = inboundMsgMapper;
        _inboundMsgProcessingService = inboundMsgProcessingService;
    }

    public async Task<ProcessedQueueMessage> Process(ServiceBusReceivedMessage receivedMessage)
    {
        //InboundContent
        var inboundContent = receivedMessage.Body.ToString().Deserialize<InboundEventGridWrapper<InboundMsgReceivedMessage>>();
        if (inboundContent is null)
            return new ProcessedQueueMessage { State = QueueMessageState.DeadLetter, DeadLetterReason = "Entity not expected" };

        _loggingHelper.CorrelationId = inboundContent.Data.CorrelationId;

        // inboundContent.EventType -> choose the right business processor

        var msg= _inboundMsgMapper.Map(inboundContent.Data);
        await _inboundMsgProcessingService.Process(msg);

        return new ProcessedQueueMessage {  State = QueueMessageState.Complete };
    }
}

The wiring part (DI)

    private static IServiceCollection AddServiceBusClients(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddSingleton<IServiceBusFactory, ServiceBusFactory>();

        services.AddAzureClients(builder =>
        {
            var connStr = configuration["Queues:InboundQueue:QueueConnectionString"];
            builder.AddServiceBusClient(connStr)
                .ConfigureOptions(opt =>
                {
                    opt.RetryOptions = _defaultServiceBusRetryOptions;
                }).WithName("inboundSb");
        });

        services.Configure<IncomingQueueSettings>(configuration.GetSection(IncomingQueueSettings.SectionName));

        services.AddHostedService<InboundQueueWorker>();
        services.AddScoped<IInboundQueueConsumer, InboundQueueConsumer>();
        services.AddScoped<IInboundQueueProcessor, InboundQueueProcessor>();
        services.AddSingleton<IInboundMsgMapper, InboundMsgMapper>();

        return services;
    }

The service bus message processing should be pretty fast as it needs to process up to about 500 messages per second (spread across several pods).

The aspects I am mostly interested in the code review are:

  • performance
  • memory leaks
  • maintainability

What I do not like is the rather big setup for what it seems to be a rather simple service bus queue message consumption.

History
Why does this post require attention from curators or moderators?
You might want to add some details to your flag.
Why should this post be closed?

0 comment threads

0 answers

Sign up to answer this question »