This article shows how two ASP.NET Core API applications can communicate with each other using Azure Service Bus Topics. It continues on from the last article, this time using a topic and subscriptions instead of a queue. By using a topic with subscriptions, a message can be sent to n receivers.
Code: https://github.com/damienbod/AspNetCoreServiceBus
Posts in this series:
- Using Azure Service Bus Queues with ASP.NET Core Services
- Using Azure Service Bus Topics in ASP.NET Core
- Using Azure Service Bus Topics Subscription Filters in ASP.NET Core
- Using Entity Framework Core to process Azure Service Messages in ASP.NET Core
- Using an Azure Service Bus Topic Subscription in an Azure Function
- Using Azure Service Bus with restricted access
History
2021-05-18 Updated .NET, Azure Service Bus SDK
Setting up the Azure Service Bus Topics
The Azure Service Bus Topic and the Topic Subscription need to be set up in Azure, either using the portal or scripts.
You need to create a Topic in the Azure Service Bus:

In the new Topic, add a Topic Subscription:

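As an alternative to the portal, the same setup can be scripted. The following is a minimal sketch, not taken from the sample repository, which uses the ServiceBusAdministrationClient from the Azure.Messaging.ServiceBus package. The TopicSetup class and the EnsureTopicAndSubscriptionAsync method are hypothetical names, and the sketch assumes that the connection string has Manage rights on the namespace.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, not part of the sample repository.
public static class TopicSetup
{
    // These names match the constants used in the code later in this post.
    private const string TopicPath = "mytopic";
    private const string SubscriptionName = "mytopicsubscription";

    // Creates the topic and the subscription only if they are missing.
    // Assumption: the connection string carries Manage rights.
    public static async Task EnsureTopicAndSubscriptionAsync(string connectionString)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        if (!(await adminClient.TopicExistsAsync(TopicPath)).Value)
        {
            await adminClient.CreateTopicAsync(TopicPath);
        }

        if (!(await adminClient.SubscriptionExistsAsync(TopicPath, SubscriptionName)).Value)
        {
            await adminClient.CreateSubscriptionAsync(TopicPath, SubscriptionName);
        }
    }
}
```

A subscription created like this gets the default settings, including the default rule which accepts every message sent to the topic.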
ASP.NET Core applications
The applications are set up like in the first post in this series. This time the message bus uses a topic and a subscription to send the messages.

Implementing the Azure Service Bus Topic sender
The messages are sent using the ServiceBusTopicSender class. This class uses the Azure Service Bus connection string and a topic path which matches what was configured in the Azure portal. A ServiceBusClient is created from the connection string, and a ServiceBusSender created from this client is then used to send messages to the topic.
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Text.Json;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusTopicSender
    {
        private const string TOPIC_PATH = "mytopic";
        private readonly ILogger _logger;
        private readonly ServiceBusClient _client;
        private readonly Azure.Messaging.ServiceBus.ServiceBusSender _clientSender;

        public ServiceBusTopicSender(IConfiguration configuration,
            ILogger<ServiceBusTopicSender> logger)
        {
            _logger = logger;
            var connectionString = configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _clientSender = _client.CreateSender(TOPIC_PATH);
        }

        public async Task SendMessage(MyPayload payload)
        {
            string messagePayload = JsonSerializer.Serialize(payload);
            ServiceBusMessage message = new ServiceBusMessage(messagePayload);
            // Expose the goals value as an application property so that
            // subscription rules can filter on it.
            message.ApplicationProperties.Add("goals", payload.Goals);
            try
            {
                await _clientSender.SendMessageAsync(message).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                // Pass the exception itself so that the stack trace is logged.
                _logger.LogError(e, "Sending the message to the topic failed");
            }
        }
    }
}
The ServiceBusTopicSender class is added as a service in the Startup class.
services.AddScoped<ServiceBusTopicSender>();
This service can then be used in the API to send messages to the bus, when other services need the data from the API call.
[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required] Payload request)
{
    if (data.Any(d => d.Id == request.Id))
    {
        return Conflict($"data with id {request.Id} already exists");
    }

    data.Add(request);

    // Send this to the bus for the other services
    await _serviceBusTopicSender.SendMessage(new MyPayload
    {
        Goals = request.Goals,
        Name = request.Name,
        Delete = false
    });

    return Ok(request);
}
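The MyPayload type used above is not shown in this post. The following sketch shows a shape it could have, inferred from the three properties set in the controller, and how the JSON body round-trips with System.Text.Json. The property types and the PayloadJson helper are assumptions. The sender calls JsonSerializer.Serialize, and the ToObjectFromJson call used later by the receiver also deserializes with System.Text.Json.

```csharp
using System.Text.Json;

// Assumed shape of MyPayload, inferred from the properties set above.
public class MyPayload
{
    public string Name { get; set; } = "";
    public int Goals { get; set; }
    public bool Delete { get; set; }
}

// Hypothetical helper which mirrors the two ends of the message flow.
public static class PayloadJson
{
    // What the sender puts into the message body.
    public static string ToJson(MyPayload payload)
        => JsonSerializer.Serialize(payload);

    // What the receiver gets back out of the message body.
    public static MyPayload FromJson(string json)
        => JsonSerializer.Deserialize<MyPayload>(json);
}
```

Because both sides use the default serializer options, the property names in the JSON match the C# property names exactly. If one side changes the naming policy, the other side needs the same options.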
Implementing an Azure Service Bus Topic Subscription
The ServiceBusTopicSubscription class implements the topic subscription. The ServiceBusClient is created using the Azure Service Bus connection string, and a ServiceBusProcessor is created from it using the topic path and the subscription name. These values are the values which have been configured in Azure. The PrepareFiltersAndHandleMessages method registers the message and error handlers, sets up the subscription rules and starts the processor, which then receives the messages and sends them on for processing in the IProcessData implementation.
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public interface IServiceBusTopicSubscription
    {
        Task PrepareFiltersAndHandleMessages();
        Task CloseQueueAsync();
        ValueTask DisposeAsync();
    }

    public class ServiceBusTopicSubscription : IServiceBusTopicSubscription
    {
        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private const string TOPIC_PATH = "mytopic";
        private const string SUBSCRIPTION_NAME = "mytopicsubscription";
        private readonly ILogger _logger;
        private readonly ServiceBusClient _client;
        private readonly ServiceBusAdministrationClient _adminClient;
        private ServiceBusProcessor _processor;

        public ServiceBusTopicSubscription(IProcessData processData,
            IConfiguration configuration,
            ILogger<ServiceBusTopicSubscription> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;

            var connectionString = _configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _adminClient = new ServiceBusAdministrationClient(connectionString);
        }

        public async Task PrepareFiltersAndHandleMessages()
        {
            ServiceBusProcessorOptions _serviceBusProcessorOptions = new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 1,
                AutoCompleteMessages = false,
            };

            _processor = _client.CreateProcessor(TOPIC_PATH, SUBSCRIPTION_NAME, _serviceBusProcessorOptions);
            _processor.ProcessMessageAsync += ProcessMessagesAsync;
            _processor.ProcessErrorAsync += ProcessErrorAsync;

            await RemoveDefaultFilters();
            await AddFilters();

            await _processor.StartProcessingAsync().ConfigureAwait(false);
        }

        private async Task RemoveDefaultFilters()
        {
            try
            {
                var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME);
                var ruleProperties = new List<RuleProperties>();
                await foreach (var rule in rules)
                {
                    ruleProperties.Add(rule);
                }

                foreach (var rule in ruleProperties)
                {
                    if (rule.Name == "GoalsGreaterThanSeven")
                    {
                        await _adminClient.DeleteRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, "GoalsGreaterThanSeven")
                            .ConfigureAwait(false);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex.ToString());
            }
        }

        private async Task AddFilters()
        {
            try
            {
                var rules = _adminClient.GetRulesAsync(TOPIC_PATH, SUBSCRIPTION_NAME)
                    .ConfigureAwait(false);

                var ruleProperties = new List<RuleProperties>();
                await foreach (var rule in rules)
                {
                    ruleProperties.Add(rule);
                }

                if (!ruleProperties.Any(r => r.Name == "GoalsGreaterThanSeven"))
                {
                    CreateRuleOptions createRuleOptions = new CreateRuleOptions
                    {
                        Name = "GoalsGreaterThanSeven",
                        Filter = new SqlRuleFilter("goals > 7")
                    };
                    await _adminClient.CreateRuleAsync(TOPIC_PATH, SUBSCRIPTION_NAME, createRuleOptions)
                        .ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex.ToString());
            }
        }

        private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
        {
            var myPayload = args.Message.Body.ToObjectFromJson<MyPayload>();
            await _processData.Process(myPayload).ConfigureAwait(false);
            await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
        }

        private Task ProcessErrorAsync(ProcessErrorEventArgs arg)
        {
            _logger.LogError(arg.Exception, "Message handler encountered an exception");
            _logger.LogDebug($"- ErrorSource: {arg.ErrorSource}");
            _logger.LogDebug($"- Entity Path: {arg.EntityPath}");
            _logger.LogDebug($"- FullyQualifiedNamespace: {arg.FullyQualifiedNamespace}");

            return Task.CompletedTask;
        }

        public async ValueTask DisposeAsync()
        {
            if (_processor != null)
            {
                await _processor.DisposeAsync().ConfigureAwait(false);
            }

            if (_client != null)
            {
                await _client.DisposeAsync().ConfigureAwait(false);
            }
        }

        public async Task CloseQueueAsync()
        {
            await _processor.CloseAsync().ConfigureAwait(false);
        }
    }
}
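A note on the rules used above: a new subscription normally has a default rule which matches every message, and a message is delivered if any rule on the subscription matches. As long as the default rule exists, the "goals > 7" rule has no visible effect. If only messages with more than seven goals should arrive, the default rule needs to be removed. The following is a minimal sketch of this, not the code from the repository. The SubscriptionRules class and the UseOnlyGoalsFilterAsync method are hypothetical names.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, not part of the sample repository.
public static class SubscriptionRules
{
    private const string RuleName = "GoalsGreaterThanSeven";

    public static async Task UseOnlyGoalsFilterAsync(
        ServiceBusAdministrationClient adminClient,
        string topicPath,
        string subscriptionName)
    {
        // Remove the match-all rule which a new subscription gets by default.
        var defaultRule = RuleProperties.DefaultRuleName;
        if ((await adminClient.RuleExistsAsync(topicPath, subscriptionName, defaultRule)).Value)
        {
            await adminClient.DeleteRuleAsync(topicPath, subscriptionName, defaultRule);
        }

        // Add the SQL rule only if it is not already present.
        if (!(await adminClient.RuleExistsAsync(topicPath, subscriptionName, RuleName)).Value)
        {
            await adminClient.CreateRuleAsync(topicPath, subscriptionName,
                new CreateRuleOptions(RuleName, new SqlRuleFilter("goals > 7")));
        }
    }
}
```

With this in place, the subscription would only receive messages whose goals application property is greater than 7, so a POST with a lower value would no longer reach the receiving application.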
The IServiceBusTopicSubscription and the IProcessData interfaces, plus their implementations, are added to the IoC container of the ASP.NET Core application.
services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
services.AddTransient<IProcessData, ProcessData>();
The PrepareFiltersAndHandleMessages method is called in the Configure method of the Startup class, so that the application starts to listen for messages. The queue consumer from the first post is started in the same place.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // ...

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
    });

    app.UseSwagger();
    app.UseSwaggerUI(c =>
    {
        c.SwaggerEndpoint("/swagger/v1/swagger.json", "Payload Management API V1");
    });

    var bus = app.ApplicationServices.GetService<IServiceBusConsumer>();
    bus.RegisterOnMessageHandlerAndReceiveMessages().GetAwaiter().GetResult();

    var busSubscription = app.ApplicationServices.GetService<IServiceBusTopicSubscription>();
    busSubscription.PrepareFiltersAndHandleMessages().GetAwaiter().GetResult();
}
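Starting the subscription in Configure blocks on the task with GetAwaiter().GetResult(). One possible alternative, shown here only as a sketch and not used in the repository, is a hosted service which starts the processor when the host starts and closes it on shutdown. The TopicSubscriptionWorker class is a hypothetical name. The interface is repeated from the code above so that the sketch compiles on its own.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Repeated from the code above so that the sketch compiles on its own.
public interface IServiceBusTopicSubscription
{
    Task PrepareFiltersAndHandleMessages();
    Task CloseQueueAsync();
    ValueTask DisposeAsync();
}

// Hypothetical worker, not part of the sample repository.
public class TopicSubscriptionWorker : IHostedService
{
    private readonly IServiceBusTopicSubscription _subscription;

    public TopicSubscriptionWorker(IServiceBusTopicSubscription subscription)
    {
        _subscription = subscription;
    }

    // Called once when the host starts: begin listening for messages.
    public Task StartAsync(CancellationToken cancellationToken)
        => _subscription.PrepareFiltersAndHandleMessages();

    // Called on graceful shutdown: stop receiving messages.
    public Task StopAsync(CancellationToken cancellationToken)
        => _subscription.CloseQueueAsync();
}
```

The worker would be registered with services.AddHostedService<TopicSubscriptionWorker>() next to the existing singleton registration, and the two busSubscription lines in Configure would then no longer be needed.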
The ProcessData service processes the incoming topic messages for the defined subscription. In this demo, each message is saved as a Payload entity using the PayloadMessageContext, and the saved data can be viewed using the Swagger API.
public async Task Process(MyPayload myPayload)
{
    using (var payloadMessageContext =
        new PayloadMessageContext(
            _configuration.GetConnectionString("DefaultConnection")))
    {
        await payloadMessageContext.AddAsync(new Payload
        {
            Name = myPayload.Name,
            Goals = myPayload.Goals,
            Created = DateTime.UtcNow
        });

        await payloadMessageContext.SaveChangesAsync();
    }
}
If only the ASP.NET Core application which sends messages is started, and a POST request is sent to the topic API, a message will be sent to the Azure Service Bus topic. This can then be viewed in the portal.

If the application which receives the topic subscription messages is started, the message will be received and removed from the topic subscription.

Links:
https://docs.microsoft.com/en-us/azure/service-bus-messaging/
https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
Always subscribe to Dead-lettered messages when using an Azure Service Bus
I have Partitioning and RequireDuplicateDetection turned on for my Topic and when I attempted to execute your code, I receive this message in return when the SendAsync is called: “Message to a partitioned entity with duplicate detection enabled must have either PartitionKey or MessageId set”
So I had to add “message.MessageId = Guid.NewGuid().ToString();” right after the “var message = new Message(Encoding.UTF8.GetBytes(data));” line. It then worked for me.
You can also use this code:
var message = new Message(Encoding.UTF8.GetBytes(data))
{
    MessageId = Guid.NewGuid().ToString()
};
thanks for the info, will add this
Great article. Quick question here: I was looking at your full implementation and I didn’t clearly understand why you need both the Payload and Topic, it seems to me that they are doing the exact same thing. Any comments on that?
The SqlRuleFilter(“goals > 7”) never works. No matter goals > 7 or == 7 or < 7, they will all be accepted and stored to the Sqlite database. Why?
Will have a look at this, was working when I tested this last.
Greetings Damien
I have a quick question: what if we have different objects? I only see MyPayload. How can I use different objects?
I mean, what if I need to differentiate between two objects and have the subscription deal with each of them differently?