This post shows how to implement an Azure Durable Functions flow with an external HTTP API input. The flow is started using an HTTP request, runs an activity, waits for external input from an HTTP API (which could be any Azure Functions input), and then runs a second activity. The application is implemented in C# and uses version 3 of Azure Functions.
Code: https://github.com/damienbod/AzureDurableFunctions
Posts in this series
- Using External Inputs in Azure Durable functions
- Azure Functions Configuration and Secrets Management
- Using Key Vault and Managed Identities with Azure Functions
- Waiting for Azure Durable Functions to complete
- Azure Durable Functions Monitoring and Diagnostics
- Retry Error Handling for Activities and Orchestrations in Azure Durable Functions
Azure Durable Functions
Azure Durable Functions provide a simple way of implementing workflows in a serverless architecture. Durable Functions are built on top of Azure Functions and support chaining, fan-out/fan-in, external inputs, stateful flows, and eternal flows, with excellent diagnostic and monitoring APIs.
Durable function Orchestration
Orchestrations connect the activities of workflows or sub-orchestrations together. The flows can be stateful and behave slightly differently from normal application code. The code in the orchestration can be re-run many times, but each activity is only run once; on later runs the recorded result of the activity is returned instead of executing it again. For this reason the orchestration function code must be deterministic. The orchestrations are durable and reliable. The first time a part of the code runs, it is not replaying; every later pass over that same part is a replay. This can be checked with the IsReplaying property of the IDurableOrchestrationContext context. Every time an orchestration is started, it uses an instance ID to connect its past and future steps. You can provide your own instance ID or have one generated automatically.
Further docs can be found here.
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using MyAzureFunctions.Model;

namespace MyAzureFunctions.Orchestrations
{
    public class MyOrchestration
    {
        [FunctionName(Constants.MyOrchestration)]
        public async Task<MyOrchestrationDto> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context,
            ILogger log)
        {
            var myOrchestrationDto = new MyOrchestrationDto
            {
                InputStartData = context.GetInput<string>()
            };

            if (!context.IsReplaying)
            {
                log.LogWarning($"begin MyOrchestration with input {context.GetInput<string>()}");
            }

            var myActivityOne = await context.CallActivityAsync<string>(
                Constants.MyActivityOne, context.GetInput<string>());

            myOrchestrationDto.MyActivityOneResult = myActivityOne;

            if (!context.IsReplaying)
            {
                log.LogWarning($"myActivityOne completed {myActivityOne}");
            }

            var myActivityTwoInputEvent = await context.WaitForExternalEvent<string>(
                Constants.MyExternalInputEvent);
            myOrchestrationDto.ExternalInputData = myActivityTwoInputEvent;

            var myActivityTwo = await context.CallActivityAsync<string>(
                Constants.MyActivityTwo, myActivityTwoInputEvent);

            myOrchestrationDto.MyActivityTwoResult = myActivityTwo;

            if (!context.IsReplaying)
            {
                log.LogWarning($"myActivityTwo completed {myActivityTwo}");
            }

            return myOrchestrationDto;
        }
    }
}
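Because the orchestrator code is replayed, anything non-deterministic such as the current time, a random GUID or a log statement is best taken from the orchestration context. The following is only a minimal sketch and is not part of the repository: the function name ReplaySafeOrchestration is made up for illustration, and Constants.MyActivityOne is assumed to be the same constant used in the orchestration above. CreateReplaySafeLogger, CurrentUtcDateTime and NewGuid come from the Durable Functions 2.x extension.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

namespace MyAzureFunctions.Orchestrations
{
    public class ReplaySafeOrchestration
    {
        // "ReplaySafeOrchestration" is a hypothetical name used only for this sketch.
        [FunctionName("ReplaySafeOrchestration")]
        public async Task<string> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context,
            ILogger log)
        {
            // This logger only writes when the orchestrator is not replaying,
            // so the explicit IsReplaying checks are no longer needed.
            ILogger safeLog = context.CreateReplaySafeLogger(log);

            // Deterministic replacements for DateTime.UtcNow and Guid.NewGuid():
            // both return the same values on every replay of this instance.
            DateTime startedAt = context.CurrentUtcDateTime;
            Guid correlationId = context.NewGuid();

            safeLog.LogInformation(
                $"Instance {context.InstanceId} started at {startedAt:O}, correlation {correlationId}");

            // Assumes the MyActivityOne activity from this post is deployed in the same app.
            string result = await context.CallActivityAsync<string>(
                Constants.MyActivityOne, context.GetInput<string>());

            safeLog.LogInformation($"Activity completed with {result}");

            return result;
        }
    }
}
```

With the replay-safe logger, each message should appear once per step in the logs even though the orchestrator body itself is executed several times.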
Durable function Activities
Activities are normally only executed once for each run of a flow. The activity can use the IDurableActivityContext as an input parameter, or a typed parameter. You can only pass a single parameter to an activity, so multiple values need to be wrapped in one object. This is where you implement the business logic of the flow. The orchestration glues the activities together. The result of the activity can be used many times, but the activity itself is only run once, unless the orchestration is restarted using the ContinueAsNew method.
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

namespace MyAzureFunctions.Activities
{
    public class MyActivities
    {
        [FunctionName(Constants.MyActivityOne)]
        public string MyActivityOne([ActivityTrigger] IDurableActivityContext context, ILogger log)
        {
            string name = context.GetInput<string>();
            log.LogInformation($"Activity {Constants.MyActivityOne} {name}.");
            return $"{Constants.MyActivityOne} {name}!";
        }

        [FunctionName(Constants.MyActivityTwo)]
        public string MyActivityTwo([ActivityTrigger] IDurableActivityContext context, ILogger log)
        {
            string name = context.GetInput<string>();
            log.LogInformation($"Activity {Constants.MyActivityTwo} {name}.");
            return $"{Constants.MyActivityTwo} {name}!";
        }
    }
}
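Since an activity accepts only one input, several values can be bundled into a single serializable class and bound directly as a typed parameter. This is a sketch under assumptions and not code from the repository: MyActivityInput, MyTypedActivity and MyTypedOrchestration are hypothetical names chosen for illustration.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

namespace MyAzureFunctions.Activities
{
    // Hypothetical DTO: wraps several values into the single activity input.
    // It is passed between functions as JSON, so it must be serializable.
    public class MyActivityInput
    {
        public string Name { get; set; }
        public int Attempt { get; set; }
    }

    public class MyTypedActivities
    {
        // The typed parameter replaces IDurableActivityContext.GetInput<T>().
        [FunctionName("MyTypedActivity")]
        public string MyTypedActivity([ActivityTrigger] MyActivityInput input, ILogger log)
        {
            log.LogInformation($"Activity MyTypedActivity {input.Name}, attempt {input.Attempt}.");
            return $"MyTypedActivity {input.Name} (attempt {input.Attempt})";
        }

        // Minimal orchestrator showing how the DTO is passed to the activity.
        [FunctionName("MyTypedOrchestration")]
        public async Task<string> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var input = new MyActivityInput
            {
                Name = context.GetInput<string>(),
                Attempt = 1
            };

            return await context.CallActivityAsync<string>("MyTypedActivity", input);
        }
    }
}
```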
External Input
External inputs in a workflow are extremely useful when waiting for an HTTP call, a UI user event, or any other external event. A timer can be set to run a separate branch of the flow if the event is not raised within a certain time limit. The example below is an HTTP API which is called with the instance ID and raises the event with the data intended for the flow. The orchestration then continues with the next part of the flow using the data from this event.
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

namespace MyAzureFunctions.Apis
{
    public class ExternalHttpPostInput
    {
        [FunctionName(Constants.ExternalHttpPostInput)]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            [DurableClient] IDurableOrchestrationClient client,
            ILogger log)
        {
            string instanceId = req.Query["instanceId"];

            // Validate the input before using the durable client.
            if (string.IsNullOrEmpty(instanceId))
            {
                return new BadRequestObjectResult("Pass an instanceId in the query string");
            }

            // Only raise the event for an orchestration instance which exists.
            var status = await client.GetStatusAsync(instanceId);
            if (status == null)
            {
                return new NotFoundObjectResult($"No orchestration found for instance ID {instanceId}");
            }

            await client.RaiseEventAsync(instanceId, Constants.MyExternalInputEvent, "inputDataTwo");

            log.LogInformation("C# HTTP trigger function processed a request.");

            return new OkObjectResult($"Received, processing, {instanceId}");
        }
    }
}
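The timer mentioned above can be combined with the external event by racing the two tasks inside the orchestration. The following is a rough sketch rather than code from the repository: the function name MyOrchestrationWithTimeout, the five minute limit and the "timed out" result are assumptions made for illustration, while Constants.MyExternalInputEvent and Constants.MyActivityTwo are assumed to be the constants used in this post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

namespace MyAzureFunctions.Orchestrations
{
    public class MyOrchestrationWithTimeout
    {
        // Hypothetical orchestration name, used only for this sketch.
        [FunctionName("MyOrchestrationWithTimeout")]
        public async Task<string> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            using (var cts = new CancellationTokenSource())
            {
                // Use the replay-safe clock of the context, not DateTime.UtcNow.
                DateTime deadline = context.CurrentUtcDateTime.AddMinutes(5);

                Task timeoutTask = context.CreateTimer(deadline, cts.Token);
                Task<string> inputTask = context.WaitForExternalEvent<string>(
                    Constants.MyExternalInputEvent);

                // Whichever task completes first decides how the flow continues.
                Task winner = await Task.WhenAny(inputTask, timeoutTask);

                if (winner == inputTask)
                {
                    // Cancel the durable timer so the instance can complete
                    // without waiting for the deadline.
                    cts.Cancel();

                    string externalInput = await inputTask;
                    return await context.CallActivityAsync<string>(
                        Constants.MyActivityTwo, externalInput);
                }

                // The event was not raised in time: take the alternative path.
                return "timed out waiting for external input";
            }
        }
    }
}
```

Cancelling the timer when the event wins matters here, because an orchestration is not marked as completed while a durable timer is still outstanding.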
Start the flow with an HTTP API call
The orchestration is started using the StartNewAsync method. The example starts this with a null for the instance ID so that a new ID is auto generated.
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Net.Http;

namespace MyAzureFunctions.Apis
{
    public class BeginFlowWithHttpPost
    {
        [FunctionName(Constants.BeginFlowWithHttpPost)]
        public async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            string instanceId = await starter.StartNewAsync(Constants.MyOrchestration, null, "input data to start flow");

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}
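If you would rather provide your own instance ID, it can be passed as the second argument of StartNewAsync. The sketch below is one possible approach and is not taken from the repository: the function name BeginFlowWithOwnInstanceId, the flows/{flowId} route and the flow- prefix are all hypothetical. It checks for an active instance first, so that the same ID is not started twice.

```csharp
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace MyAzureFunctions.Apis
{
    public class BeginFlowWithOwnInstanceId
    {
        // Hypothetical function name and route, used only for this sketch.
        [FunctionName("BeginFlowWithOwnInstanceId")]
        public async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "flows/{flowId}")] HttpRequestMessage req,
            string flowId,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            // Derive the instance ID from the caller's own identifier.
            string instanceId = $"flow-{flowId}";

            // GetStatusAsync returns null when no instance with this ID exists.
            DurableOrchestrationStatus existing = await starter.GetStatusAsync(instanceId);
            bool isActive = existing != null
                && (existing.RuntimeStatus == OrchestrationRuntimeStatus.Pending
                    || existing.RuntimeStatus == OrchestrationRuntimeStatus.Running
                    || existing.RuntimeStatus == OrchestrationRuntimeStatus.ContinuedAsNew);

            if (isActive)
            {
                return new HttpResponseMessage(HttpStatusCode.Conflict)
                {
                    Content = new StringContent($"An instance with ID '{instanceId}' is already active.")
                };
            }

            await starter.StartNewAsync(Constants.MyOrchestration, instanceId, "input data to start flow");
            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}
```

A known instance ID makes it easier for the external input API to find the flow later, because the caller no longer needs to store an auto-generated value.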
When the API is requested, the flow starts and the durable function details are returned. This would need to be changed in a production app: security would be required, only POST requests should be supported, and proper logging and diagnostics would need to be added.
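As a starting point for these changes, the trigger can be restricted to POST requests and require a function key. This is only a sketch of one option: the function name BeginFlowSecured is hypothetical, reading the start input from the request body is an assumption, and function keys may well not be sufficient security for a real application.

```csharp
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace MyAzureFunctions.Apis
{
    public class BeginFlowSecured
    {
        // Hypothetical function name, used only for this sketch.
        // AuthorizationLevel.Function requires a function key on each request,
        // and only the "post" method is accepted.
        [FunctionName("BeginFlowSecured")]
        public async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            // Assumption: the start input is sent as plain text in the request body.
            string input = req.Content == null
                ? null
                : await req.Content.ReadAsStringAsync();

            if (string.IsNullOrWhiteSpace(input))
            {
                return new HttpResponseMessage(HttpStatusCode.BadRequest)
                {
                    Content = new StringContent("The request body must contain the start input.")
                };
            }

            string instanceId = await starter.StartNewAsync(Constants.MyOrchestration, null, input);
            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}
```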
The state of the flow can be viewed using the statusQueryGetUri link.
After the external HTTP API is called, the flow continues. The state is then updated, and the end result can be viewed or used as required.
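The state can also be read in code using the durable client instead of the statusQueryGetUri link. The following is a minimal sketch and not part of the repository: the function name GetFlowStatus, the flowstatus/{instanceId} route and the shape of the returned object are assumptions made for illustration.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;

namespace MyAzureFunctions.Apis
{
    public class GetFlowStatus
    {
        // Hypothetical function name and route, used only for this sketch.
        [FunctionName("GetFlowStatus")]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", Route = "flowstatus/{instanceId}")] HttpRequest req,
            string instanceId,
            [DurableClient] IDurableOrchestrationClient client)
        {
            // Returns null when no orchestration with this ID exists.
            DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
            if (status == null)
            {
                return new NotFoundResult();
            }

            // Output is only set once the orchestration has completed.
            return new OkObjectResult(new
            {
                status.InstanceId,
                RuntimeStatus = status.RuntimeStatus.ToString(),
                status.CreatedTime,
                status.LastUpdatedTime,
                Output = status.Output?.ToString()
            });
        }
    }
}
```

While the flow is waiting for the external input, the runtime status should be Running; after the second activity has finished, it changes to Completed and the output contains the serialized result of the orchestration.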
Links:
https://docs.microsoft.com/en-us/azure/azure-functions/durable/
https://github.com/Azure/azure-functions-durable-extension
https://damienbod.com/2019/03/14/running-local-azure-functions-in-visual-studio-with-https/
Microsoft Azure Storage Explorer
Microsoft Azure Storage Emulator