gRPC Bi-directional streaming with Razor Pages and a Hosted Service gRPC client

This article shows how a Bi-directional streaming gRPC service could be implemented using an .NET Core Hosted Service as a gRPC client, and a Razor Page to send Bi-directional streaming messages to the servers connected clients.

Code: https://github.com/damienbod/Secure_gRpc

Posts in this series

History

2019-09-06: Updated Nuget packages, .NET Core 3 preview 9
2019-08-13: Updated Nuget packages, .NET Core 3 preview 8
2019-07-28: Updated Nuget packages, .NET Core 3 preview 7
2019-04-20: Updated Nuget packages, .NET Core 3 preview 4 changes
2019-03-26 Added Code improvements from feedback

Setting up the Bi-directional streaming gRPC Server

The gRPC client and server code is defined using a proto3 file. This has a single method, SendData which takes a MyMessage stream.

syntax = "proto3";

package Duplex;

service Messaging {

  rpc SendData (stream MyMessage) returns (stream MyMessage) {}
}

message MyMessage {
  string name = 1;
  string message = 2;
}

The DuplexService class implements the gRPC service. This class implements the SendData method, which was defined using the proto3 definitions. The service uses a ServerGrpcSubscribers singleton service, which implements the broadcast. If a gRPC client sends a request, the client is added to the list of subscribers and then the message is broadcasted to all the other clients.

If the gRPC client closes gracefully, the client will be removed here as well. The service requires that the client send a valid bearer token by using the Authorize attribute.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Duplex;
using Grpc.Core;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;

namespace SecureGrpc.Server
{
    [Authorize(Policy = "protectedScope")]
    public class DuplexService : Messaging.MessagingBase, IDisposable
    {
        private readonly ILogger _logger;
        private readonly ServerGrpcSubscribers _serverGrpcSubscribers;

        public DuplexService(ILoggerFactory loggerFactory, ServerGrpcSubscribers serverGrpcSubscribers)
        {
            _logger = loggerFactory.CreateLogger<DuplexService>();
            _serverGrpcSubscribers = serverGrpcSubscribers;
        }

        public override async Task SendData(IAsyncStreamReader<MyMessage> requestStream, IServerStreamWriter<MyMessage> responseStream, ServerCallContext context)
        {
            var httpContext = context.GetHttpContext();
            _logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

            if (!await requestStream.MoveNext())
            {
                return;
            }

            var user = requestStream.Current.Name;
            _logger.LogInformation($"{user} connected");
            var subscriber = new SubscribersModel
            {
                Subscriber = responseStream,
                Name = user
            };

            _serverGrpcSubscribers.AddSubscriber(subscriber);

            do
            {
                await _serverGrpcSubscribers.BroadcastMessageAsync(requestStream.Current);
            } while (await requestStream.MoveNext());

            _serverGrpcSubscribers.RemoveSubscriber(subscriber);
            _logger.LogInformation($"{user} disconnected");
        }

        public void Dispose()
        {
            _logger.LogInformation("Cleaning up");
        }
    }
}

The ServerGrpcSubscribers class implements the BroadcastMessageAsync method and the ConcurrentDictionary of clients are managed here. This service can be used to send server messages to the connected clients.

If when sending a message to a client fails, for example, the client application is killed, the broadcast will catch an exception, and remove this subscription.

