This article shows how a Bi-directional streaming gRPC service could be implemented using an .NET Core Hosted Service as a gRPC client, and a Razor Page to send Bi-directional streaming messages to the servers connected clients.
Code: https://github.com/damienbod/Secure_gRpc
Posts in this series
- Security Experiments with gRPC and ASP.NET Core 5
- Running Razor Pages and a gRPC service in a single ASP.NET Core application
- gRPC Bi-directional streaming with Razor Pages and a Hosted Service gRPC client
History
2020-11-25: Updated to .NET 5
2019-12-17: Updated Nuget packages, .NET Core 3.1, updated grpc implementations
2019-09-06: Updated Nuget packages, .NET Core 3 preview 9
2019-08-13: Updated Nuget packages, .NET Core 3 preview 8
2019-07-28: Updated Nuget packages, .NET Core 3 preview 7
2019-04-20: Updated Nuget packages, .NET Core 3 preview 4 changes
2019-03-26 Added Code improvements from feedback
Setting up the Bi-directional streaming gRPC Server
The gRPC client and server code is defined using a proto3 file. This has a single method, SendData which takes a MyMessage stream.
syntax = "proto3"; package Duplex; service Messaging { rpc SendData (stream MyMessage) returns (stream MyMessage) {} } message MyMessage { string name = 1; string message = 2; }
The DuplexService class implements the gRPC service. This class implements the SendData method, which was defined using the proto3 definitions. The service uses a ServerGrpcSubscribers singleton service, which implements the broadcast. If a gRPC client sends a request, the client is added to the list of subscribers and then the message is broadcasted to all the other clients.
If the gRPC client closes gracefully, the client will be removed here as well. The service requires that the client send a valid bearer token by using the Authorize attribute.
using System; using System.Collections.Generic; using System.Threading.Tasks; using Duplex; using Grpc.Core; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.Logging; namespace SecureGrpc.Server { [Authorize(Policy = "protectedScope")] public class DuplexService : Messaging.MessagingBase, IDisposable { private readonly ILogger _logger; private readonly ServerGrpcSubscribers _serverGrpcSubscribers; public DuplexService(ILoggerFactory loggerFactory, ServerGrpcSubscribers serverGrpcSubscribers) { _logger = loggerFactory.CreateLogger<DuplexService>(); _serverGrpcSubscribers = serverGrpcSubscribers; } public override async Task SendData(IAsyncStreamReader<MyMessage> requestStream, IServerStreamWriter<MyMessage> responseStream, ServerCallContext context) { var httpContext = context.GetHttpContext(); _logger.LogInformation($"Connection id: {httpContext.Connection.Id}"); if (!await requestStream.MoveNext()) { return; } var user = requestStream.Current.Name; _logger.LogInformation($"{user} connected"); var subscriber = new SubscribersModel { Subscriber = responseStream, Name = user }; _serverGrpcSubscribers.AddSubscriber(subscriber); do { await _serverGrpcSubscribers.BroadcastMessageAsync(requestStream.Current); } while (await requestStream.MoveNext()); _serverGrpcSubscribers.RemoveSubscriber(subscriber); _logger.LogInformation($"{user} disconnected"); } public void Dispose() { _logger.LogInformation("Cleaning up"); } } }
The ServerGrpcSubscribers class implements the BroadcastMessageAsync method and the ConcurrentDictionary of clients are managed here. This service can be used to send server messages to the connected clients.
If when sending a message to a client fails, for example, the client application is killed, the broadcast will catch an exception, and remove this subscription.
using Duplex; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace SecureGrpc.Server { public class ServerGrpcSubscribers { private readonly ILogger _logger; private readonly ConcurrentDictionary<string, SubscribersModel> Subscribers = new ConcurrentDictionary<string,SubscribersModel>(); public ServerGrpcSubscribers(ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger<ServerGrpcSubscribers>(); } public async Task BroadcastMessageAsync(MyMessage message) { await BroadcastMessages(message); } public void AddSubscriber(SubscribersModel subscriber) { bool added = Subscribers.TryAdd(subscriber.Name, subscriber); _logger.LogInformation($"New subscriber added: {subscriber.Name}"); if (!added) { _logger.LogInformation($"could not add subscriber: {subscriber.Name}"); } } public void RemoveSubscriber(SubscribersModel subscriber) { try { Subscribers.TryRemove(subscriber.Name, out SubscribersModel item); _logger.LogInformation($"Force Remove: {item.Name} - no longer works"); } catch (Exception ex) { _logger.LogError(ex, $"Could not remove {subscriber.Name}"); } } private async Task BroadcastMessages(MyMessage message) { foreach (var subscriber in Subscribers.Values) { var item = await SendMessageToSubscriber(subscriber, message); if (item != null) { RemoveSubscriber(item); }; } } private async Task<SubscribersModel> SendMessageToSubscriber(SubscribersModel subscriber, MyMessage message) { try { _logger.LogInformation($"Broadcasting: {message.Name} - {message.Message}"); await subscriber.Subscriber.WriteAsync(message); return null; } catch(Exception ex) { _logger.LogError(ex, "Could not send"); return subscriber; } } } }
The SubscribersModel class is used for the clients which are connected to the service. The RequireAuthorization method is used to define the authorization in the routing configuration.
public class SubscribersModel { public IServerStreamWriter<MyMessage> Subscriber { get; set; } public string Name { get; set; } }
The server startup configures the gRPC service.
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using IdentityServer4.AccessTokenValidation; using Microsoft.AspNetCore.Authentication.JwtBearer; using System.Security.Claims; namespace SecureGrpc.Server { public class Startup { private string stsServer = "https://localhost:44352"; public void ConfigureServices(IServiceCollection services) { services.AddHttpContextAccessor(); services.AddSingleton<ServerGrpcSubscribers>(); services.AddAuthorization(options => { options.AddPolicy("protectedScope", policy => { policy.RequireClaim("scope", "grpc_protected_scope"); }); }); services.AddAuthorizationPolicyEvaluator(); services.AddAuthentication(IdentityServerAuthenticationDefaults.AuthenticationScheme) .AddIdentityServerAuthentication(options => { options.Authority = stsServer; options.ApiName = "ProtectedGrpc"; options.ApiSecret = "grpc_protected_secret"; options.RequireHttpsMetadata = false; }); services.AddGrpc(options => { options.EnableDetailedErrors = true; }); services.AddMvc() .AddNewtonsoftJson(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { app.UseStaticFiles(); app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthentication(); app.UseAuthorization(); app.UseCors(); app.UseEndpoints(endpoints => { endpoints.MapGrpcService<GreeterService>().RequireAuthorization("protectedScope"); endpoints.MapGrpcService<DuplexService>().RequireAuthorization("protectedScope"); endpoints.MapRazorPages(); }); } } }
The csproj requires the GrpcServices and the proto configuration to create the stubs.
<Project Sdk="Microsoft.NET.Sdk.Web"> <PropertyGroup> <TargetFramework>netcoreapp3.1</TargetFramework> </PropertyGroup> <ItemGroup> <Protobuf Include="..\Protos\*.proto" GrpcServices="Server" /> <Content Include="@(Protobuf)" LinkBase="" /> </ItemGroup> <ItemGroup> <PackageReference Include="Microsoft.AspNetCore.Authentication.Certificate" Version="3.1.0" /> </ItemGroup> <ItemGroup> <PackageReference Include="Grpc.AspNetCore.Server" Version="2.25.0" /> <PackageReference Include="Google.Protobuf" Version="3.11.2" /> <PackageReference Include="Grpc.Tools" Version="2.25.0" PrivateAssets="All" /> <PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="3.1.0" /> <PackageReference Include="IdentityServer4.AccessTokenValidation" Version="3.0.1" /> <PackageReference Include="Serilog" Version="2.9.0" /> <PackageReference Include="Serilog.AspNetCore" Version="3.2.0" /> <PackageReference Include="Serilog.Settings.Configuration" Version="3.1.1-dev-00209" /> <PackageReference Include="Serilog.Sinks.Console" Version="3.1.2-dev-00824" /> <PackageReference Include="Serilog.Sinks.File" Version="4.1.0" /> </ItemGroup> <ItemGroup> <None Update="Certs\client1.pfx"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> <None Update="Certs\server1.pfx"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> <None Update="Certs\server2.pfx"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> <None Update="server.pfx"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> </ItemGroup> </Project>
Hosted Worker service gRPC client
The gRPC client is implemented in a worker class run in a Hosted Service. The csproj file also requires the gRPC configurations and the proto settings, otherwise the stub will not be built from the proto file.
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; namespace BiDirectionalStreamingWorker { public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService<Worker>(); services.AddSingleton<ApiService>(); }); } }
The worker service implements the gRPC client. This is based on the example from the C# gRPC github repo.
The application gets a bearer token from the Secure token service, and uses the Metadata to add this as a header to the stream.
The data is then sent, received from the server. If the application is closed properly, it will close it’s connection. If the application is killed, the server gRPC server needs to handle this.
using System; using System.Threading; using System.Threading.Tasks; using Grpc.Core; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Net.Http; using System.IO; using System.Security.Cryptography.X509Certificates; using Grpc.Net.Client; namespace BiDirectionalStreamingWorker { public class Worker : BackgroundService { private readonly ILogger<Worker> _logger; public Worker(ILogger<Worker> logger) { _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { /// /// Token init /// HttpClient httpClient = new HttpClient(); ApiService apiService = new ApiService(httpClient); var token = await apiService.GetAccessTokenAsync(); //var token = "This is invalid, I hope it fails"; var tokenValue = "Bearer " + token; var metadata = new Metadata { { "Authorization", tokenValue } }; var channel = GrpcChannel.ForAddress("https://localhost:50051", new GrpcChannelOptions { HttpClient = CreateHttpClient() }); var name = "worker_client"; while (!stoppingToken.IsCancellationRequested) { _logger.LogInformation($"Worker running at: {DateTime.Now}"); var client = new Duplex.Messaging.MessagingClient(channel); using (var sendData = client.SendData(metadata)) { Console.WriteLine($"Connected as {name}. Send empty message to quit."); var responseTask = Task.Run(async () => { while (await sendData.ResponseStream.MoveNext(stoppingToken)) { Console.WriteLine($"{sendData.ResponseStream.Current.Name}: {sendData.ResponseStream.Current.Message}"); } }); var line = Console.ReadLine(); while (!string.IsNullOrEmpty(line)) { await sendData.RequestStream.WriteAsync(new Duplex.MyMessage { Name = name, Message = line }); line = Console.ReadLine(); } await sendData.RequestStream.CompleteAsync(); } await Task.Delay(1000, stoppingToken); } } private static HttpClient CreateHttpClient() { var handler = new HttpClientHandler(); var cert = new X509Certificate2(Path.Combine("Certs/client2.pfx"), "1111"); handler.ClientCertificates.Add(cert); // Create client return new HttpClient(handler); } } }
Sending server messages from the Server Razor Pages
On the gRPC server a Razor page can be used to send server messages to all the connected clients. For Razor pages and gRPC to work on the same kestrel server, HTTP and HTTP2 need to be allowed.
A Razor page implements a form to send a broadcast to all the connected gRPC clients, using the ServerGrpcSubscribers defined above.
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.RazorPages; namespace SecureGrpc.Server.Pages { public class IndexModel : PageModel { private readonly ServerGrpcSubscribers _serverGrpcSubscribers; public IndexModel(ServerGrpcSubscribers serverGrpcSubscribers) { _serverGrpcSubscribers = serverGrpcSubscribers; } public void OnGet() { } public async Task OnPostAsync(string message) { await _serverGrpcSubscribers.BroadcastMessageAsync( new Duplex.MyMessage { Message = message, Name = "Server" }); } } }
Running the code
In Visual studio, build and run all the projects, multiple project start. The clients will get an access token from the Secure token service, and then send a message to the gRPC server from the clients using the console.
Using a browser at https://localhost:50051, a razor page can be opened to send a server message to the connected clients.
If a connected client is killed, a message is sent, the server throws an exception, and removes the client without crashing.
When the client connects again, the server can send messages to the same client.
Links:
https://github.com/grpc/grpc-dotnet/
https://www.zoeys.blog/first-impressions-of-grpc-integration-in-asp-net-core-3-preview/
Nice!
thanks
Hi Damien,
any reason to block on the channel shutdown (channel.ShutdownAsync().Wait()) instead of awaiting?
no… This could be improved, thanks
Greetings Damien
[…] gRPC Bi-directional streaming with Razor Pages and a Hosted Service gRPC client – Damien Bowden […]
[…] gRPC Bi-directional streaming with Razor Pages and a Hosted Service gRPC client […]
endpoints.MapGrpcService().RequireAuthorization(“protectedScope”);
^greeter service left in the code
[…] ServerNotification() sendet jede IntervalMs Millisekunden einen ServerNotificationMessage mit der aktuellen Zeit durch den Server-Stream zum Client. Diese Methode simuliert das asynchrone Notifizieren vom Server zum Client. Es gibt mit dieser Technik auch die Möglichkeit einen Broadcast an allen Clients zu machen. Der Server muss sich dann den IServerStreamWriterStreams von jedem Client merken und beim Broadcast die Clients einzeln aufrufen. Für ein Beispiel siehe den Post von Damien Bowden. […]
Thank you for such a detailed article. I followed it and just add below code to suppress errors of invalid certificate.
var handler = new HttpClientHandler();
handler.ServerCertificateCustomValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
return new HttpClient(handler);
Now I am trying to add razor page for frontend of BiDirectionalStreamingWorker instead of console interface.