Using Azure Service Bus Topics in ASP.NET Core

This article shows how to implement two ASP.NET Core API applications which communicate with each other using Azure Service Bus topics. This post continues on from the last article, this time using topics and subscriptions to communicate instead of a queue. By using a topic with subscriptions, a message can be sent to n receivers.

Code: https://github.com/damienbod/AspNetCoreServiceBus

Setting up the Azure Service Bus Topics

The Azure Service Bus Topic and the Topic Subscription need to be set up in Azure, either using the portal or scripts.

First, create a Topic in the Azure Service Bus namespace.

Then, in the new Topic, add a Topic Subscription.
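
For the scripted route, the topic and the subscription can also be created from code. The following is a minimal sketch, assuming the ManagementClient class from the Microsoft.Azure.ServiceBus package and a connection string which has Manage rights. The TopicSetup class and its method are made up for illustration and are not part of the sample repository; the default names match the constants used later in this post.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

namespace ServiceBusMessaging
{
    // Hypothetical helper, not part of the sample repository.
    public static class TopicSetup
    {
        public static async Task EnsureTopicAndSubscriptionAsync(
            string connectionString,
            string topicPath = "mytopic",
            string subscriptionName = "mytopicsubscription")
        {
            // The connection string needs Manage rights on the namespace.
            var client = new ManagementClient(connectionString);
            try
            {
                // Create the topic only if it does not exist yet.
                if (!await client.TopicExistsAsync(topicPath))
                {
                    await client.CreateTopicAsync(topicPath);
                }

                // Create the subscription on that topic only if it is missing.
                if (!await client.SubscriptionExistsAsync(topicPath, subscriptionName))
                {
                    await client.CreateSubscriptionAsync(topicPath, subscriptionName);
                }
            }
            finally
            {
                await client.CloseAsync();
            }
        }
    }
}
```

Both entities are created with default settings here; options such as partitioning or duplicate detection would have to be set explicitly.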

ASP.NET Core applications

The applications are set up as in the first post in this series. This time the message bus uses a topic and a subscription to send the messages.

Implementing the Azure Service Bus Topic sender

The messages are sent using the ServiceBusTopicSender class. This class uses the Azure Service Bus connection string and a topic path which matches what was configured in the Azure portal. A new TopicClient is created, and this can then be used to send messages to the topic.

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusTopicSender
    {
        private readonly TopicClient _topicClient;
        private readonly IConfiguration _configuration;
        private const string TOPIC_PATH = "mytopic";
        private readonly ILogger _logger;

        public ServiceBusTopicSender(IConfiguration configuration, 
            ILogger<ServiceBusTopicSender> logger)
        {
            _configuration = configuration;
            _logger = logger;
            _topicClient = new TopicClient(
                _configuration.GetConnectionString("ServiceBusConnectionString"),
                TOPIC_PATH
            );
        }
        
        public async Task SendMessage(MyPayload payload)
        {
            string data = JsonConvert.SerializeObject(payload);
            Message message = new Message(Encoding.UTF8.GetBytes(data));

            try
            {
                await _topicClient.SendAsync(message);
            }
            catch (Exception e)
            {
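                // Pass the exception itself so that the stack trace is logged as well
                _logger.LogError(e, "Sending the message to the topic failed");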
            }
        }
    }
}
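
If the topic has partitioning and duplicate detection enabled, Service Bus rejects a message which has neither a MessageId nor a PartitionKey set. The sketch below shows one way to build the message with an explicit MessageId. The TopicMessageFactory helper is hypothetical, and the MyPayload class is a stand-in whose property types are assumed from how the properties are used in this post.

```csharp
using System;
using System.Text;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json;

namespace ServiceBusMessaging
{
    // Stand-in for the MyPayload type of the sample; the property types are assumed.
    public class MyPayload
    {
        public string Name { get; set; }
        public int Goals { get; set; }
        public bool Delete { get; set; }
    }

    // Hypothetical helper, not part of the sample repository.
    public static class TopicMessageFactory
    {
        public static Message Create(MyPayload payload)
        {
            string data = JsonConvert.SerializeObject(payload);

            return new Message(Encoding.UTF8.GetBytes(data))
            {
                // Required for partitioned entities with duplicate detection
                MessageId = Guid.NewGuid().ToString(),
                ContentType = "application/json"
            };
        }
    }
}
```

A random GUID satisfies the requirement, but two sends of the same payload then get different ids and are not treated as duplicates. If real de-duplication is wanted, the id would have to be derived from the payload instead.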

The ServiceBusTopicSender class is added as a service in the Startup class.

services.AddScoped<ServiceBusTopicSender>();
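
Because every ServiceBusTopicSender instance creates its own TopicClient, a scoped registration creates a new client for each request. Service Bus clients are intended to be reused, so a singleton registration may be worth considering. This is a sketch of that alternative, not what the sample repository does; the empty class is only a stand-in so that the snippet compiles on its own, and the extension method name is invented.

```csharp
using Microsoft.Extensions.DependencyInjection;

namespace ServiceBusMessaging
{
    // Empty stand-in; the real class is the ServiceBusTopicSender shown above.
    public class ServiceBusTopicSender { }

    public static class ServiceBusRegistration
    {
        // Hypothetical extension method: one sender, and so one TopicClient,
        // for the lifetime of the application.
        public static IServiceCollection AddServiceBusTopicSender(
            this IServiceCollection services)
        {
            return services.AddSingleton<ServiceBusTopicSender>();
        }
    }
}
```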

This service can then be used in the API to send messages to the bus, when other services need the data from the API call.

[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required] Payload request)
{
	if (data.Any(d => d.Id == request.Id))
	{
		return Conflict($"data with id {request.Id} already exists");
	}

	data.Add(request);

	// Send this to the bus for the other services
	await _serviceBusTopicSender.SendMessage(new MyPayload
	{
		Goals = request.Goals,
		Name = request.Name,
		Delete = false
	});

	return Ok(request);
}

Implementing an Azure Service Bus Topic Subscription

The ServiceBusTopicSubscription class implements the topic subscription. The SubscriptionClient is created using the Azure Service Bus connection string, the topic path and the subscription name, all of which must match the values configured in Azure. The RegisterOnMessageHandlerAndReceiveMessages method registers the message handler, which receives the messages and passes them on for processing in the IProcessData implementation. Because AutoComplete is set to false, the handler completes each message explicitly after it has been processed.

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public interface IServiceBusTopicSubscription
    {
        void RegisterOnMessageHandlerAndReceiveMessages();
        Task CloseSubscriptionClientAsync();
    }

    public class ServiceBusTopicSubscription : IServiceBusTopicSubscription
    {
        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private readonly SubscriptionClient _subscriptionClient;
        private const string TOPIC_PATH = "mytopic";
        private const string SUBSCRIPTION_NAME = "mytopicsubscription";
        private readonly ILogger _logger;

        public ServiceBusTopicSubscription(IProcessData processData, 
            IConfiguration configuration, 
            ILogger<ServiceBusTopicSubscription> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;

            _subscriptionClient = new SubscriptionClient(
                _configuration.GetConnectionString("ServiceBusConnectionString"), 
                TOPIC_PATH, 
                SUBSCRIPTION_NAME);
        }

        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            };

            _subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        private async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
            _processData.Process(myPayload);
            await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

            _logger.LogDebug($"- Endpoint: {context.Endpoint}");
            _logger.LogDebug($"- Entity Path: {context.EntityPath}");
            _logger.LogDebug($"- Executing Action: {context.Action}");

            return Task.CompletedTask;
        }

        public async Task CloseSubscriptionClientAsync()
        {
            await _subscriptionClient.CloseAsync();
        }
    }
}
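
With AutoComplete set to false, a message whose handler throws is not completed. It becomes available again when its lock expires and is retried until the maximum delivery count of the subscription is reached, after which Service Bus dead-letters it. A body which can never be deserialized could instead be dead-lettered straight away. The following is a sketch of that idea only; PayloadParser and PoisonMessageHandling are hypothetical helpers, and MyPayload is a stand-in with assumed property types.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json;

namespace ServiceBusMessaging
{
    // Stand-in for the MyPayload type of the sample; the property types are assumed.
    public class MyPayload
    {
        public string Name { get; set; }
        public int Goals { get; set; }
        public bool Delete { get; set; }
    }

    // Hypothetical helper: parsing without letting JSON errors escape.
    public static class PayloadParser
    {
        public static bool TryParse(byte[] body, out MyPayload payload)
        {
            payload = null;
            if (body == null)
            {
                return false;
            }

            try
            {
                payload = JsonConvert.DeserializeObject<MyPayload>(
                    Encoding.UTF8.GetString(body));
            }
            catch (JsonException)
            {
                return false;
            }

            // An empty or "null" body deserializes to null
            return payload != null;
        }
    }

    // Hypothetical helper for use inside the message handler.
    public static class PoisonMessageHandling
    {
        // Returns the payload, or null after the message has been dead-lettered.
        public static async Task<MyPayload> ParseOrDeadLetterAsync(
            ISubscriptionClient client, Message message)
        {
            if (PayloadParser.TryParse(message.Body, out var payload))
            {
                return payload;
            }

            await client.DeadLetterAsync(
                message.SystemProperties.LockToken,
                "InvalidPayload",
                "The message body could not be deserialized");
            return null;
        }
    }
}
```

In ProcessMessagesAsync, the handler would call ParseOrDeadLetterAsync first and only process and complete the message when the result is not null.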

The IServiceBusTopicSubscription and the IProcessData interfaces, together with their implementations, are registered in the IoC container of the ASP.NET Core application.

services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
services.AddTransient<IProcessData, ProcessData>();

The RegisterOnMessageHandlerAndReceiveMessages method is called in the Configure method of the Startup class, so that the application starts to listen for messages.

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
	...

	var busSubscription = 
		app.ApplicationServices.GetService<IServiceBusTopicSubscription>();
	busSubscription.RegisterOnMessageHandlerAndReceiveMessages();
}
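
The interface also defines CloseSubscriptionClientAsync, which the Configure method never calls. One way to tie both calls to the application lifetime is a hosted service. This is a sketch only, assuming the IHostedService interface from Microsoft.Extensions.Hosting; the TopicSubscriptionHostedService class name is invented here.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

namespace ServiceBusMessaging
{
    // Repeated from the listing above so that the sketch compiles on its own.
    public interface IServiceBusTopicSubscription
    {
        void RegisterOnMessageHandlerAndReceiveMessages();
        Task CloseSubscriptionClientAsync();
    }

    // Hypothetical wrapper, not part of the sample repository.
    public class TopicSubscriptionHostedService : IHostedService
    {
        private readonly IServiceBusTopicSubscription _subscription;

        public TopicSubscriptionHostedService(IServiceBusTopicSubscription subscription)
        {
            _subscription = subscription;
        }

        // Called once when the host starts: begin listening for messages.
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscription.RegisterOnMessageHandlerAndReceiveMessages();
            return Task.CompletedTask;
        }

        // Called on graceful shutdown: close the subscription client.
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return _subscription.CloseSubscriptionClientAsync();
        }
    }
}
```

Such a class would be registered with services.AddHostedService<TopicSubscriptionHostedService>() in ConfigureServices, replacing the explicit call in Configure.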

The ProcessData service processes the incoming topic messages for the defined subscription, and adds them to an in-memory list in this demo, which can be viewed using the Swagger API.

using AspNetCoreServiceBusApi2.Model;
using ServiceBusMessaging;

namespace AspNetCoreServiceBusApi2
{
    public class ProcessData : IProcessData
    {
        public void Process(MyPayload myPayload)
        {
            DataServiceSimi.Data.Add(new Payload
            {
                Name = myPayload.Name,
                Goals = myPayload.Goals
            });
        }
    }
}

If only the ASP.NET Core application which sends messages is started, and a POST request is sent to the topic API, a message will be sent to the Azure Service Bus topic. This can then be viewed in the portal.

When the application which subscribes to the topic is started, the message is delivered to it and then removed from the topic subscription.

Links:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

Azure Service Bus Topologies

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

Always subscribe to Dead-lettered messages when using an Azure Service Bus

Auto Forwarding, a hidden gem of Service Bus

3 comments

  1. I have Partitioning and RequireDuplicateDetection turned on for my Topic and when I attempted to execute your code, I received this message in return when SendAsync is called: “Message to a partitioned entity with duplicate detection enabled must have either PartitionKey or MessageId set”

    So I had to add “message.MessageId = Guid.NewGuid().ToString();” right after the “var message = new Message(Encoding.UTF8.GetBytes(data));” line. It then worked for me.

    1. You can also use this code:

      var message = new Message(Encoding.UTF8.GetBytes(data))
      {
      MessageId = Guid.NewGuid().ToString()
      };

    2. thanks for the info, will add this
