Using Azure Service Bus Queues with ASP.NET Core Services

This article shows how to implement two ASP.NET Core API applications to communicate with each other using Azure Service Bus. The ASP.NET Core APIs are implemented with Swagger support and uses an Azure Service Bus Queue to send data from one service to the other ASP.NET Core application.

Code: https://github.com/damienbod/AspNetCoreServiceBus

Posts in this series:

History

2021-05-18 Updated .NET, Azure Service Bus SDK

Setting up the Azure Service Bus Queue

Azure Service Bus is setup as described here:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

A queue or a topic can be used to implement the messaging. A queue is used as the messaging type in this example. Once the data has been received, it is removed from the queue.

Applications Overview

The applications are implemented as follows:

Implementing a Service Bus Queue

The Microsoft.Azure.ServiceBus Nuget package is used to implement the Azure Service Bus clients. The connection string for the service bus is saved in the user secrets of the projects. To run the example yourself, create your own Azure Service Bus, and set the secret for the projects. This can be easily done in Visual Studio by right clicking the project menu in the solution explorer. When deploying the application, use Azure Key Vault to set the secret. This would need to be implemented in the applications.

The SendMessage method takes a MyPayload type as a parameter, and adds this to the message as a Json payload.

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Configuration;
using System.Text.Json;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public class ServiceBusSender
    {
        private readonly ServiceBusClient _client;
        private readonly Azure.Messaging.ServiceBus.ServiceBusSender _clientSender;
        private const string QUEUE_NAME = "simplequeue";

        public ServiceBusSender(IConfiguration configuration)
        {
            var connectionString = configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
            _clientSender = _client.CreateSender(QUEUE_NAME);
        }
        
        public async Task SendMessage(MyPayload payload)
        {
            string messagePayload = JsonSerializer.Serialize(payload);
            ServiceBusMessage message = new ServiceBusMessage(messagePayload);
            await _clientSender.SendMessageAsync(message).ConfigureAwait(false);
        }
    }
}

The ServiceBusSender is registered to the IoC of the ASP.NET Core application in the Startup class, ConfigureServices method. Swagger is also added here.

public void ConfigureServices(IServiceCollection services)
{
	services.AddControllers();

	services.AddScoped<ServiceBusSender>();
	services.AddScoped<ServiceBusTopicSender>();

	services.AddSwaggerGen(c =>
	{
		c.SwaggerDoc("v1", new OpenApiInfo
		{
			Version = "v1",
			Title = "Payload View API",
		});
	});
}

This service can then be used in the Controller which provides the API.

[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required] Payload request)
{
	if (data.Any(d => d.Id == request.Id))
	{
		return Conflict($"data with id {request.Id} already exists");
	}

	data.Add(request);

	// Send this to the bus for the other services
	await _serviceBusSender.SendMessage(new MyPayload
	{
		Goals = request.Goals,
		Name = request.Name,
		Delete = false
	});

	return Ok(request);
}

Consuming messaging from the Queue

The ServiceBusConsumer implements the IServiceBusConsumer interface. This is used to receive the messages from Azure Service Bus. The Connection String from the Queue is read from the application IConfiguration interface. The RegisterOnMessageHandlerAndReceiveMessages method adds the event handler for the messages, and uses the ProcessMessagesAsync method to process these. The ProcessMessagesAsync method converts the message to an object and calls the IProcessData interface to complete the processing of the message.

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{
    public interface IServiceBusConsumer
    {
        Task RegisterOnMessageHandlerAndReceiveMessages();
        Task CloseQueueAsync();
        ValueTask DisposeAsync();
    }

    public class ServiceBusConsumer : IServiceBusConsumer
    {
        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private readonly ServiceBusClient _client;
        private const string QUEUE_NAME = "simplequeue";
        private readonly ILogger _logger;
        private ServiceBusProcessor _processor;

        public ServiceBusConsumer(IProcessData processData, 
            IConfiguration configuration, 
            ILogger<ServiceBusConsumer> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;

            var connectionString = _configuration.GetConnectionString("ServiceBusConnectionString");
            _client = new ServiceBusClient(connectionString);
        }

        public async Task RegisterOnMessageHandlerAndReceiveMessages()
        {
            ServiceBusProcessorOptions _serviceBusProcessorOptions = new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 1,
                AutoCompleteMessages = false,
            };

            _processor = _client.CreateProcessor(QUEUE_NAME, _serviceBusProcessorOptions);
            _processor.ProcessMessageAsync += ProcessMessagesAsync;
            _processor.ProcessErrorAsync += ProcessErrorAsync;
            await _processor.StartProcessingAsync().ConfigureAwait(false);
        }

        private Task ProcessErrorAsync(ProcessErrorEventArgs arg)
        {
            _logger.LogError(arg.Exception, "Message handler encountered an exception");
            _logger.LogDebug($"- ErrorSource: {arg.ErrorSource}");
            _logger.LogDebug($"- Entity Path: {arg.EntityPath}");
            _logger.LogDebug($"- FullyQualifiedNamespace: {arg.FullyQualifiedNamespace}");

            return Task.CompletedTask;
        }

        private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
        {
            var myPayload = args.Message.Body.ToObjectFromJson<MyPayload>();
            await _processData.Process(myPayload).ConfigureAwait(false);
            await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
        }

        public async ValueTask DisposeAsync()
        {
            if (_processor != null)
            {
                await _processor.DisposeAsync().ConfigureAwait(false);
            }

            if (_client != null)
            {
                await _client.DisposeAsync().ConfigureAwait(false);
            }
        }

        public async Task CloseQueueAsync()
        {
            await _processor.CloseAsync().ConfigureAwait(false);
        }
    }
}

The Startup class configures the application and adds the support for Azure Service Bus, Swagger and the ASP.NET Core application.

public void ConfigureServices(IServiceCollection services)
{
	services.AddControllers();

	var connection = Configuration.GetConnectionString("DefaultConnection");

	services.AddDbContext<PayloadContext>(options =>
		options.UseSqlite(connection));

	services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
	services.AddSingleton<IServiceBusTopicSubscription, ServiceBusTopicSubscription>();
	services.AddSingleton<IProcessData, ProcessData>();

	services.AddSwaggerGen(c =>
	{
		c.SwaggerDoc("v1", new OpenApiInfo
		{
			Version = "v1",
			Title = "Payload API",
		});
	});
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
	if (env.IsDevelopment())
	{
		app.UseDeveloperExceptionPage();
	}
	else
	{
		app.UseHsts();
	}

	app.UseStaticFiles();
	app.UseHttpsRedirection();

	app.UseRouting();

	app.UseAuthorization();
	app.UseCors();

	app.UseEndpoints(endpoints =>
	{
		endpoints.MapControllers();
	});

	app.UseSwagger();
	app.UseSwaggerUI(c =>
	{
		c.SwaggerEndpoint("/swagger/v1/swagger.json", "Payload Management API V1");
	});

	var bus = app.ApplicationServices.GetService<IServiceBusConsumer>();
	bus.RegisterOnMessageHandlerAndReceiveMessages().GetAwaiter().GetResult();

	var busSubscription = app.ApplicationServices.GetService<IServiceBusTopicSubscription>();
	busSubscription.PrepareFiltersAndHandleMessages().GetAwaiter().GetResult();
}

The IProcessData interface is added to the shared library. This is used to process the incoming messages in the ServiceBusConsumer service.

public interface IProcessData
{
	Task Process(MyPayload myPayload);
}

The ProcessData implements the IProcessData and is added to the application which receives the messages. The hosting application can the do whatever is required with the messages.

using AspNetCoreServiceBusApi2.Model;
using Microsoft.Extensions.Configuration;
using ServiceBusMessaging;
using System;
using System.Threading.Tasks;

namespace AspNetCoreServiceBusApi2
{
    public class ProcessData : IProcessData
    {
        private IConfiguration _configuration;

        public ProcessData(IConfiguration configuration)
        {
            _configuration = configuration;
        }
        public async Task Process(MyPayload myPayload)
        {
            using (var payloadMessageContext = 
                new PayloadMessageContext(
                    _configuration.GetConnectionString("DefaultConnection")))
            {
                await payloadMessageContext.AddAsync(new Payload
                {
                    Name = myPayload.Name,
                    Goals = myPayload.Goals,
                    Created = DateTime.UtcNow
                });

                await payloadMessageContext.SaveChangesAsync();
            }
        }
    }
}

When the applications are started, a POST request can be sent using the swagger UI from the first App.

And the message is then processed in the API 2.

Links:

https://docs.microsoft.com/en-us/azure/service-bus-messaging/

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

Always subscribe to Dead-lettered messages when using an Azure Service Bus