using Duplex;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace SecureGrpc.Server
{
    public class ServerGrpcSubscribers
    {
        private readonly ILogger _logger;
        private readonly ConcurrentDictionary<string, SubscribersModel> Subscribers = new ConcurrentDictionary<string,SubscribersModel>();
        
        public ServerGrpcSubscribers(ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<ServerGrpcSubscribers>();
        }

        public async Task BroadcastMessageAsync(MyMessage message)
        {
            await BroadcastMessages(message);
        }


        public void AddSubscriber(SubscribersModel subscriber)
        {
            bool added = Subscribers.TryAdd(subscriber.Name, subscriber);
            _logger.LogInformation($"New subscriber added: {subscriber.Name}");
            if (!added)
            {
                _logger.LogInformation($"could not add subscriber: {subscriber.Name}");
            }
        }

        public void RemoveSubscriber(SubscribersModel subscriber)
        {
            try
            {
                Subscribers.TryRemove(subscriber.Name, out SubscribersModel item);
                _logger.LogInformation($"Force Remove: {item.Name} - no longer works");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"Could not remove {subscriber.Name}");
            }
        }

        private async Task BroadcastMessages(MyMessage message)
        {
            foreach (var subscriber in Subscribers.Values)
            {
                var item = await SendMessageToSubscriber(subscriber, message);
                if (item != null)
                {
                    RemoveSubscriber(item);
                };
            }
        }

        private async Task<SubscribersModel> SendMessageToSubscriber(SubscribersModel subscriber, MyMessage message)
        {
            try
            {
                _logger.LogInformation($"Broadcasting: {message.Name} - {message.Message}");
                await subscriber.Subscriber.WriteAsync(message);
                return null;
            }
            catch(Exception ex)
            {
                _logger.LogError(ex, "Could not send");
                return subscriber;
            }
        }

    }
}

The SubscribersModel class is used for the clients which are connected to the service. The RequireAuthorization method is used to define the authorization in the routing configuration.

public class SubscribersModel
{
	public IServerStreamWriter<MyMessage> Subscriber { get; set; }

	public string Name { get; set; }
}

The server startup configures the gRPC service.

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using IdentityServer4.AccessTokenValidation;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using System.Security.Claims;

namespace SecureGrpc.Server
{
    public class Startup
    {
        private string stsServer = "https://localhost:44352";

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddHttpContextAccessor();

            services.AddSingleton<ServerGrpcSubscribers>();

            services.AddAuthorization(options =>
            {
                options.AddPolicy("protectedScope", policy =>
                {
                    policy.RequireClaim("scope", "grpc_protected_scope");
                });
            });

            services.AddAuthorizationPolicyEvaluator();

            services.AddAuthentication(IdentityServerAuthenticationDefaults.AuthenticationScheme)
                .AddIdentityServerAuthentication(options =>
                {
                    options.Authority = stsServer;
                    options.ApiName = "ProtectedGrpc";
                    options.ApiSecret = "grpc_protected_secret";
                    options.RequireHttpsMetadata = false;
                });

            services.AddGrpc(options =>
            {
                options.EnableDetailedErrors = true;
            });

            services.AddMvc()
               .AddNewtonsoftJson();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            app.UseStaticFiles();
            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthentication();
            app.UseAuthorization();
            app.UseCors();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapGrpcService<GreeterService>().RequireAuthorization("protectedScope");
                endpoints.MapGrpcService<DuplexService>().RequireAuthorization("protectedScope");
                endpoints.MapRazorPages();
            });
        }
    }
}

The csproj requires the GrpcServices and the proto configuration to create the stubs.

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>netcoreapp3.0</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <Protobuf Include="..\Protos\*.proto" GrpcServices="Server" />
    <Content Include="@(Protobuf)" LinkBase="" />
  </ItemGroup>

  <ItemGroup>
    <PackageReference Include="Grpc.AspNetCore.Server" Version="0.1.19-pre1" />
    <PackageReference Include="Google.Protobuf" Version="3.6.1" />

    <PackageReference Include="Grpc.Tools" Version="1.19.0-pre1" PrivateAssets="All" />

    <PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="3.0.0-preview-19075-0444" />
    <PackageReference Include="IdentityServer4.AccessTokenValidation" Version="2.7.0" />
  </ItemGroup>

  <ItemGroup>
    <None Update="server.pfx">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
  </ItemGroup>

</Project>

Hosted Worker service gRPC client

The gRPC client is implemented in a worker class run in a Hosted Service. The csproj file also requires the gRPC configurations and the proto settings, otherwise the stub will not be built from the proto file.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace BiDirectionalStreamingWorker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices(services =>
                {
                    services.AddHostedService<Worker>();
                    services.AddSingleton<ApiService>();
                });
    }
}

The worker service implements the gRPC client. This is based on the example from the C# gRPC github repo.

The application gets a bearer token from the Secure token service, and uses the Metadata to add this as a header to the stream.

