This article shows how to implement Azure Service Bus filters for topic subscriptions used in an ASP.NET Core API application. The application uses the Azure.Messaging.ServiceBus NuGet package for all the Azure Service Bus client logic.
Code: https://github.com/damienbod/AspNetCoreServiceBus
Posts in this series:
- Using Azure Service Bus Queues with ASP.NET Core Services
- Using Azure Service Bus Topics in ASP.NET Core
- Using Azure Service Bus Topics Subscription Filters in ASP.NET Core
- Using Entity Framework Core to process Azure Service Messages in ASP.NET Core
- Using an Azure Service Bus Topic Subscription in an Azure Function
- Using Azure Service Bus with restricted access
History
2021-05-18 Updated .NET, Azure Service Bus SDK
Azure Service Bus Topic Sender
The topic sender from the previous post was changed to add an ApplicationProperties item called goals to the message. This is the value the subscription filter is applied to. Otherwise the sender is unchanged and sends the messages to the topic.
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Text.Json;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusTopicSender
    {
        private const string TOPIC_PATH = "mytopic";
        private readonly ILogger _logger;
        private readonly ServiceBusClient _client;
        private readonly Azure.Messaging.ServiceBus.ServiceBusSender _clientSender;

        public ServiceBusTopicSender(IConfiguration configuration,
            ILogger<ServiceBusTopicSender> logger)
        {
            _logger = logger;
            var connectionString = configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _clientSender = _client.CreateSender(TOPIC_PATH);
        }

        public async Task SendMessage(MyPayload payload)
        {
            string messagePayload = JsonSerializer.Serialize(payload);
            ServiceBusMessage message = new ServiceBusMessage(messagePayload);

            // The subscription filter is evaluated against this property, not the JSON body
            message.ApplicationProperties.Add("goals", payload.Goals);

            try
            {
                await _clientSender.SendMessageAsync(message).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                // Pass the exception itself so the stack trace is logged as well
                _logger.LogError(e, "Sending the message to topic {Topic} failed", TOPIC_PATH);
            }
        }
    }
}
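Subscription rules are evaluated against message properties and never against the message body, which is why the sender copies the value into ApplicationProperties. The following is a minimal sketch of that step in isolation. It assumes a shape for MyPayload (only Goals is taken from the sender above, Name is a hypothetical field) and a helper class MessageFactory that is not part of the sample repository.

```csharp
using System.Text.Json;
using Azure.Messaging.ServiceBus;

// Assumed payload shape: only Goals is confirmed by the sender code, Name is hypothetical.
public class MyPayload
{
    public string Name { get; set; } = "";
    public int Goals { get; set; }
}

// Hypothetical helper, not part of the sample repository.
public static class MessageFactory
{
    public static ServiceBusMessage Create(MyPayload payload)
    {
        // The body carries the full JSON document for the receiver
        var message = new ServiceBusMessage(JsonSerializer.Serialize(payload));

        // The broker only sees properties when evaluating rules,
        // so the value to filter on is duplicated here
        message.ApplicationProperties["goals"] = payload.Goals;
        return message;
    }
}
```

A message created with Goals = 9 would carry goals = 9 as an application property, and a rule such as goals > 7 could then match it.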
At the time of writing, it was not possible to add a subscription filter to the topic using the Azure portal. The filter has to be created in code, with scripts, or with the Azure CLI.
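One way to do this in code is to create the subscription together with its rule, so that the default rule is never added in the first place. The sketch below is an assumption about how this could look with ServiceBusAdministrationClient. The class name SubscriptionSetup and its method names are hypothetical, and the article itself takes a different route by editing the rules of an existing subscription.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, not part of the sample repository.
public static class SubscriptionSetup
{
    // Builds the rule used in this article: only messages with goals > 7 match
    public static CreateRuleOptions BuildGoalsRule()
        => new CreateRuleOptions("GoalsGreaterThanSeven", new SqlRuleFilter("goals > 7"));

    public static async Task EnsureSubscriptionAsync(
        string connectionString, string topicName, string subscriptionName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        // Nothing to do when the subscription already exists
        var exists = await adminClient.SubscriptionExistsAsync(topicName, subscriptionName)
            .ConfigureAwait(false);
        if (exists.Value)
        {
            return;
        }

        // Passing a rule at creation time replaces the default match-all rule
        var options = new CreateSubscriptionOptions(topicName, subscriptionName);
        await adminClient.CreateSubscriptionAsync(options, BuildGoalsRule())
            .ConfigureAwait(false);
    }
}
```

The connection string needs the Manage claim for the administration calls to succeed.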
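The RemoveDefaultFilters method checks if the default filter exists, and if it does, removes it. The default rule, named $Default, matches every message, so the subscription would receive all messages for as long as it exists. The method does not remove the other filters.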
private async Task RemoveDefaultFilters()
{
    try
    {
        var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME);
        var ruleProperties = new List<RuleProperties>();
        await foreach (var rule in rules)
        {
            ruleProperties.Add(rule);
        }

        foreach (var rule in ruleProperties)
        {
            // Only the default rule ("$Default") is deleted, custom rules are kept
            if (rule.Name == RuleProperties.DefaultRuleName)
            {
                await _adminClient.DeleteRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, RuleProperties.DefaultRuleName)
                    .ConfigureAwait(false);
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex.ToString());
    }
}
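To illustrate why the default rule has to be removed: a subscription receives a message when at least one of its rules matches, and the default rule matches everything. The following is a local simulation only, with plain delegates standing in for the broker-side rules. None of these names exist in the Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Local stand-ins for broker-side rules; this is not SDK code.
public static class RuleSimulation
{
    // The default rule behaves like a filter that is always true
    public static readonly Func<IDictionary<string, object>, bool> DefaultRule =
        _ => true;

    // Mirrors the SQL filter "goals > 7"; a missing property does not match
    public static readonly Func<IDictionary<string, object>, bool> GoalsGreaterThanSeven =
        props => props.TryGetValue("goals", out var value) && value is int goals && goals > 7;

    // A message reaches the subscription when any rule matches
    public static bool IsDelivered(
        IDictionary<string, object> props,
        params Func<IDictionary<string, object>, bool>[] rules)
        => rules.Any(rule => rule(props));
}
```

With both rules present, a message with goals = 3 is still delivered because the default rule matches. With only GoalsGreaterThanSeven, the same message is dropped.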
The AddFilters method adds the new filter if it has not already been added. The filter in this demo uses the goals application property of the message, so the subscription only receives messages with a value greater than 7.
private async Task AddFilters()
{
    try
    {
        var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME)
            .ConfigureAwait(false);
        var ruleProperties = new List<RuleProperties>();
        await foreach (var rule in rules)
        {
            ruleProperties.Add(rule);
        }

        if (!ruleProperties.Any(r => r.Name == "GoalsGreaterThanSeven"))
        {
            CreateRuleOptions createRuleOptions = new CreateRuleOptions
            {
                Name = "GoalsGreaterThanSeven",
                Filter = new SqlRuleFilter("goals > 7")
            };
            await _adminClient.CreateRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, createRuleOptions)
                .ConfigureAwait(false);
        }
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex.ToString());
    }
}
The filter methods are called in the PrepareFiltersAndHandleMessages method. This sets up the filters, or makes sure the filters are correct on the Azure Service Bus, and then starts the processor for the topic subscription to receive the messages from that subscription.
public async Task PrepareFiltersAndHandleMessages()
{
    ServiceBusProcessorOptions _serviceBusProcessorOptions = new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls = 1,
        AutoCompleteMessages = false,
    };

    _processor = _client.CreateProcessor(TOPIC_PATH, SUBSCRIPTION_NAME, _serviceBusProcessorOptions);
    _processor.ProcessMessageAsync += ProcessMessagesAsync;
    _processor.ProcessErrorAsync += ProcessErrorAsync;

    await RemoveDefaultFilters();
    await AddFilters();

    await _processor.StartProcessingAsync().ConfigureAwait(false);
}
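The two handlers registered on the processor are not shown in this post. The following is a minimal sketch of what they could look like, assuming the handler only logs the goals property. The class name SubscriptionHandlers and the DescribeGoals helper are hypothetical. Because AutoCompleteMessages is set to false, the message handler has to complete each message itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical handler class; the real handlers live in the subscription class.
public class SubscriptionHandlers
{
    // Pure helper so the property lookup can be checked without a broker
    public static string DescribeGoals(IReadOnlyDictionary<string, object> properties)
        => properties.TryGetValue("goals", out var goals)
            ? $"goals={goals}"
            : "goals=<missing>";

    public async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
    {
        Console.WriteLine(
            $"Received {args.Message.MessageId}: {DescribeGoals(args.Message.ApplicationProperties)}");

        // Without this call the lock expires and the message is delivered again
        await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
    }

    public Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        // ErrorSource says which operation failed, EntityPath which entity was involved
        Console.WriteLine(
            $"{args.ErrorSource} error on {args.EntityPath}: {args.Exception.Message}");
        return Task.CompletedTask;
    }
}
```

With the filter in place, every message that reaches ProcessMessagesAsync should report a goals value above 7.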
The Azure Service Bus classes are added to the ASP.NET Core application in the Startup class. This registers the services with the IoC container and starts the message listeners.
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    var connection = Configuration.GetConnectionString("DefaultConnection");
    services.AddDbContext<PayloadContext>(options =>
        options.UseSqlite(connection));

    services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
    services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
    services.AddSingleton<IProcessData, ProcessData>();

    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo
        {
            Version = "v1",
            Title = "Payload API",
        });
    });
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // ....

    var bus = app.ApplicationServices.GetService<IServiceBusConsumer>();
    bus.RegisterOnMessageHandlerAndReceiveMessages().GetAwaiter().GetResult();

    var busSubscription = app.ApplicationServices.GetService<IServiceBusTopicSubscription>();
    busSubscription.PrepareFiltersAndHandleMessages().GetAwaiter().GetResult();
}
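The Configure method blocks on the async setup with GetAwaiter().GetResult(). One possible alternative, shown here only as a sketch and not as the approach used in the sample repository, is to start the subscription from a hosted service so that the host awaits the setup. The interface below is a stand-in that contains only the method shown in this post, and TopicSubscriptionHostedService is a hypothetical name.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Stand-in for the interface from the sample; only the method shown in the post is assumed.
public interface IServiceBusTopicSubscription
{
    Task PrepareFiltersAndHandleMessages();
}

// Hypothetical hosted service that starts the subscription when the host starts.
public class TopicSubscriptionHostedService : IHostedService
{
    private readonly IServiceBusTopicSubscription _subscription;

    public TopicSubscriptionHostedService(IServiceBusTopicSubscription subscription)
    {
        _subscription = subscription;
    }

    // The host awaits this task, so no blocking call is needed
    public Task StartAsync(CancellationToken cancellationToken)
        => _subscription.PrepareFiltersAndHandleMessages();

    // Stopping the processor is left out of this sketch
    public Task StopAsync(CancellationToken cancellationToken)
        => Task.CompletedTask;
}
```

It would be registered in ConfigureServices with services.AddHostedService<TopicSubscriptionHostedService>(), after the singleton registration of IServiceBusTopicSubscription.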
When both applications are running, API2 only receives messages which have a goals value greater than seven.
Links:
https://docs.microsoft.com/en-us/azure/service-bus-messaging/
https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
Always subscribe to Dead-lettered messages when using an Azure Service Bus