The article show how an Azure Durable Function can be used to process a HTTP API request which waits for the completion result. This can be required when you have no control over the client application calling the API and the process requires asynchronous operations like further API calls and so on. The Azure Durable Function could call other APIs, run separate processes and it is unknown when this is finished. If you could control the client starting the process, you would not wait, but use a callback, for example in the last activity.
Code: https://github.com/damienbod/AzureDurableFunctions
Posts in this series
- Using External Inputs in Azure Durable functions
- Azure Functions Configuration and Secrets Management
- Using Key Vault and Managed Identities with Azure Functions
- Waiting for Azure Durable Functions to complete
- Azure Durable Functions Monitoring and Diagnostics
- Retry Error Handling for Activities and Orchestrations in Azure Durable Functions
History
2021-03-07 Update packages and using DefaultAzureCredential for Azure Key vault access
2020-09-18 Updated Configuration, updated Nuget packages
The API call underneath handles the client request using a HTTP POST request. The response is or can be specific for the client. The Azure Durable Function is implemented and processed in the Processing class. This returns the result directly. The data received in the body of the request is passed as a parameter. The data returned also needs to be in the format required by the client, and not the format you use.
using System.Threading.Tasks; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.Extensions.Logging; using System.Net.Http; using Microsoft.AspNetCore.Mvc; using DurableWait.Model; using Microsoft.Azure.WebJobs.Extensions.DurableTask; namespace DurableWait.Apis { public class BeginFlowWithHttpPost { private readonly Processing _processing; public BeginFlowWithHttpPost(Processing processing) { _processing = processing; } [FunctionName(Constants.BeginFlowWithHttpPost)] public async Task<IActionResult> HttpStart( [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage request, [DurableClient] IDurableOrchestrationClient client, ILogger log) { log.LogInformation("Started new flow"); BeginRequestData beginRequestData = await request.Content.ReadAsAsync<BeginRequestData>(); log.LogInformation($"Started new flow with ID = '{beginRequestData.Id}'."); return await _processing.ProcessFlow(beginRequestData, request, client); } } }
The Processing class starts the Azure Durable Function and waits for this to complete. The IDurableOrchestrationClient interface is passed as a parameter from the Azure Function. The MyOrchestration orchestration is started and the method waits for this to complete or timeout using the WaitForCompletionOrCreateCheckStatusResponseAsync method. If the process times out, the result is returned without a completed status. An InternalServerError 500 result could be returned for this and the status can be set to terminated. If the Azure Durable Function completes successfully, the result needs to be mapped to the caller’s client API required body result, not the output of the Azure Durable Function. This can be created using the data from the status request. The CompleteResponseData data is produced using the data from the Azure Durable Function output and returned to the client.
using DurableWait.Model; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs.Extensions.DurableTask; using Microsoft.Extensions.Logging; using System; using System.Net; using System.Net.Http; using System.Threading.Tasks; using System.Web.Http; namespace DurableWait { public class Processing { private readonly ILogger<Processing> _log; public Processing(ILoggerFactory loggerFactory) { _log = loggerFactory.CreateLogger<Processing>(); } public async Task<IActionResult> ProcessFlow( BeginRequestData beginRequestData, HttpRequestMessage request, IDurableOrchestrationClient client) { await client.StartNewAsync(Constants.MyOrchestration, beginRequestData.Id, beginRequestData); _log.LogInformation($"Started orchestration with ID = '{beginRequestData.Id}'."); TimeSpan timeout = TimeSpan.FromSeconds(7); TimeSpan retryInterval = TimeSpan.FromSeconds(1); await client.WaitForCompletionOrCreateCheckStatusResponseAsync( request, beginRequestData.Id, timeout, retryInterval); var data = await client.GetStatusAsync(beginRequestData.Id); // timeout if(data.RuntimeStatus != OrchestrationRuntimeStatus.Completed) { await client.TerminateAsync(beginRequestData.Id, "Timeout something took too long"); return new ContentResult() { Content = "{ error: \"Timeout something took too long\" }", ContentType = "application/json", StatusCode = (int)HttpStatusCode.InternalServerError }; } var output = data.Output.ToObject<MyOrchestrationDto>(); var completeResponseData = new CompleteResponseData { BeginRequestData = output.BeginRequest, Id2 = output.BeginRequest.Id + ".v2", MyActivityTwoResult = output.MyActivityTwoResult }; return new OkObjectResult(completeResponseData); } } }
The MyOrchestration class implements the Azure Durable Function orchestration. This has two activities and uses the body from the client API call as the input data. The result of each activity is added to the orchestration data.
using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.DurableTask; using Microsoft.Extensions.Logging; using DurableWait.Model; using DurableWait; namespace MyAzureFDurableWaitunctions.Orchestrations { public class MyOrchestration { [FunctionName(Constants.MyOrchestration)] public async Task<MyOrchestrationDto> RunOrchestrator( [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log) { var myOrchestrationDto = new MyOrchestrationDto { BeginRequest = context.GetInput<BeginRequestData>() }; if (!context.IsReplaying) { log.LogWarning($"begin MyOrchestration with input id {myOrchestrationDto.BeginRequest.Id}"); } var myActivityOne = await context.CallActivityAsync<string>( Constants.MyActivityOne, context.GetInput<BeginRequestData>()); myOrchestrationDto.MyActivityOneResult = myActivityOne; if(!context.IsReplaying) { log.LogWarning($"myActivityOne completed {myActivityOne}"); } var myActivityTwo = await context.CallActivityAsync<string>( Constants.MyActivityTwo, myOrchestrationDto); myOrchestrationDto.MyActivityTwoResult = myActivityTwo; if (!context.IsReplaying) { log.LogWarning($"myActivityTwo completed {myActivityTwo}"); } return myOrchestrationDto; } } }
The Startup classes adds the services to the DI so that construction injection can be used in the implementation classes.
using Microsoft.Azure.Functions.Extensions.DependencyInjection; using Microsoft.Azure.KeyVault; using Microsoft.Azure.Services.AppAuthentication; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using DurableWait; using DurableWait.Activities; using System; using System.Reflection; [assembly: FunctionsStartup(typeof(Startup))] namespace DurableWait { public class Startup : FunctionsStartup { public override void Configure(IFunctionsHostBuilder builder) { builder.Services.AddOptions<MyConfiguration>() .Configure<IConfiguration>((settings, configuration) => { configuration.GetSection("MyConfiguration").Bind(settings); }); builder.Services.AddOptions<MyConfigurationSecrets>() .Configure<IConfiguration>((settings, configuration) => { configuration.GetSection("MyConfigurationSecrets").Bind(settings); }); builder.Services.AddLogging(); builder.Services.AddScoped<MyActivities>(); builder.Services.AddScoped<Processing>(); } public override void ConfigureAppConfiguration(IFunctionsConfigurationBuilder builder) { var builtConfig = builder.ConfigurationBuilder.Build(); var keyVaultEndpoint = builtConfig["AzureKeyVaultEndpoint"]; if (!string.IsNullOrEmpty(keyVaultEndpoint)) { // might need this depending on local dev env //var credential = new DefaultAzureCredential( // new DefaultAzureCredentialOptions { ExcludeSharedTokenCacheCredential = true }); // using Key Vault, either local dev or deployed builder.ConfigurationBuilder .SetBasePath(Environment.CurrentDirectory) .AddAzureKeyVault(new Uri(keyVaultEndpoint), new DefaultAzureCredential()) .AddJsonFile("local.settings.json", true) .AddEnvironmentVariables() .Build(); } else { // local dev no Key Vault builder.ConfigurationBuilder .SetBasePath(Environment.CurrentDirectory) .AddJsonFile("local.settings.json", true) .AddUserSecrets(Assembly.GetExecutingAssembly(), true) .AddEnvironmentVariables() .Build(); } } } }
If the process completes successfully, the result gets returned as required.
If the process fails, an error message is returned after the timeout. This was simulated using a thread sleep in an activity. The API call is set to timeout after 7 seconds.
Links:
https://damienbod.com/2018/12/23/using-azure-key-vault-with-asp-net-core-and-azure-app-services/
https://docs.microsoft.com/en-us/azure/azure-functions/durable/
https://github.com/Azure/azure-functions-durable-extension
https://damienbod.com/2019/03/14/running-local-azure-functions-in-visual-studio-with-https/
Microsoft Azure Storage Explorer
Microsoft Azure Storage Emulator
[…] Waiting for Azure Durable Functions to complete (Damien Bowden) […]
Thanks for this post! This gave me a bunch of inspiration for my azure function that leads into a service bus
—
andreaslengkeek@ssw.com.au
Thanks
Greetings Damien
Why timeout = 7 seconds? It looks like not enough.
Is it ready to use in production?