The data is then sent, received from the server. If the application is closed properly, it will close it’s connection. If the application is killed, the server gRPC server needs to handle this.

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Net.Http;
using System.IO;

namespace BiDirectionalStreamingWorker
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            ///
            /// Token init
            /// 
            HttpClient httpClient = new HttpClient();
            ApiService apiService = new ApiService(httpClient);
            var token = await apiService.GetAccessTokenAsync();
            //var token = "This is invalid, I hope it fails";

            var tokenValue = "Bearer " + token;
            var metadata = new Metadata
            {
                { "Authorization", tokenValue }
            };

            ///
            /// Call gRPC HTTPS
            ///
            var channelCredentials = new SslCredentials(
                File.ReadAllText("Certs\\ca.crt"),
                    new KeyCertificatePair(
                        File.ReadAllText("Certs\\client.crt"),
                        File.ReadAllText("Certs\\client.key")
                    )
                );

            var port = "50051";

            var name = "worker_client";
            while (!stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation($"Worker running at: {DateTime.Now}");

                var channel = new Channel("localhost:" + port, channelCredentials);
                var client = new Duplex.Messaging.MessagingClient(channel);

                using (var sendData = client.SendData(metadata))
                {
                    Console.WriteLine($"Connected as {name}. Send empty message to quit.");

                    var responseTask = Task.Run(async () =>
                    {
                        while (await sendData.ResponseStream.MoveNext(stoppingToken))
                        {
                            Console.WriteLine($"{sendData.ResponseStream.Current.Name}: {sendData.ResponseStream.Current.Message}");
                        }
                    });

                    var line = Console.ReadLine();
                    while (!string.IsNullOrEmpty(line))
                    {
                        await sendData.RequestStream.WriteAsync(new Duplex.MyMessage { Name = name, Message = line });
                        line = Console.ReadLine();
                    }
                    await sendData.RequestStream.CompleteAsync();
                }

                await channel.ShutdownAsync();
                await Task.Delay(1000, stoppingToken);
            }
        }
    }
}

Sending server messages from the Server Razor Pages

On the gRPC server a Razor page can be used to send server messages to all the connected clients. For Razor pages and gRPC to work on the same kestrel server, HTTP and HTTP2 need to be allowed.

A Razor page implements a form to send a broadcast to all the connected gRPC clients, using the ServerGrpcSubscribers defined above.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.RazorPages;

namespace SecureGrpc.Server.Pages
{
    public class IndexModel : PageModel
    {
        private readonly ServerGrpcSubscribers _serverGrpcSubscribers;

        public IndexModel(ServerGrpcSubscribers serverGrpcSubscribers)
        {
            _serverGrpcSubscribers = serverGrpcSubscribers;
        }

        public void OnGet()
        {
        }

        public async Task OnPostAsync(string message)
        {
            await _serverGrpcSubscribers.BroadcastMessageAsync(
              new Duplex.MyMessage { Message = message, Name = "Server" });
        }
    }
}

Running the code

In Visual studio, build and run all the projects, multiple project start. The clients will get an access token from the Secure token service, and then send a message to the gRPC server from the clients using the console.

Using a browser at https://localhost:50051, a razor page can be opened to send a server message to the connected clients.

If a connected client is killed, a message is sent, the server throws an exception, and removes the client without crashing.

When the client connects again, the server can send messages to the same client.

Links:

https://github.com/grpc/grpc-dotnet/

https://grpc.io/

An Early Look at gRPC and ASP.NET Core 3.0

https://www.zoeys.blog/first-impressions-of-grpc-integration-in-asp-net-core-3-preview/

8 comments

  1. Luís Barbosa · · Reply

    Hi Damien,

    any reason to block on the channel shutdown (channel.ShutdownAsync().Wait()) instead of awaiting?

    1. no… This could be improved, thanks

      Greetings Damien

  2. […] gRPC Bi-directional streaming with Razor Pages and a Hosted Service gRPC client – Damien Bowden […]

  3. […] gRPC Bi-directional streaming with Razor Pages and a Hosted Service gRPC client […]

  4. endpoints.MapGrpcService().RequireAuthorization(“protectedScope”);

  5. ^greeter service left in the code

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: