Using Azure Service Bus Queues with ASP.NET Core Services

This article shows how to implement two ASP.NET Core API applications to communicate with each other using Azure Service Bus. The ASP.NET Core APIs are implemented with Swagger support and uses an Azure Service Bus Queue to send data from one service to the other ASP.NET Core application.

Code: https://github.com/damienbod/AspNetCoreServiceBus

Posts in this series:

Setting up the Azure Service Bus Queue

Azure Service Bus is setup as described here:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

A queue or a topic can be used to implement the messaging. A queue is used as the messaging type in this example. Once the data has been received, it is removed from the queue.

Applications Overview

The applications are implemented as follows:

Implementing a Service Bus Queue

The Microsoft.Azure.ServiceBus Nuget package is used to implement the Azure Service Bus clients. The connection string for the service bus is saved in the user secrets of the projects. To run the example yourself, create your own Azure Service Bus, and set the secret for the projects. This can be easily done in Visual Studio by right clicking the project menu in the solution explorer. When deploying the application, use Azure Key Vault to set the secret. This would need to be implemented in the applications.

The SendMessage method takes a MyPayload type as a parameter, and adds this to the message as a Json payload.

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusSender
    {
        private readonly QueueClient _queueClient;
        private readonly IConfiguration _configuration;
        private const string QUEUE_NAME = "simplequeue";

        public ServiceBusSender(IConfiguration configuration)
        {
            _configuration = configuration;
            _queueClient = new QueueClient(
              _configuration.GetConnectionString("ServiceBusConnectionString"), 
              QUEUE_NAME);
        }
        
        public async Task SendMessage(MyPayload payload)
        {
            string data = JsonConvert.SerializeObject(payload);
            Message message = new Message(Encoding.UTF8.GetBytes(data));

            await _queueClient.SendAsync(message);
        }
    }
}

The ServiceBusSender is registered to the IoC of the ASP.NET Core application in the Startup class, ConfigureServices method. Swagger is also added here.

public void ConfigureServices(IServiceCollection services)
{
	services.AddMvc()
		.AddNewtonsoftJson();

	services.AddScoped<ServiceBusSender>();

	services.AddSwaggerGen(c =>
	{
		c.SwaggerDoc("v1", new OpenApiInfo
		{
			Version = "v1",
			Title = "Payload View API",
		});
	});
}

This service can then be used in the Controller which provides the API.

[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required] Payload request)
{
	if (data.Any(d => d.Id == request.Id))
	{
		return Conflict($"data with id {request.Id} already exists");
	}

	data.Add(request);

	// Send this to the bus for the other services
	await _serviceBusSender.SendMessage(new MyPayload
	{
		Goals = request.Goals,
		Name = request.Name,
		Delete = false
	});

	return Ok(request);
}

Consuming messaging from the Queue

The ServiceBusConsumer implements the IServiceBusConsumer interface. This is used to receive the messages from Azure Service Bus. The Connection String from the Queue is read from the application IConfiguration interface. The RegisterOnMessageHandlerAndReceiveMessages method adds the event handler for the messages, and uses the ProcessMessagesAsync method to process these. The ProcessMessagesAsync method converts the message to an object and calls the IProcessData interface to complete the processing of the message.

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public interface IServiceBusConsumer
    {
        void RegisterOnMessageHandlerAndReceiveMessages();
        Task CloseQueueAsync();
    }

    public class ServiceBusConsumer : IServiceBusConsumer
    {
        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private readonly QueueClient _queueClient;
        private const string QUEUE_NAME = "simplequeue";
        private readonly ILogger _logger;

        public ServiceBusConsumer(IProcessData processData, 
            IConfiguration configuration, 
            ILogger<ServiceBusConsumer> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;
            _queueClient = new QueueClient(
              _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
        }

        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            };

            _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        private async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
            _processData.Process(myPayload);
            await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

            _logger.LogDebug($"- Endpoint: {context.Endpoint}");
            _logger.LogDebug($"- Entity Path: {context.EntityPath}");
            _logger.LogDebug($"- Executing Action: {context.Action}");

            return Task.CompletedTask;
        }

        public async Task CloseQueueAsync()
        {
            await _queueClient.CloseAsync();
        }
    }
}

The Startup class configures the application and adds the support for Azure Service Bus, Swagger and the ASP.NET Core application.

