Using an ASP.NET Core IHostedService to run Azure Service Bus subscriptions and consumers

This post shows how an Azure Service Bus subscription for a topic, or a consumer for a queue, can be used inside an ASP.NET Core application. The Azure Service Bus client listens for events and needs to be started, stopped and registered to the topic to receive messages. An IHostedService is used for this.

Code: https://github.com/damienbod/AspNetCoreServiceBus

Posts in this series:

The ServiceBusTopicSubscription class is used to set up the Azure Service Bus subscription. The class uses the ServiceBusClient to set up the message handler, and the ServiceBusAdministrationClient to implement the filters by adding or removing rules. The Azure.Messaging.ServiceBus NuGet package is used to connect to the subscription.

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    /// <summary>
    /// Receives messages from the <c>mytopic</c> topic through the
    /// <c>mytopicsubscription</c> subscription and hands each payload to
    /// <see cref="IProcessData"/>. Before processing starts, the subscription's
    /// <c>GoalsGreaterThanSeven</c> SQL rule is removed (if present) and re-created.
    /// </summary>
    public class ServiceBusTopicSubscription : IServiceBusTopicSubscription
    {
        private const string TOPIC_PATH = "mytopic";
        private const string SUBSCRIPTION_NAME = "mytopicsubscription";
        private const string GOALS_RULE_NAME = "GoalsGreaterThanSeven";

        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private readonly ILogger _logger;
        private readonly ServiceBusClient _client;
        private readonly ServiceBusAdministrationClient _adminClient;

        // Created by PrepareFiltersAndHandleMessages; stays null until then.
        private ServiceBusProcessor _processor;

        /// <summary>
        /// Creates the messaging and administration clients from the
        /// <c>ServiceBusConnectionString</c> connection string.
        /// </summary>
        /// <param name="processData">Handler invoked for every received payload.</param>
        /// <param name="configuration">Configuration that holds the connection string.</param>
        /// <param name="logger">Logger for warnings and processing errors.</param>
        public ServiceBusTopicSubscription(IProcessData processData,
            IConfiguration configuration,
            ILogger<ServiceBusTopicSubscription> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;

            var connectionString = _configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _adminClient = new ServiceBusAdministrationClient(connectionString);
        }

        /// <summary>
        /// Creates the processor, wires the message and error handlers, refreshes
        /// the subscription rule and then starts receiving messages.
        /// </summary>
        public async Task PrepareFiltersAndHandleMessages()
        {
            var processorOptions = new ServiceBusProcessorOptions
            {
                // One message at a time; messages are completed explicitly in
                // ProcessMessagesAsync once the payload has been processed.
                MaxConcurrentCalls = 1,
                AutoCompleteMessages = false,
            };

            _processor = _client.CreateProcessor(TOPIC_PATH, SUBSCRIPTION_NAME, processorOptions);
            _processor.ProcessMessageAsync += ProcessMessagesAsync;
            _processor.ProcessErrorAsync += ProcessErrorAsync;

            await RemoveDefaultFilters().ConfigureAwait(false);
            await AddFilters().ConfigureAwait(false);

            await _processor.StartProcessingAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// Deletes the <c>GoalsGreaterThanSeven</c> rule from the subscription when it exists.
        /// Failures are logged as warnings and do not stop start-up.
        /// </summary>
        /// <remarks>
        /// NOTE(review): despite the method name, only the custom rule is removed here;
        /// a "$Default" rule on the subscription, if any, is left in place. Confirm
        /// whether removing the default rule was the intent.
        /// </remarks>
        private async Task RemoveDefaultFilters()
        {
            try
            {
                var ruleNames = await GetRuleNamesAsync().ConfigureAwait(false);
                if (ruleNames.Contains(GOALS_RULE_NAME))
                {
                    await _adminClient.DeleteRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, GOALS_RULE_NAME)
                        .ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Removing rule {RuleName} from {TopicPath}/{SubscriptionName} failed",
                    GOALS_RULE_NAME, TOPIC_PATH, SUBSCRIPTION_NAME);
            }
        }

        /// <summary>
        /// Creates the <c>GoalsGreaterThanSeven</c> rule (SQL filter <c>goals &gt; 7</c>)
        /// unless the subscription already has a rule with that name.
        /// Failures are logged as warnings and do not stop start-up.
        /// </summary>
        private async Task AddFilters()
        {
            try
            {
                var ruleNames = await GetRuleNamesAsync().ConfigureAwait(false);
                if (!ruleNames.Contains(GOALS_RULE_NAME))
                {
                    var createRuleOptions = new CreateRuleOptions
                    {
                        Name = GOALS_RULE_NAME,
                        Filter = new SqlRuleFilter("goals > 7")
                    };
                    await _adminClient.CreateRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, createRuleOptions)
                        .ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Adding rule {RuleName} to {TopicPath}/{SubscriptionName} failed",
                    GOALS_RULE_NAME, TOPIC_PATH, SUBSCRIPTION_NAME);
            }
        }

        /// <summary>
        /// Reads the names of all rules currently defined on the subscription.
        /// The list is materialized so callers can modify rules afterwards
        /// without doing so while the rule listing is still being enumerated.
        /// </summary>
        private async Task<List<string>> GetRuleNamesAsync()
        {
            var ruleNames = new List<string>();
            var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME)
                .ConfigureAwait(false);

            await foreach (var rule in rules)
            {
                ruleNames.Add(rule.Name);
            }

            return ruleNames;
        }

        /// <summary>
        /// Deserializes the message body to <c>MyPayload</c>, processes it and then
        /// completes the message. The message is not completed when processing throws.
        /// </summary>
        private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
        {
            var myPayload = args.Message.Body.ToObjectFromJson<MyPayload>();
            await _processData.Process(myPayload).ConfigureAwait(false);
            await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
        }

        /// <summary>
        /// Logs errors reported by the processor together with their origin.
        /// </summary>
        private Task ProcessErrorAsync(ProcessErrorEventArgs arg)
        {
            _logger.LogError(arg.Exception, "Message handler encountered an exception");
            _logger.LogDebug("- ErrorSource: {ErrorSource}", arg.ErrorSource);
            _logger.LogDebug("- Entity Path: {EntityPath}", arg.EntityPath);
            _logger.LogDebug("- FullyQualifiedNamespace: {FullyQualifiedNamespace}", arg.FullyQualifiedNamespace);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Disposes the processor (when one was created) and the Service Bus client.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            if (_processor != null)
            {
                await _processor.DisposeAsync().ConfigureAwait(false);
            }

            if (_client != null)
            {
                await _client.DisposeAsync().ConfigureAwait(false);
            }

            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Closes the processor. Does nothing when processing was never started,
        /// so stopping the host after a failed or skipped start does not throw.
        /// </summary>
        public async Task CloseSubscriptionAsync()
        {
            if (_processor == null)
            {
                return;
            }

            await _processor.CloseAsync().ConfigureAwait(false);
        }
    }
}