21 comments

  1. […] Using Azure Service Bus Queues with ASP.NET Core Services (Damien Bowden) […]

  2. Great blog post as always! I see you that you injected the configuration to get the ServiceBusConnectionString and I wonder to know where is the best place in the code to declare it.

    Cheers,
    Maher

    1. Thanks, I added this as a configuration which is read from my user secrets.

      Like this:

      {
      “ConnectionStrings”: {
      “ServiceBusConnectionString”: “Endpoint=…”
      }
      }

      1. You get to the point. I am using VS2019 and I can not manage secrets on a .NET Standard project.

      2. Ah 🙂 I added the IConfiguration object to the ctor of my service in the .NET Standard lib. The hosting app can use secrets and I set it in the host project. Then it can be used in the lib

        Greetings Damien

  3. Tamás Bilik · · Reply

    IServiceBusConsumer is registered as Singleton, but its dependency, the IProcessData as Transient. Shouldn’t the IProcessData be Singleton as well? ServiceBusConsumer will use the same single instance it got in its constructor for all messages, right? What’s the reason for having it as Transient?

  4. […] Using Azure Service Bus Queues with ASP.NET Core Services – Damien Bowden […]

  5. Matthewsre · · Reply

    I see the following specified in the interface of IServiceBusConsumer:

    Task CloseQueueAsync();

    This is never used in the example. When starting up the listener in the startup does that need to be factored in anywhere?

    Would you also happen to know if QueueClient started having intermittent issues, would it still safely recover listening or in the example would you have to restart the app? (I’ve been burned by RabbitMq library in the past needing to build in custom recovery)

    I have a few components that must be run on-prem so I can’t leverage Azure Functions. I am currently using HangFire for some of these scenarios, but would like to switch to this approach and setup a few listeners to pickup messages from the other Azure Functions we have.

    Thanks for sharing! I like this approach and haven’t seen it elsewhere yet.

    1. Matthewsre · · Reply

      Looks like this will handle IsTransient failures and you can specify the RetryPolicy:
      https://docs.microsoft.com/en-us/azure/architecture/best-practices/retry-service-specific#service-bus

      Still curious if about the CloseQueueAsync question and if you see any red flags of using this for my use case.

  6. Srinivas Mummareddy · · Reply

    After this particular line
    _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); will it call the ProcessMessagesAsync method? Correct me If I’m wrong as Its not calling process method and I’m not getting any exceptions as well. Could you please help me in this.

    Thanks for the article

    1. Kauê Bonilha · · Reply

      I had the same issue as Srinivas Mummareddy. Any solution to this?

      Thanks for sharing

      1. thanks for the comments, will fix this now

  7. Jose Sus · · Reply

    Thanks, you do a nice article, in the project of git, i see a web job function, my question is When do you use it?

  8. Sander ten Brinke · · Reply

    Hello!

    I already have a solution almost the same as yours. I was looking for a better one because it feels very weird to subscribe to service bus messages in the HTTP pipeline of asp net core. I discussed this with some other people and the consensus is that it is much better to subscribe to the service bus by using a Hosted Service (.AddHostedService). That way you have async support, di, logging, and cancellation support, etc.. out of the box instead of with workarounds.

    With that in mind, why did you choose the current solution you have in this post? Do you have any good responses to the use of HostedService?

    1. Hi Sander ten Brinke

      Thanks for the feedback.

      I was thinking about background services as well and de-coupling the bus client. I am not sure this is better, maybe it is, I want to evaluate this some more. Persisting the data is one thing, error handling and recovery.

      I have this in production as well, works without problem. Not sure which is the best. Would be grateful if you have some more insights. I like to keep it as simple as possible.

      Greetings Damien

  9. Sathish Kumar · · Reply

    I trying to do unit testing for this receiver class. Getting difficult for me. Could you please help me, how to do unit and integration testing for this?

  10. […] Using Azure Service Bus Queues with ASP.NET Core ServicesUsing Azure Service Bus Topics in ASP.NET CoreUsing Azure Service Bus Topics Subscription Filters in ASP.NET CoreUsing Entity Framework Core to process Azure Service Messages in ASP.NET CoreUsing an Azure Service Bus Topic Subscription in an Azure FunctionUsing Azure Service Bus with restricted accessUsing an ASP.NET Core IHostedService to run Azure Service Bus subscriptions and consumers […]

  11. sdfsadf sdfs · · Reply

    does the Hosted Service go to idle when the API service goes to idle? or recycled the app pool, or restart IIS? may need a HTTP call to re-active the Hosted service

  12. Marcel · · Reply

    locally works for me, but in azure the consumer never receive messages. The publisher always send the messages.
    If I run the API in the azure console as a console app then the consumer process the messages.
    do you know what it happens?
    thanks in advance

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.