Azure Durable Functions: Fan-Out/Fan-in Pattern

Share This Post

Introduction

When working with Azure Functions, one common scenario is importing a file for processing. In our specific case, we encounter an Excel file with thousands of lines that need to be processed, checking the accuracy of values using multiple rules and data sources, followed by sending out notifications. Initially, with a small number of lines, the processing time was acceptable. However, as the size of the Excel file grew, reaching tens of thousands of lines, the processing time became unbearable. In order to overcome this challenge, we will explore the capabilities of Durable Functions and leverage their power to address this issue effectively.

Fan-out/fan-in Pattern

The “fan-out/fan-in” pattern is a technique used to parallelize and consolidate the execution of multiple tasks. During the fan-out phase, tasks are initiated in parallel to perform different activities concurrently. Then, during the fan-in phase, the function waits for all the parallel tasks to complete and combines their results. This pattern allows for efficient parallel execution, reduces processing time, and enables the consolidation of data or results obtained from parallel tasks. It is a scalable approach that maximizes resource utilization and enhances the overall efficiency of Durable Azure Functions.

fan out fan in

In our particular scenario, we will utilize the fan out/fan in pattern to tackle the time-consuming data processing task. Instead of handling the entire process within a single function, we will distribute and parallelize the workload across multiple functions. By employing this approach, the processing time will be significantly reduced, especially as the number of entries in the file increases. Through the fan out/fan in pattern, we can efficiently divide and conquer the data processing task, achieving improved performance and overall efficiency.

Step 1: Uploading a file

The initial step in the data processing workflow involves uploading a file. This is achieved by triggering the main function through the BlobTrigger, which monitors the specified storage for new file additions. To initiate the Fan-out/Fan-in pattern, the main function will invoke the Orchestrator function. The reason for having separate functions is that each function can only have a single trigger. To pass multiple parameters to the Orchestrator function, a serializable object can be utilized. It’s worth noting that complex parameters, such as streams or intricate entities, may encounter issues during transmission. Additionally, ensure that the Functions are configured at 64 bits to handle large data volumes effectively. The Orchestrator function is triggered using the IDurableOrchestrationClient.StartNewAsync method.

 [FunctionName("ImportExcel")]
    public async Task ImportExcel(
        [DurableClient] IDurableOrchestrationClient orchestrationClient,
        [BlobTrigger("storage/{name}")]
        Stream file,
        string name,
        IDictionary<string, string> metaData,
        ILogger log)
    {
        var data = await ParseExcel(file);
        var dataToBeProcessed = new DataToBeProcessed { Data = data, FileName = name, Email = metaData["email"]};

        var id = await orchestrationClient.StartNewAsync("DataOrchestrator", dataToBeProcessed);
        log.LogInformation("End ImportExcel. Orchestrator instance id: {Id}", id);
    }

Step 2: Calling the Orchestrator

The next step involves starting the Orchestrator function, which is responsible for managing the state of the data processing workflow. Within the Orchestrator function, the large dataset is divided into smaller chunks, in our example with a size of 1000 entries per chunk. To process these chunks in parallel, a helper activity function is called using the activityTrigger binding. The ProcessData function is invoked as follows:

ProcessData([ActivityTrigger] PartialData inputs...)

the data would lead to increased processing time as the file size grew. By leveraging the fan-out/fan-in pattern, multiple functions run concurrently on multiple virtual machines, thereby enhancing performance. The activity helpers are invoked using the context.CallActivityAsync method.

The Orchestrator function will wait for all the parallel tasks to be completed using the await Task.WhenAll(parallelTasks) statement. Once all the responses are collected, the final helper activity function is called. In this case, the purpose of the final helper activity is to notify the user that the data has been successfully processed.

The orchestrator cannot contain any await calls outside the orchestration context. Any async call needs to be processed in the activity functions.

    [FunctionName("DataOrchestrator")]
    public async Task DataOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        var input = context.GetInput<DataToBeProcessed>();
        var data = input.Data;
        var email = input.MetaData["email"];

        var parallelTasks = new List<Task<List<ResponseDto>>>();

        var split2 = data.Models.Chunk(1000);
        foreach (var partialList in split2)
        {
            var task = context.CallActivityAsync<List<ResponseDto>>("ProcessData", partialList);
            parallelTasks.Add(task);
        }

        await Task.WhenAll(parallelTasks);

        var resultVendors = new List<ResponseDto>();
        foreach (var task in parallelTasks)
        {
            resultPslVendors.AddRange(task.Result.response);
        }

        var resultLists = new ResultLists
        {
            ResultVendors = resultVendors,
            FileName = metaData.FileName,
            Email = email
        };
        await context.CallActivityAsync("SendNotifications", resultLists);
    }

Step 3: Activity Function: Processing the Data

The core of the data processing task lies within the Activity Function. While in traditional approaches there would be a single function handling the entire dataset, in the fan-out/fan-in pattern, multiple functions work in parallel to process the data. This allows for efficient and speedy processing, especially when dealing with large datasets.

