Welcome to Software Development on Codidact!
Will you help us build our independent community of developers helping developers? We're small and trying to grow. We welcome questions about all aspects of software development, from design to code to QA and more. Got questions? Got answers? Got code you'd like someone to review? Please join us.
Azure Service Bus queue message consumption in an ASP.Net Core 6 application
My team is introducing Azure Service Bus consumption into the solution and we have developed an implementation that we want to act as a model for other services in the future.
It is split in two parts:
- a generic part that is supposed to be pushed into a core layer (a NuGet package used by all the services)
- a specific part that consumes events with a certain structure, but that allows multiple event types
The generic part
/// <summary>
/// A queue consumer that can be asked to register its message handler and start receiving messages.
/// It is asynchronously disposable so the hosting worker can release it on shutdown.
/// </summary>
public interface IQueueConsumer : IAsyncDisposable
{
/// <summary>
/// Registers the message handler(s) and starts receiving messages from the queue.
/// </summary>
/// <param name="stoppingToken">Cancellation token supplied by the hosting worker.</param>
/// <returns>A task representing the registration/start operation.</returns>
Task RegisterOnMessageHandlerAndReceiveMessages(CancellationToken stoppingToken);
}
/// <summary>
/// Processes a single received Service Bus message and reports how the message should be settled.
/// </summary>
public interface IQueueProcessor
{
/// <summary>
/// Processes one received message.
/// </summary>
/// <param name="receivedMessage">The message received from the queue.</param>
/// <returns>
/// A <see cref="ProcessedQueueMessage"/> whose state tells the caller how to settle the message.
/// </returns>
Task<ProcessedQueueMessage> Process(ServiceBusReceivedMessage receivedMessage);
}
/// <summary>
/// Minimal set of settings a queue consumer needs in order to create its message processor.
/// </summary>
public interface IQueueSettingsBase
{
    /// <summary>Name of the queue to consume from.</summary>
    string QueueName { get; }

    /// <summary>Value forwarded to the processor's maximum number of concurrent handler calls.</summary>
    int MaxConcurrentCalls { get; }

