Using Azure Service Bus Topics Subscription Filters in ASP.NET Core

This article shows how to implement Azure Service Bus filters for topic subscriptions used in an ASP.NET Core API application. The application uses the Microsoft.Azure.ServiceBus NuGet package for all the Azure Service Bus client logic.

Code: https://github.com/damienbod/AspNetCoreServiceBus

Posts in this series:

Azure Service Bus Topic Sender

The topic sender from the previous post was changed to add a UserProperties item to the message called goals which will be filtered. Otherwise the sender is as before and sends the messages to the topic.

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusTopicSender
    {
        private readonly TopicClient _topicClient;
        private readonly IConfiguration _configuration;
        private const string TOPIC_PATH = "mytopic";
        private readonly ILogger _logger;

        public ServiceBusTopicSender(IConfiguration configuration, 
            ILogger<ServiceBusTopicSender> logger)
        {
            _configuration = configuration;
            _logger = logger;
            _topicClient = new TopicClient(
                _configuration.GetConnectionString("ServiceBusConnectionString"),
                TOPIC_PATH
            );
        }
        
        public async Task SendMessage(MyPayload payload)
        {
            string data = JsonConvert.SerializeObject(payload);
            Message message = new Message(Encoding.UTF8.GetBytes(data));
            message.UserProperties.Add("goals", payload.Goals);

            try
            {
                await _topicClient.SendAsync(message);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
            }
        }
        
    }
}

It is not possible to add a subscription filter to the topic using the Azure portal. To do this you need to implement it in code, or used scripts, or the Azure CLI.

The RemoveDefaultFilters method checks if the default filter exists, and if it does it is removed. It does not remove the other filters.

private async Task RemoveDefaultFilters()
{
	try
	{
		var rules = await _subscriptionClient.GetRulesAsync();
		foreach(var rule in rules)
		{
			if(rule.Name == RuleDescription.DefaultRuleName)
			{
				await _subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
			}
		}
		
	}
	catch (Exception ex)
	{
		_logger.LogWarning(ex.ToString());
	}
}

The AddFilters method adds the new filter, if it is not already added. The filter in this demo will use the goals user property from the message and only subscribe to messages with a value greater than 7.

private async Task AddFilters()
{
	try
	{
		var rules = await _subscriptionClient.GetRulesAsync();
		if(!rules.Any(r => r.Name == "GoalsGreaterThanSeven"))
		{
			var filter = new SqlFilter("goals > 7");
			await _subscriptionClient.AddRuleAsync("GoalsGreaterThanSeven", filter);
		}
	}
	catch (Exception ex)
	{
		_logger.LogWarning(ex.ToString());
	}
}

The filter methods are added to the PrepareFiltersAndHandleMessages method. This sets up the filters, or makes sure the filters are correct on the Azure Service Bus, and then registers itself to the topic subscription to receive the messages form its subscription.

public async Task PrepareFiltersAndHandleMessages()
{
	await RemoveDefaultFilters();
	await AddFilters();

	var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
	{
		MaxConcurrentCalls = 1,
		AutoComplete = false,
	};

	_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}

The Azure Service Bus classes are added to the ASP.NET Core application in the Startup class. This adds the services to the IoC and initializes the message listener.

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using ServiceBusMessaging;
using System.Threading.Tasks;

namespace AspNetCoreServiceBusApi2
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            ...
			
            services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
            services.AddTransient<IProcessData, ProcessData>();

        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            ...
			
            var busSubscription = app.ApplicationServices.GetService<IServiceBusTopicSubscription>();
            busSubscription.PrepareFiltersAndHandleMessages().GetAwaiter().GetResult();
        }
    }
}

When the applications are started, the API2 only receives messages which have a goal value greater than seven.

Links:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

Azure Service Bus Topologies

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

Always subscribe to Dead-lettered messages when using an Azure Service Bus

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: