Using Azure Service Bus Topics Subscription Filters in ASP.NET Core

This article shows how to implement Azure Service Bus filters for topic subscriptions in an ASP.NET Core API application. The application uses the Azure.Messaging.ServiceBus NuGet package for all the Azure Service Bus client logic.

Code: https://github.com/damienbod/AspNetCoreServiceBus

History

2021-05-18 Updated .NET, Azure Service Bus SDK

Azure Service Bus Topic Sender

The topic sender from the previous post was changed to add an ApplicationProperties item called goals to the message. This is the value the subscription filter will evaluate. Otherwise the sender is unchanged and sends the messages to the topic.

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Text.Json;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusTopicSender
    {
        private const string TOPIC_PATH = "mytopic";
        private readonly ILogger _logger;
        private readonly ServiceBusClient _client;
        private readonly Azure.Messaging.ServiceBus.ServiceBusSender _clientSender;

        public ServiceBusTopicSender(IConfiguration configuration, 
            ILogger<ServiceBusTopicSender> logger)
        {
            _logger = logger;

            var connectionString = configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _clientSender = _client.CreateSender(TOPIC_PATH);
        }
        
        public async Task SendMessage(MyPayload payload)
        {
            string messagePayload = JsonSerializer.Serialize(payload);
            ServiceBusMessage message = new ServiceBusMessage(messagePayload);

            message.ApplicationProperties.Add("goals", payload.Goals);

            try
            {
                await _clientSender.SendMessageAsync(message).ConfigureAwait(false);
            }
            catch (Exception e)
            {
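                // Pass the exception itself so the stack trace is logged as well.
                _logger.LogError(e, "Failed to send message to topic {TopicPath}", TOPIC_PATH);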
            }
        }
    }
}

At the time of writing, it was not possible to add a subscription filter to the topic using the Azure portal. To do this you need to implement it in code, use scripts, or use the Azure CLI.
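The filter methods that follow use an _adminClient field, a _client field and a SUBSCRIPTION_NAME constant whose declarations are not shown in this post. The following is a minimal sketch of how they might be set up, assuming the subscription class reads the same connection string as the sender and that this connection string carries Manage rights, which management operations such as creating and deleting rules generally require. The subscription name "mysubscription" is a placeholder, and the class shape is an assumption, not the code from the repository.

```csharp
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace ServiceBusMessaging
{
    // Sketch only: shows the fields the filter methods rely on.
    public class ServiceBusTopicSubscription
    {
        private const string TOPIC_PATH = "mytopic";

        // Placeholder value: the real subscription name is not given in this post.
        private const string SUBSCRIPTION_NAME = "mysubscription";

        private readonly ILogger _logger;

        // Used to receive messages from the subscription.
        private readonly ServiceBusClient _client;

        // Used to list, create and delete the rules (filters) of the subscription.
        private readonly ServiceBusAdministrationClient _adminClient;

        // Created later, when message handling is started.
        private ServiceBusProcessor _processor;

        public ServiceBusTopicSubscription(IConfiguration configuration,
            ILogger<ServiceBusTopicSubscription> logger)
        {
            _logger = logger;

            var connectionString = configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _adminClient = new ServiceBusAdministrationClient(connectionString);
        }
    }
}
```

Keeping the administration client separate from the messaging client mirrors the SDK split between sending and receiving on one side and entity management on the other.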

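The RemoveDefaultFilters method checks whether the default filter exists and removes it if it does. The default rule, named $Default, matches every message, so a custom filter only takes effect once it is gone. The method does not remove any other filters.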

private async Task RemoveDefaultFilters()
{
	try
	{
		var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME);
		var ruleProperties = new List<RuleProperties>();
		await foreach (var rule in rules)
		{
			ruleProperties.Add(rule);
		}

		foreach (var rule in ruleProperties)
		{
			// The default rule ("$Default") matches every message, so remove it.
			if (rule.Name == RuleProperties.DefaultRuleName)
			{
				await _adminClient.DeleteRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, RuleProperties.DefaultRuleName)
					.ConfigureAwait(false);
			}
		}
	}
	catch (Exception ex)
	{
		_logger.LogWarning(ex, "Failed to remove the default filter from subscription {Subscription}", SUBSCRIPTION_NAME);
	}
}

The AddFilters method adds the new filter if it has not already been added. The filter in this demo evaluates the goals application property of each message, so the subscription only receives messages with a value greater than 7.

private async Task AddFilters()
{
	try
	{
		var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME)
			.ConfigureAwait(false);

		var ruleProperties = new List<RuleProperties>();
		await foreach (var rule in rules)
		{
			ruleProperties.Add(rule);
		}

		if (!ruleProperties.Any(r => r.Name == "GoalsGreaterThanSeven"))
		{
			CreateRuleOptions createRuleOptions = new CreateRuleOptions
			{
				Name = "GoalsGreaterThanSeven",
				Filter = new SqlRuleFilter("goals > 7")
			};
			await _adminClient.CreateRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, createRuleOptions)
				.ConfigureAwait(false);
		}
	}
	catch (Exception ex)
	{
		_logger.LogWarning(ex, "Failed to add the filter to subscription {Subscription}", SUBSCRIPTION_NAME);
	}
}
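To make the effect of the goals > 7 rule concrete, the following sketch evaluates the same condition locally against a few sets of application properties. It is a stand-in written for illustration, not the Service Bus SQL filter engine, and the names GoalsFilterDemo and MatchesGoalsGreaterThanSeven are hypothetical. It assumes goals is sent as an int, and it treats a message without the property as not matching, which is how a SQL filter on a missing property is expected to behave.

```csharp
using System;
using System.Collections.Generic;

public static class GoalsFilterDemo
{
    // Local stand-in for the broker-side rule "goals > 7".
    // Returns true when a message with these properties would pass the filter.
    public static bool MatchesGoalsGreaterThanSeven(IDictionary<string, object> applicationProperties)
    {
        return applicationProperties.TryGetValue("goals", out var value)
            && value is int goals
            && goals > 7;
    }

    public static void Main()
    {
        var messages = new[]
        {
            new Dictionary<string, object> { ["goals"] = 3 },  // filtered out
            new Dictionary<string, object> { ["goals"] = 7 },  // filtered out, 7 is not greater than 7
            new Dictionary<string, object> { ["goals"] = 8 },  // delivered
            new Dictionary<string, object>(),                  // no goals property, filtered out
        };

        foreach (var properties in messages)
        {
            var goals = properties.TryGetValue("goals", out var value) ? value.ToString() : "(none)";
            Console.WriteLine($"goals={goals} delivered={MatchesGoalsGreaterThanSeven(properties)}");
        }
    }
}
```

Messages that no rule on the subscription matches are simply not copied to it; other subscriptions on the same topic are unaffected.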

The filter methods are called from the PrepareFiltersAndHandleMessages method. This method sets up the filters, or makes sure the existing filters on the Azure Service Bus are correct, and then starts the processor so that it receives the messages from its topic subscription.

public async Task PrepareFiltersAndHandleMessages()
{
	var processorOptions = new ServiceBusProcessorOptions
	{
		MaxConcurrentCalls = 1,
		// Messages must be completed explicitly in the message handler.
		AutoCompleteMessages = false,
	};

	_processor = _client.CreateProcessor(TOPIC_PATH, SUBSCRIPTION_NAME, processorOptions);
	_processor.ProcessMessageAsync += ProcessMessagesAsync;
	_processor.ProcessErrorAsync += ProcessErrorAsync;

	await RemoveDefaultFilters();
	await AddFilters();

	await _processor.StartProcessingAsync().ConfigureAwait(false);
}
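PrepareFiltersAndHandleMessages wires up two handlers, ProcessMessagesAsync and ProcessErrorAsync, which are not shown in this post. Because AutoCompleteMessages is set to false, the message handler has to complete each message itself, otherwise the message is redelivered after its lock expires. The following is a minimal sketch of what such handlers could look like. The class name SubscriptionHandlersSketch is hypothetical, and the handlers only log the message instead of processing it.

```csharp
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    // Sketch only: hypothetical class holding the two processor handlers.
    public class SubscriptionHandlersSketch
    {
        private readonly ILogger _logger;

        public SubscriptionHandlersSketch(ILogger<SubscriptionHandlersSketch> logger)
        {
            _logger = logger;
        }

        public async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
        {
            // The JSON body written by the sender, read back as a string.
            string body = args.Message.Body.ToString();

            // The application property the subscription filter was evaluated against.
            args.Message.ApplicationProperties.TryGetValue("goals", out var goals);

            _logger.LogInformation("Received message with goals = {Goals}: {Body}", goals, body);

            // AutoCompleteMessages is false, so settle the message explicitly.
            await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
        }

        public Task ProcessErrorAsync(ProcessErrorEventArgs args)
        {
            _logger.LogError(args.Exception,
                "Service Bus error. Source: {ErrorSource}, entity: {EntityPath}",
                args.ErrorSource, args.EntityPath);

            return Task.CompletedTask;
        }
    }
}
```

Completing the message only after the work has succeeded means a failure in the handler leaves the message on the subscription for another delivery attempt.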

The Azure Service Bus classes are added to the ASP.NET Core application in the Startup class. ConfigureServices registers the services with the IoC container, and Configure initializes the message listeners.

public void ConfigureServices(IServiceCollection services)
{
	services.AddControllers();

	var connection = Configuration.GetConnectionString("DefaultConnection");

	services.AddDbContext<PayloadContext>(options =>
		options.UseSqlite(connection));

	services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
	services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
	services.AddSingleton<IProcessData, ProcessData>();

	services.AddSwaggerGen(c =>
	{
		c.SwaggerDoc("v1", new OpenApiInfo
		{
			Version = "v1",
			Title = "Payload API",
		});
	});
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
	// ....

	// GetRequiredService throws a descriptive exception if a registration is missing,
	// instead of returning null as GetService does.
	var bus = app.ApplicationServices.GetRequiredService<IServiceBusConsumer>();
	bus.RegisterOnMessageHandlerAndReceiveMessages().GetAwaiter().GetResult();

	var busSubscription = app.ApplicationServices.GetRequiredService<IServiceBusTopicSubscription>();
	busSubscription.PrepareFiltersAndHandleMessages().GetAwaiter().GetResult();
}



When the applications are started, API2 receives only the messages whose goals value is greater than seven.

Links:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

Azure Service Bus Topologies

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

Always subscribe to Dead-lettered messages when using an Azure Service Bus