Each parallel function performs its assigned task independently, processing a subset of the data. By distributing the workload across multiple functions, the overall processing time is significantly reduced. This parallel processing capability is a key advantage of the fan-out/fan-in pattern, as it leverages the scalability and resources of the underlying Azure infrastructure.

The Activity Function plays a crucial role in efficiently processing the data by executing tasks in parallel and ensuring optimal utilization of resources. This approach not only improves performance but also enables better utilization of available computing power, resulting in faster and more efficient data processing.

   [FunctionName("ProcessData")]
    public async Task<List<ResponseDto>> ProcessData([ActivityTrigger] PartialData inputs, ILogger log)
    {
        return await ProcessExcelFile(inputs.Models, inputs.Token, inputs.Email);
    }

Step 4: Activity Function: Sending Notifications

Once all the parallel tasks in the fan-out/fan-in pattern are completed, the orchestrator function takes control and instructs another activity function to send notifications. This final step ensures that users are notified about the completion of the data processing task.

The activity function responsible for sending notifications is triggered by the orchestrator using the activityTrigger binding. It takes the necessary data and parameters to generate and send the appropriate notifications to the intended recipients.

By separating the notification-sending process into its own activity function, the overall system architecture becomes more modular and maintainable. It also allows for easier extensibility, as modifications or enhancements to the notification mechanism can be made independently without affecting the other components of the process.

[FunctionName("SendNotifications")]
    public async Task SendNotifications([ActivityTrigger] ResultLists inputs, ILogger log)
    {
        await SendNotifications(inputs.ResultPslVendors, inputs.Email);
    }

Checking logs

By examining the logs, you can track the initiation of the orchestrator function, the number of required activities (in this case, five), and the moment when notifications were dispatched.

2023-06-30T13:18:48Z   [Information]   Executed 'DataOrchestrator' (Succeeded, Id=892088e1-815d-422d-99c2-555187e2a20d, Duration=2170ms)
2023-06-30T13:27:18Z   [Information]   Executing 'DataOrchestrator' (Reason='(null)', Id=5cd3b96a-a36c-49ac-9acf-378f89ea548b)
2023-06-30T13:27:18Z   [Information]   af0e4d7735e54208bb65274993f2fc85: Function 'DataOrchestrator (Orchestrator)' started. IsReplay: False. Input: (22171712 bytes). State: Started. SequenceNumber: 79. TaskEventId: -1
2023-06-30T13:27:18Z   [Information]   af0e4d7735e54208bb65274993f2fc85: Function 'ProcessData (Activity)' scheduled. Reason: DataOrchestrator. IsReplay: False. State: Scheduled. SequenceNumber: 80.
2023-06-30T13:27:18Z   [Information]   af0e4d7735e54208bb65274993f2fc85: Function 'ProcessData (Activity)' scheduled. Reason: DataOrchestrator. IsReplay: False. State: Scheduled. SequenceNumber: 81.
2023-06-30T13:27:18Z   [Information]   af0e4d7735e54208bb65274993f2fc85: Function 'ProcessData (Activity)' scheduled. Reason: DataOrchestrator. IsReplay: False. State: Scheduled. SequenceNumber: 82.
2023-06-30T13:27:18Z   [Information]   af0e4d7735e54208bb65274993f2fc85: Function 'ProcessData (Activity)' scheduled. Reason: DataOrchestrator. IsReplay: False. State: Scheduled. SequenceNumber: 83.
2023-06-30T13:27:18Z   [Information]   af0e4d7735e54208bb65274993f2fc85: Function 'ProcessData (Activity)' scheduled. Reason: DataOrchestrator. IsReplay: False. State: Scheduled. SequenceNumber: 84.
2023-06-30T13:28:46Z   [Information]   Executed ‘DataOrchestrator' (Succeeded, Id=27198a53-60c2-41e1-9c8e-8a20abd8180d, Duration=1092ms)
CTA Software

Conclusion

The implementation of the fan-out/fan-in pattern in the durable Azure functions brought about a significant improvement in processing time. By splitting the data into multiple parallel tasks and leveraging the power of distributed computing, the time-consuming process was optimized. This transition from a single function to multiple functions working in parallel showcases the effectiveness of this pattern in enhancing overall performance and efficiency.

Author

  • Oriol Saludes

    Experienced Full Stack Engineer with a demonstrated history of working in the information technology and services industry. Skilled in PHP, Spring Boot, Java, Kotlin, Domain-Driven Design (DDD), TDD and Front-end Development. Strong engineering professional with a Engineer's degree focused in Computer Engineering from Universitat Oberta de Catalunya (UOC).

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

Subscribe To Our Newsletter

Get updates from our latest tech findings

Have a challenging project?

We Can Work On It Together

apiumhub software development projects barcelona
Secured By miniOrange