    /// <summary>Value forwarded to the processor's prefetch count.</summary>
    int PrefetchCount { get; }
}
QueueWorkerBase is the base for setting up a background worker to host the events processing.
/// <summary>
/// Base background worker that hosts a queue consumer of type <typeparamref name="TConsumer"/>.
/// The consumer is resolved from a dedicated DI scope that lives as long as the worker and is
/// released when the worker is disposed.
/// </summary>
/// <typeparam name="TConsumer">The consumer service type to resolve and start.</typeparam>
public class QueueWorkerBase<TConsumer> : BackgroundService, IAsyncDisposable
    where TConsumer : notnull, IQueueConsumer
{
    private readonly IServiceProvider _serviceProvider;

    // The scope the consumer was resolved from. It is kept so that it (and every scoped
    // disposable resolved from it) can be released in DisposeAsync instead of leaking.
    private AsyncServiceScope? _scope;

    // Null until ExecuteAsync has run.
    private IQueueConsumer? _queueConsumer;

    /// <summary>Creates the worker.</summary>
    /// <param name="serviceProvider">Provider used to create the scope the consumer is resolved from.</param>
    public QueueWorkerBase(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Creates the worker's scope, resolves the consumer from it and asks the consumer to
    /// register its handler and start receiving messages.
    /// </summary>
    /// <param name="stoppingToken">Token forwarded to the consumer.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var scope = _serviceProvider.CreateAsyncScope();
        _scope = scope;

        _queueConsumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
        await _queueConsumer.RegisterOnMessageHandlerAndReceiveMessages(stoppingToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Disposes the consumer (if it was ever created) and then the scope it was resolved from.
    /// Safe to call more than once.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        var consumer = _queueConsumer;
        _queueConsumer = null;
        if (consumer is not null)
        {
            await consumer.DisposeAsync().ConfigureAwait(false);
        }

        if (_scope is AsyncServiceScope scope)
        {
            _scope = null;
            await scope.DisposeAsync().ConfigureAwait(false);
        }

        GC.SuppressFinalize(this);
    }
}
QueueConsumerBase offers generic functionality for consuming from an Azure Service Bus queue using a specified processor type (which receives the event). It also ensures that the processing logic is able to log in the current scope (to reduce the logged event count; this is outside the scope of this review).
/// <summary>
/// Generic Azure Service Bus queue consumer. It creates a message processor for the configured
/// queue, runs every received message through a <typeparamref name="TQueueProcessor"/> resolved
/// from a fresh DI scope, and settles the message according to the processing result.
/// </summary>
/// <typeparam name="TSettings">Settings type providing queue name, concurrency and prefetch values.</typeparam>
/// <typeparam name="TQueueProcessor">Service type that processes a single received message.</typeparam>
public abstract class QueueConsumerBase<TSettings, TQueueProcessor> : IQueueConsumer
    where TSettings : class, IQueueSettingsBase
    where TQueueProcessor : class, IQueueProcessor
{
    /// <summary>Name passed to <see cref="IServiceBusFactory.GetClient"/> to obtain the client.</summary>
    public abstract string QueueClientName { get; }

    private readonly IServiceBusFactory _serviceBusFactory;
    private readonly IOptions<TSettings> _settings;
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ILogger<TQueueProcessor> _logger;

    // Created when processing is started; owned by this consumer and released in DisposeAsync.
    private ServiceBusProcessor? _processor;

    /// <summary>Creates the consumer.</summary>
    /// <param name="serviceBusFactory">Factory used to obtain the named Service Bus client.</param>
    /// <param name="settings">Queue settings.</param>
    /// <param name="serviceScopeFactory">Factory used to create one DI scope per processed message.</param>
    /// <param name="logger">Logger used for processor-level errors.</param>
    protected QueueConsumerBase(IServiceBusFactory serviceBusFactory,
        IOptions<TSettings> settings,
        IServiceScopeFactory serviceScopeFactory,
        ILogger<TQueueProcessor> logger)
    {
        // The client is resolved when processing starts rather than here, so the abstract
        // QueueClientName is never read from the base constructor (i.e. before the derived
        // constructor has run).
        _serviceBusFactory = serviceBusFactory;
        _settings = settings;
        _serviceScopeFactory = serviceScopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Creates the message processor for the configured queue, attaches the message and error
    /// handlers and starts processing.
    /// </summary>
    /// <param name="stoppingToken">Token passed to the start operation.</param>
    /// <exception cref="InvalidOperationException">Processing has already been started on this instance.</exception>
    public async Task RegisterOnMessageHandlerAndReceiveMessages(CancellationToken stoppingToken)
    {
        if (_processor is not null)
        {
            throw new InvalidOperationException("Message processing has already been started for this consumer.");
        }

        var settings = _settings.Value;
        var client = _serviceBusFactory.GetClient(QueueClientName);

        var processor = client.CreateProcessor(settings.QueueName, new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = settings.MaxConcurrentCalls,
            // Messages are settled explicitly in FinalizeQueueMessage.
            AutoCompleteMessages = false,
            PrefetchCount = settings.PrefetchCount,
        });
        processor.ProcessMessageAsync += ProcessMessagesAsync;
        processor.ProcessErrorAsync += ProcessErrorAsync;

        // Keep a reference before starting so the processor is disposed even if start fails.
        _processor = processor;
        await processor.StartProcessingAsync(stoppingToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Message handler: processes the message and always settles it. If processing throws,
    /// the message is settled with the default dead-letter result.
    /// </summary>
    private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
    {
        ProcessedQueueMessage processedQueueMessage = new()
        {
            State = QueueMessageState.DeadLetter
        };

        try
        {
            processedQueueMessage = await ProcessEntity(args).ConfigureAwait(false);
        }
        catch (Exception)
        {
            // already logged in ProcessEntity, but do not allow to reach the thread boundary
        }
        finally
        {
            await FinalizeQueueMessage(args, processedQueueMessage).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Runs the message through a <typeparamref name="TQueueProcessor"/> resolved from a new DI
    /// scope, wrapped by the scope's <c>ILoggingContextHelper</c>.
    /// </summary>
    /// <returns>The processor's result, or a dead-letter result if the processor never assigned one.</returns>
    private async Task<ProcessedQueueMessage> ProcessEntity(ProcessMessageEventArgs args)
    {
        // Asynchronous disposal so scoped services implementing only IAsyncDisposable are released correctly.
        await using var scope = _serviceScopeFactory.CreateAsyncScope();
        var sp = scope.ServiceProvider;
        var loggingContextHelper = sp.GetRequiredService<ILoggingContextHelper>();
        var inboundQueueProcessor = sp.GetRequiredService<TQueueProcessor>();

        ProcessedQueueMessage processedQueueMessage = new() { State = QueueMessageState.DeadLetter };

        // this gathers all the "scoped" or context information and performs the logging at the end
        await loggingContextHelper.Log(async () =>
        {
            processedQueueMessage = await inboundQueueProcessor.Process(args.Message);
        }, null);

        return processedQueueMessage;
    }

    /// <summary>
    /// Settles the message according to the processing result: abandon, dead-letter (with the
    /// reason and description from the result), or complete for any other state.
    /// </summary>
    private static Task FinalizeQueueMessage(
        ProcessMessageEventArgs args,
        ProcessedQueueMessage processedQueueMessage)
    {
        return processedQueueMessage.State switch
        {
            QueueMessageState.Abandon =>
                args.AbandonMessageAsync(args.Message),
            QueueMessageState.DeadLetter =>
                args.DeadLetterMessageAsync(args.Message, processedQueueMessage.DeadLetterReason, processedQueueMessage.DeadLetterDescription),
            _ =>
                args.CompleteMessageAsync(args.Message)
        };
    }

    /// <summary>Error handler: logs the exception reported by the message processor.</summary>
    private Task ProcessErrorAsync(ProcessErrorEventArgs arg)
    {
        _logger.LogError(arg.Exception, "Message handler encountered an exception");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Disposes the message processor created by this consumer, if any. Safe to call more than once.
    /// </summary>
    /// <remarks>
    /// The Service Bus client is deliberately not disposed here: it is obtained from the client
    /// factory, which owns it, and disposing it from a consumer would invalidate it for any
    /// other user of the same named client.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        var processor = Interlocked.Exchange(ref _processor, null);
        if (processor is not null)
        {
            await processor.DisposeAsync().ConfigureAwait(false);
        }

        GC.SuppressFinalize(this);
    }
}
The `ServiceBusClient` is obtained using a factory:
/// <summary>
/// Provides Service Bus clients by name.
/// </summary>
public interface IServiceBusFactory
{
/// <summary>
/// Gets the Service Bus client registered under the given name.
/// </summary>
/// <param name="name">The name the client was registered with.</param>
/// <returns>The client for <paramref name="name"/>.</returns>
ServiceBusClient GetClient(string name);
}
/// <summary>
/// <see cref="IServiceBusFactory"/> implementation that delegates to the Azure client factory.
/// </summary>
public class ServiceBusFactory : IServiceBusFactory
{
    private readonly IAzureClientFactory<ServiceBusClient> _clientFactory;

    /// <summary>Creates the factory.</summary>
    /// <param name="azureClientFactory">Azure client factory used to create named clients.</param>
    public ServiceBusFactory(IAzureClientFactory<ServiceBusClient> azureClientFactory)
        => _clientFactory = azureClientFactory;

    /// <summary>Returns the client the Azure client factory creates for <paramref name="name"/>.</summary>
    /// <param name="name">The name the client was registered with.</param>
    public ServiceBusClient GetClient(string name) => _clientFactory.CreateClient(name);
}
The specific part
The specific worker only needs to specify the consumer type:
/// <summary>
/// Background worker for the inbound queue. It only fixes the consumer type to
/// <see cref="IInboundQueueConsumer"/>; all behavior comes from the base class.
/// </summary>
public class InboundQueueWorker : QueueWorkerBase<IInboundQueueConsumer>
{
/// <summary>Creates the worker.</summary>
/// <param name="serviceProvider">Service provider forwarded to the base worker.</param>
public InboundQueueWorker(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
}
The consumer specifies the queue settings and the processor type
/// <summary>
/// Settings for the inbound queue, bound from the configuration section named by
/// <see cref="SectionName"/>.
/// </summary>
public class IncomingQueueSettings : IQueueSettingsBase
{
/// <summary>Configuration section these settings are bound from.</summary>
public const string SectionName = "Queues:InboundQueue";
/// <summary>Name of the queue to consume from. Defaults to an empty string.</summary>
public string QueueName { get; set; } = string.Empty;
/// <summary>Connection string of the Service Bus namespace. Defaults to an empty string.</summary>
public string QueueConnectionString { get; set; } = string.Empty;
/// <summary>Value used for the processor's maximum concurrent calls. Defaults to 10.</summary>
public int MaxConcurrentCalls { get; set; } = 10;
/// <summary>Value used for the processor's prefetch count. Defaults to 10.</summary>
public int PrefetchCount { get; set; } = 10;
}
/// <summary>
/// Consumer for the inbound queue: binds the generic consumer to
/// <see cref="IncomingQueueSettings"/> and <see cref="IInboundQueueProcessor"/>.
/// </summary>
public class InboundQueueConsumer : QueueConsumerBase<IncomingQueueSettings, IInboundQueueProcessor>, IInboundQueueConsumer
{
/// <summary>
/// Name of the Service Bus client to use. It must match the name the client is registered
/// under in the DI wiring ("inboundSb").
/// </summary>
public override string QueueClientName => "inboundSb";
/// <summary>Creates the consumer; all arguments are forwarded to the base consumer.</summary>
/// <param name="serviceBusFactory">Factory used to obtain the named Service Bus client.</param>
/// <param name="settings">Inbound queue settings.</param>
/// <param name="serviceScopeFactory">Factory used by the base class to create DI scopes.</param>
/// <param name="logger">
/// Logger forwarded to the base class.
/// NOTE(review): the category type here is the concrete InboundQueueProcessor while the base
/// class is parameterized with IInboundQueueProcessor - confirm this category is intended.
/// </param>
public InboundQueueConsumer(
IServiceBusFactory serviceBusFactory,
IOptions<IncomingQueueSettings> settings,
IServiceScopeFactory serviceScopeFactory,
ILogger<InboundQueueProcessor> logger) :
base(serviceBusFactory, settings, serviceScopeFactory, logger)
{
}
}
The processor gets the service bus queue message and relays to the business layer:
/// <summary>
/// Marker interface for the inbound queue's message processor; adds no members to
/// <see cref="IQueueProcessor"/> and serves as a distinct service type for DI registration.
/// </summary>
public interface IInboundQueueProcessor : IQueueProcessor
{
}
/// <summary>
/// Processor for inbound queue messages: deserializes the message body, maps the payload to
/// the business message and hands it to the business processing service.
/// </summary>
public class InboundQueueProcessor : IInboundQueueProcessor
{
    private readonly ILoggingContextHelper _loggingHelper;
    private readonly IInboundMsgMapper _inboundMsgMapper;
    private readonly IInboundMsgProcessingService _inboundMsgProcessingService;

    /// <summary>Creates the processor.</summary>
    /// <param name="loggingHelper">Logging context that receives the message's correlation id.</param>
    /// <param name="inboundMsgMapper">Mapper from the received payload to the business message.</param>
    /// <param name="inboundMsgProcessingService">Business service that processes the mapped message.</param>
    public InboundQueueProcessor(ILoggingContextHelper loggingHelper, IInboundMsgMapper inboundMsgMapper, IInboundMsgProcessingService inboundMsgProcessingService)
    {
        _loggingHelper = loggingHelper;
        _inboundMsgMapper = inboundMsgMapper;
        _inboundMsgProcessingService = inboundMsgProcessingService;
    }

    /// <summary>
    /// Processes one received message.
    /// </summary>
    /// <param name="receivedMessage">The message received from the queue.</param>
    /// <returns>
    /// A dead-letter result when the body cannot be turned into the expected wrapper or the
    /// wrapper carries no data; otherwise a complete result once the business service has
    /// processed the mapped message.
    /// </returns>
    public async Task<ProcessedQueueMessage> Process(ServiceBusReceivedMessage receivedMessage)
    {
        //InboundContent
        var inboundContent = receivedMessage.Body.ToString().Deserialize<InboundEventGridWrapper<InboundMsgReceivedMessage>>();
        if (inboundContent is null)
        {
            return new ProcessedQueueMessage { State = QueueMessageState.DeadLetter, DeadLetterReason = "Entity not expected" };
        }

        // A wrapper without a payload would otherwise fail with a NullReferenceException below
        // and be dead-lettered by the caller without any reason attached.
        if (inboundContent.Data is null)
        {
            return new ProcessedQueueMessage { State = QueueMessageState.DeadLetter, DeadLetterReason = "Entity data missing" };
        }

        _loggingHelper.CorrelationId = inboundContent.Data.CorrelationId;

        // inboundContent.EventType -> choose the right business processor
        var msg = _inboundMsgMapper.Map(inboundContent.Data);
        await _inboundMsgProcessingService.Process(msg);

        return new ProcessedQueueMessage { State = QueueMessageState.Complete };
    }
}
The wiring part (DI)
/// <summary>
/// Registers everything needed to consume the inbound queue: the Service Bus client factory,
/// the named "inboundSb" client, the inbound queue settings, the hosted worker, and the
/// consumer, processor and mapper services.
/// </summary>
/// <param name="services">The service collection to add the registrations to.</param>
/// <param name="configuration">Configuration providing the connection string and queue settings.</param>
/// <returns>The same <paramref name="services"/> instance.</returns>
private static IServiceCollection AddServiceBusClients(this IServiceCollection services, IConfiguration configuration)
{
    services.AddSingleton<IServiceBusFactory, ServiceBusFactory>();

    services.AddAzureClients(clients =>
    {
        var inboundConnectionString = configuration["Queues:InboundQueue:QueueConnectionString"];
        var inboundClient = clients.AddServiceBusClient(inboundConnectionString);

        inboundClient
            .ConfigureOptions(options => options.RetryOptions = _defaultServiceBusRetryOptions)
            .WithName("inboundSb");
    });

    var inboundQueueSection = configuration.GetSection(IncomingQueueSettings.SectionName);

    return services
        .Configure<IncomingQueueSettings>(inboundQueueSection)
        .AddHostedService<InboundQueueWorker>()
        .AddScoped<IInboundQueueConsumer, InboundQueueConsumer>()
        .AddScoped<IInboundQueueProcessor, InboundQueueProcessor>()
        .AddSingleton<IInboundMsgMapper, InboundMsgMapper>();
}
The service bus message processing should be pretty fast as it needs to process up to about 500 messages per second (spread across several pods).
The aspects I am most interested in for the code review are:
- performance
- memory leaks
- maintainability
What I do not like is the rather big setup for what seems to be a rather simple service bus queue message consumption.
0 comment threads