public void ConfigureServices(IServiceCollection services)
{
	services.AddMvc()
		.AddNewtonsoftJson();

	services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
	services.AddTransient<IProcessData, ProcessData>();

	services.AddSwaggerGen(c =>
	{
		c.SwaggerDoc("v1", new OpenApiInfo
		{
			Version = "v1",
			Title = "Payload API",
		});
	});
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{

	app.UseStaticFiles();
	app.UseHttpsRedirection();

	app.UseRouting();

	app.UseAuthorization();
	app.UseCors();

	app.UseEndpoints(endpoints =>
	{
	    endpoints.MapControllers();
	});

	// Enable middleware to serve generated Swagger as a JSON endpoint.
	app.UseSwagger();
	app.UseSwaggerUI(c =>
	{
	    c.SwaggerEndpoint("/swagger/v1/swagger.json", 
	       "Payload Management API V1");
	});

	var bus = app.ApplicationServices.GetService<IServiceBusConsumer>();
	bus.RegisterOnMessageHandlerAndReceiveMessages();
}

The IProcessData interface is added to the shared library. This is used to process the incoming messages in the ServiceBusConsumer service.

public interface IProcessData
{
	void Process(MyPayload myPayload);
}

The ProcessData implements the IProcessData and is added to the application which receives the messages. The hosting application can the do whatever is required with the messages.

using AspNetCoreServiceBusApi2.Model;
using ServiceBusMessaging;

namespace AspNetCoreServiceBusApi2
{
    public class ProcessData : IProcessData
    {
        public void Process(MyPayload myPayload)
        {
            DataServiceSimi.Data.Add(new Payload
            {
                Name = myPayload.Name,
                Goals = myPayload.Goals
            });
        }
    }
}

When the applications are started, a POST request can be sent using the swagger UI from the first App.

And the message is then processed in the API 2.

Links:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

Always subscribe to Dead-lettered messages when using an Azure Service Bus

Auto Forwarding, a hidden gem of Service Bus

13 comments

  1. […] Using Azure Service Bus Queues with ASP.NET Core Services (Damien Bowden) […]

  2. Great blog post as always! I see you that you injected the configuration to get the ServiceBusConnectionString and I wonder to know where is the best place in the code to declare it.

    Cheers,
    Maher

    1. Thanks, I added this as a configuration which is read from my user secrets.

      Like this:

      {
      “ConnectionStrings”: {
      “ServiceBusConnectionString”: “Endpoint=…”
      }
      }

      1. You get to the point. I am using VS2019 and I can not manage secrets on a .NET Standard project.

      2. Ah 🙂 I added the IConfiguration object to the ctor of my service in the .NET Standard lib. The hosting app can use secrets and I set it in the host project. Then it can be used in the lib

        Greetings Damien

  3. Tamás Bilik · · Reply

    IServiceBusConsumer is registered as Singleton, but its dependency, the IProcessData as Transient. Shouldn’t the IProcessData be Singleton as well? ServiceBusConsumer will use the same single instance it got in its constructor for all messages, right? What’s the reason for having it as Transient?

  4. […] Using Azure Service Bus Queues with ASP.NET Core Services – Damien Bowden […]

  5. Matthewsre · · Reply

    I see the following specified in the interface of IServiceBusConsumer:

    Task CloseQueueAsync();

    This is never used in the example. When starting up the listener in the startup does that need to be factored in anywhere?

    Would you also happen to know if QueueClient started having intermittent issues, would it still safely recover listening or in the example would you have to restart the app? (I’ve been burned by RabbitMq library in the past needing to build in custom recovery)

    I have a few components that must be run on-prem so I can’t leverage Azure Functions. I am currently using HangFire for some of these scenarios, but would like to switch to this approach and setup a few listeners to pickup messages from the other Azure Functions we have.

    Thanks for sharing! I like this approach and haven’t seen it elsewhere yet.

    1. Matthewsre · · Reply

      Looks like this will handle IsTransient failures and you can specify the RetryPolicy:
      https://docs.microsoft.com/en-us/azure/architecture/best-practices/retry-service-specific#service-bus

      Still curious if about the CloseQueueAsync question and if you see any red flags of using this for my use case.

  6. Srinivas Mummareddy · · Reply

    After this particular line
    _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); will it call the ProcessMessagesAsync method? Correct me If I’m wrong as Its not calling process method and I’m not getting any exceptions as well. Could you please help me in this.

    Thanks for the article

    1. Kauê Bonilha · · Reply

      I had the same issue as Srinivas Mummareddy. Any solution to this?

      Thanks for sharing

      1. thanks for the comments, will fix this now

  7. Jose Sus · · Reply

    Thanks, you do a nice article, in the project of git, i see a web job function, my question is When do you use it?

Leave a Reply to The Morning Brew - Chris Alcock » The Morning Brew #2731 Cancel reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: