Waiting for Azure Durable Functions to complete

This article shows how an Azure Durable Function can be used to process an HTTP API request that waits for the result of the orchestration. This can be required when you have no control over the client application calling the API and the processing involves asynchronous operations such as further API calls. The Azure Durable Function could call other APIs or run separate processes, and it is not known in advance when it will finish. If you could control the client that starts the process, you would not wait but use a callback instead, for example in the last activity.

Code: https://github.com/damienbod/AzureDurableFunctions

The API below handles the client request, an HTTP POST. The response can be specific to the client. The Azure Durable Function is started and awaited in the Processing class, which returns the result directly. The data received in the body of the request is passed on as a parameter. The data returned must be in the format the client requires, not the format used internally.

using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using System.Net.Http;
using Microsoft.AspNetCore.Mvc;
using DurableWait.Model;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

namespace DurableWait.Apis
{
    
    public class BeginFlowWithHttpPost
    {
        private readonly Processing _processing;

        public BeginFlowWithHttpPost(Processing processing)
        {
            _processing = processing;
        }

        [FunctionName(Constants.BeginFlowWithHttpPost)]
        public async Task<IActionResult> HttpStart(
          [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage request,
          [DurableClient] IDurableOrchestrationClient client,
          ILogger log)
        {
            log.LogInformation("Started new flow");

            BeginRequestData beginRequestData = await request.Content.ReadAsAsync<BeginRequestData>();
            log.LogInformation($"Started new flow with ID = '{beginRequestData.Id}'.");

            return await _processing.ProcessFlow(beginRequestData, request, client);
        }
    }
}

The Processing class starts the Azure Durable Function and waits for it to complete. The IDurableOrchestrationClient is passed in as a parameter from the Azure Function. The MyOrchestration orchestration is started with the request ID as its instance ID, and the method then uses WaitForCompletionOrCreateCheckStatusResponseAsync to wait until the orchestration completes or the timeout elapses. The response of this method is not used; the orchestration status is read afterwards with GetStatusAsync instead. If the runtime status is anything other than Completed, the orchestration is terminated and an InternalServerError 500 result is returned. If the Azure Durable Function completes successfully, its output must be mapped to the response body that the calling client expects, not returned as is. The CompleteResponseData is created from the orchestration output found in the status and returned to the client.

using DurableWait.Model;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Web.Http;

namespace DurableWait
{
    public class Processing
    {
        private readonly ILogger<Processing> _log;

        public Processing(ILoggerFactory loggerFactory)
        {
            _log = loggerFactory.CreateLogger<Processing>();
        }

        public async Task<IActionResult> ProcessFlow(
            BeginRequestData beginRequestData, 
            HttpRequestMessage request,
            IDurableOrchestrationClient client)
        {
            await client.StartNewAsync(Constants.MyOrchestration, beginRequestData.Id, beginRequestData);
            _log.LogInformation($"Started orchestration with ID = '{beginRequestData.Id}'.");

            TimeSpan timeout = TimeSpan.FromSeconds(7);
            TimeSpan retryInterval = TimeSpan.FromSeconds(1);

            await client.WaitForCompletionOrCreateCheckStatusResponseAsync(
                request,
                beginRequestData.Id,
                timeout,
                retryInterval);

            var data = await client.GetStatusAsync(beginRequestData.Id);

            // Not completed within the timeout (for example still running or failed)
            if (data.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
            {
                await client.TerminateAsync(beginRequestData.Id, "Timeout something took too long");
                return new ContentResult()
                {
                    // property names must be quoted for the body to be valid JSON
                    Content = "{ \"error\": \"Timeout something took too long\" }",
                    ContentType = "application/json",
                    StatusCode = (int)HttpStatusCode.InternalServerError
                };
            }
            var output = data.Output.ToObject<MyOrchestrationDto>();

            var completeResponseData = new CompleteResponseData
            {
                BeginRequestData = output.BeginRequest,
                Id2 = output.BeginRequest.Id + ".v2",
                MyActivityTwoResult = output.MyActivityTwoResult
            };

            return new OkObjectResult(completeResponseData);
        }
    }
}
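
To make the waiting step more concrete, the following is a minimal, framework-free sketch of a poll-until-complete-or-timeout loop. It is not how the Durable Functions extension implements WaitForCompletionOrCreateCheckStatusResponseAsync. The names WaitSketch, SketchStatus and WaitForCompletionAsync are hypothetical and exist only for this illustration. The timeout and retryInterval parameters play the same role as the two TimeSpan values in ProcessFlow.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical status type for this sketch; not the Durable Functions enum.
public enum SketchStatus { Running, Completed }

public static class WaitSketch
{
    // Polls getStatus until it reports Completed or the timeout elapses.
    // Returns true if completion was observed, false on timeout.
    public static async Task<bool> WaitForCompletionAsync(
        Func<Task<SketchStatus>> getStatus,
        TimeSpan timeout,
        TimeSpan retryInterval)
    {
        var stopwatch = Stopwatch.StartNew();

        while (true)
        {
            if (await getStatus() == SketchStatus.Completed)
            {
                return true;
            }

            var remaining = timeout - stopwatch.Elapsed;
            if (remaining <= TimeSpan.Zero)
            {
                return false;
            }

            // Never sleep past the deadline.
            await Task.Delay(remaining < retryInterval ? remaining : retryInterval);
        }
    }
}
```

Called with a timeout of 7 seconds and a retry interval of 1 second, as in ProcessFlow, a loop like this checks the status about once per second and gives up after roughly 7 seconds.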

The MyOrchestration class implements the Azure Durable Function orchestration. It runs two activities in sequence and uses the body of the client API call as its input data. The result of each activity is added to the orchestration data, which is returned as the orchestration output.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using DurableWait.Model;
using DurableWait;

namespace DurableWait.Orchestrations
{
    public class MyOrchestration
    {
        [FunctionName(Constants.MyOrchestration)]
        public async Task<MyOrchestrationDto> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context,
            ILogger log)
        {
            var myOrchestrationDto = new MyOrchestrationDto
            {
                BeginRequest = context.GetInput<BeginRequestData>()
            };

            if (!context.IsReplaying)
            {
                log.LogWarning($"begin MyOrchestration with input id {myOrchestrationDto.BeginRequest.Id}");
            }

            var myActivityOne = await context.CallActivityAsync<string>(
                Constants.MyActivityOne, context.GetInput<BeginRequestData>());

            myOrchestrationDto.MyActivityOneResult = myActivityOne;

            if(!context.IsReplaying)
            {
                log.LogWarning($"myActivityOne completed {myActivityOne}");
            }

            var myActivityTwo = await context.CallActivityAsync<string>(
                Constants.MyActivityTwo, myOrchestrationDto);

            myOrchestrationDto.MyActivityTwoResult = myActivityTwo;

            if (!context.IsReplaying)
            {
                log.LogWarning($"myActivityTwo completed {myActivityTwo}");
            }

            return myOrchestrationDto;
        }
    }
}
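
The model classes are not listed in this post. Inferred only from how their properties are used in the code above, they could look roughly like the sketch below. The property types are assumptions (Id is used as the orchestration instance ID, so it is taken to be a string), and the classes in the repository may define further members.

```csharp
namespace DurableWait.Model
{
    // Request body sent by the client. Only Id is used in the code shown here.
    public class BeginRequestData
    {
        public string Id { get; set; }
    }

    // Orchestration data and output: the input plus one result per activity.
    public class MyOrchestrationDto
    {
        public BeginRequestData BeginRequest { get; set; }
        public string MyActivityOneResult { get; set; }
        public string MyActivityTwoResult { get; set; }
    }

    // Response body in the shape the calling client expects.
    public class CompleteResponseData
    {
        public BeginRequestData BeginRequestData { get; set; }
        public string Id2 { get; set; }
        public string MyActivityTwoResult { get; set; }
    }
}
```

Keeping CompleteResponseData separate from MyOrchestrationDto is what lets the API return the client's required format while the orchestration keeps its own internal shape.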

The Startup class adds the services to the DI container so that constructor injection can be used in the implementation classes.

using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.KeyVault;
using Microsoft.Azure.Services.AppAuthentication;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using DurableWait;
using DurableWait.Activities;
using System;
using System.Reflection;

[assembly: FunctionsStartup(typeof(Startup))]

namespace DurableWait
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            var keyVaultEndpoint = Environment.GetEnvironmentVariable("AzureKeyVaultEndpoint");

            if (!string.IsNullOrEmpty(keyVaultEndpoint))
            {
                // using Key Vault, either local dev or deployed
                var azureServiceTokenProvider = new AzureServiceTokenProvider();
                var keyVaultClient = new KeyVaultClient(new KeyVaultClient.AuthenticationCallback(azureServiceTokenProvider.KeyVaultTokenCallback));

                var config = new ConfigurationBuilder()
                    .AddAzureKeyVault(keyVaultEndpoint)
                    .SetBasePath(Environment.CurrentDirectory)
                    .AddJsonFile("local.settings.json", true)
                    .AddEnvironmentVariables()
                    .Build();

                builder.Services.AddSingleton<IConfiguration>(config);
            }
            else
            {
                // local dev no Key Vault
                var config = new ConfigurationBuilder()
                    .SetBasePath(Environment.CurrentDirectory)
                    .AddJsonFile("local.settings.json", true)
                    .AddUserSecrets(Assembly.GetExecutingAssembly(), true)
                    .AddEnvironmentVariables()
                    .Build();

                builder.Services.AddSingleton<IConfiguration>(config);
            }

            builder.Services.AddOptions<MyConfiguration>()
                .Configure<IConfiguration>((settings, configuration) =>
                {
                    configuration.GetSection("MyConfiguration").Bind(settings);
                });

            builder.Services.AddOptions<MyConfigurationSecrets>()
                .Configure<IConfiguration>((settings, configuration) =>
                {
                    configuration.GetSection("MyConfigurationSecrets").Bind(settings);
                });

            builder.Services.AddLogging();
            builder.Services.AddScoped<MyActivities>();
            builder.Services.AddScoped<Processing>();
        }
    }
}

If the process completes successfully, the result gets returned as required.

If the process does not complete in time, an error message is returned after the timeout. This was simulated using a thread sleep in an activity. The API call is set to time out after 7 seconds.

Links:

https://damienbod.com/2018/12/23/using-azure-key-vault-with-asp-net-core-and-azure-app-services/

https://docs.microsoft.com/en-us/azure/azure-functions/functions-how-to-use-azure-function-app-settings

https://docs.microsoft.com/en-us/azure/azure-functions/durable/

https://github.com/Azure/azure-functions-durable-extension

https://damienbod.com/2019/03/14/running-local-azure-functions-in-visual-studio-with-https/

Microsoft Azure Storage Explorer

Microsoft Azure Storage Emulator

Install the Azure Functions Core Tools

NodeJS

Azure CLI

Azure SDK

Visual Studio Azure development extensions