The WorkerServiceBus class implements the IHostedService interface and uses the IServiceBusConsumer and IServiceBusTopicSubscription interfaces to consume an Azure Service Bus queue and to subscribe to an Azure Service Bus topic. The StartAsync method registers the queue consumer using the RegisterOnMessageHandlerAndReceiveMessages method and starts the topic subscription using the PrepareFiltersAndHandleMessages method. The hosted service provides a start, a stop and a dispose. The Azure Service Bus classes are controlled using this hosted service. If needed, a periodic task could be implemented to run health checks on the client.

/// <summary>
/// Hosted service that starts the Service Bus queue consumer and the topic
/// subscription when the host starts, closes them when the host stops and
/// disposes them when the service itself is disposed.
/// </summary>
public class WorkerServiceBus : IHostedService, IDisposable, IAsyncDisposable
{
	private readonly ILogger<WorkerServiceBus> _logger;
	private readonly IServiceBusConsumer _serviceBusConsumer;
	private readonly IServiceBusTopicSubscription _serviceBusTopicSubscription;

	// Set once the consumer and the subscription have been handed to DisposeAsync.
	private bool _disposed;

	/// <summary>
	/// Stores the queue consumer, the topic subscription and the logger.
	/// </summary>
	public WorkerServiceBus(IServiceBusConsumer serviceBusConsumer,
		IServiceBusTopicSubscription serviceBusTopicSubscription,
		ILogger<WorkerServiceBus> logger)
	{
		_serviceBusConsumer = serviceBusConsumer;
		_serviceBusTopicSubscription = serviceBusTopicSubscription;
		_logger = logger;
	}

	/// <summary>
	/// Registers the queue message handler, then prepares the subscription
	/// filters and starts handling topic messages.
	/// </summary>
	public async Task StartAsync(CancellationToken stoppingToken)
	{
		_logger.LogDebug("Starting the service bus queue consumer and the subscription");
		await _serviceBusConsumer.RegisterOnMessageHandlerAndReceiveMessages().ConfigureAwait(false);
		await _serviceBusTopicSubscription.PrepareFiltersAndHandleMessages().ConfigureAwait(false);
	}

	/// <summary>
	/// Closes the queue consumer and the topic subscription.
	/// </summary>
	public async Task StopAsync(CancellationToken stoppingToken)
	{
		_logger.LogDebug("Stopping the service bus queue consumer and the subscription");
		await _serviceBusConsumer.CloseQueueAsync().ConfigureAwait(false);
		await _serviceBusTopicSubscription.CloseSubscriptionAsync().ConfigureAwait(false);
	}

	/// <summary>
	/// Synchronously disposes the consumer and the subscription.
	/// Prefer <see cref="DisposeAsync"/> where the caller can await.
	/// </summary>
	public void Dispose()
	{
		Dispose(true);
		GC.SuppressFinalize(this);
	}

	/// <summary>
	/// Asynchronously disposes the consumer and the subscription.
	/// </summary>
	public async ValueTask DisposeAsync()
	{
		await DisposeAsyncCore().ConfigureAwait(false);
		GC.SuppressFinalize(this);
	}

	/// <summary>
	/// Synchronous dispose path. The underlying clients only offer asynchronous
	/// disposal, so this waits for it to finish instead of running it as
	/// fire-and-forget; an <c>async void</c> method would return before the
	/// resources are released and would make any exception unobservable.
	/// </summary>
	/// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
	protected virtual void Dispose(bool disposing)
	{
		if (!disposing)
		{
			return;
		}

		try
		{
			DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
		}
		catch (Exception ex)
		{
			// Dispose should not throw; record the failure instead.
			_logger.LogError(ex, "Disposing the service bus queue consumer and the subscription failed");
		}
	}

	/// <summary>
	/// Disposes the consumer and then the subscription. Runs at most once.
	/// </summary>
	protected virtual async ValueTask DisposeAsyncCore()
	{
		if (_disposed)
		{
			return;
		}

		_disposed = true;
		await _serviceBusConsumer.DisposeAsync().ConfigureAwait(false);
		await _serviceBusTopicSubscription.DisposeAsync().ConfigureAwait(false);
	}
}

The IHostedService is added to the services in the ConfigureServices method. The AddHostedService method is used to register it. The hosted service now manages the Azure Service Bus clients, which consume messages from the topic subscription or from the queue.

/// <summary>
/// Registers MVC controllers, the SQLite-backed <c>PayloadContext</c>, the
/// Service Bus singletons, the <c>WorkerServiceBus</c> hosted service and the
/// Swagger document for the API.
/// </summary>
/// <param name="services">The service collection to add the registrations to.</param>
public void ConfigureServices(IServiceCollection services)
{
	services.AddControllers();

	// Resolved once, up front, and captured by the DbContext options callback.
	string sqliteConnection = Configuration.GetConnectionString("DefaultConnection");
	services.AddDbContext<PayloadContext>(dbOptions =>
	{
		dbOptions.UseSqlite(sqliteConnection);
	});

	// Messaging components live for the lifetime of the application.
	services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
	services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
	services.AddSingleton<IProcessData, ProcessData>();

	// Starts and stops the consumer and the subscription with the host.
	services.AddHostedService<WorkerServiceBus>();

	services.AddSwaggerGen(swagger =>
	{
		var apiInfo = new OpenApiInfo
		{
			Version = "v1",
			Title = "Payload API",
		};
		swagger.SwaggerDoc("v1", apiInfo);
	});
}

When the application is run, the messages can be sent to the topic and are received using the IHostedService Azure Service Bus subscription.

Links:

https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus

https://docs.microsoft.com/en-us/azure/service-bus-messaging/

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services

3 comments

  1. […] Using an ASP.NET Core IHostedService to run Azure Service Bus subscriptions and consumers (Damien Bowden) […]

  2. […] & Jeremy Rickard) Got Bots? 3: Making Bots with QnA Maker + Azure Bot Service (Chloe Condon) Using an ASP.NET Core IHostedService to run Azure Service Bus subscriptions and consumers (Damien Bowden) Using Cosmos DB as an ASP.NET session state and caching provider (Matias Quaranta) […]

  3. […] Using an ASP.NET Core IHostedService to run Azure Service Bus subscriptions and consumers – Damien Bowden […]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: