Optimizing Service Bus message processing concurrency using Logic Apps stateless flows
Published Dec 12 2023 07:11 AM

In this article I show how to use the host configuration for the Service Bus trigger in Logic Apps Standard when the trigger runs in autocomplete mode.

These are function app settings; because the Logic Apps Standard Service Bus trigger is built on top of the function app runtime, they apply here as well.

The peek-lock trigger is not covered in this article.

 

Mohammed_Barqawi_2-1701955288552.png

 

The difference between stateful and stateless

There is an architectural difference between stateful and stateless flows, which the diagrams below illustrate.

 

Stateless flow


SB article.png

Stateful flow
SB article.png

The main difference is that a stateless flow is designed to run in a single process on a single worker. For the Service Bus autocomplete trigger, the benefit of this behavior is that a batch is completed only when all of its messages are processed successfully; if there is a failure, the whole batch is sent back to the queue.

 

How the test case was designed 

In host.json I set maxMessageBatchSize and minMessageBatchSize to study how they affect message handling. I used JSON like the following:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle.Workflows",
    "version": "[1.*, 2.0.0)"
  },
  "extensions": {
    "serviceBus": {
      "maxMessageBatchSize": 10,
      "minMessageBatchSize": 5
    }
  }
}

 

 

 

 

 

I used the same flow in both stateful and stateless scenarios.

 

Mohammed_Barqawi_0-1701960257347.png

 

In real-life scenarios, the HTTP call (which here only simulates a delay) would invoke the actual stateful flow that processes the messages.

I did not use the "Call Logic App action" because it is not called synchronously in stateless flows.

Another benefit of calling the message processor over HTTP is that it distributes the load between apps, so the message handler app can have different scaling settings from the message processor app.

 

It is also important to align the Service Bus lock duration with the expected execution time of one run, so that the message lock does not expire before the run finishes.

 

Mohammed_Barqawi_4-1702039209239.png
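
As a rough illustration of that check, the sketch below compares an expected run time against a queue lock duration. The function name `fits_within_lock`, the safety margin, and the sample numbers are assumptions made for this example; they are not values from the test and not Service Bus settings.

```python
def fits_within_lock(lock_duration_s: float, expected_run_s: float,
                     safety_margin: float = 0.2) -> bool:
    """Return True if one run is expected to finish before the lock expires.

    safety_margin is a hypothetical buffer (20% by default) kept free for
    network latency and scheduling delays; it is not a Service Bus setting.
    """
    if lock_duration_s <= 0 or expected_run_s < 0:
        raise ValueError("lock duration must be positive and run time non-negative")
    usable_s = lock_duration_s * (1.0 - safety_margin)
    return expected_run_s <= usable_s


# Made-up numbers: a 60 s lock leaves 48 s usable with the default margin.
print(fits_within_lock(60, 40))  # the run fits inside the usable window
print(fits_within_lock(60, 55))  # the run would risk outliving the lock
```

If the check fails, the options are to raise the lock duration on the queue, reduce the batch size, or shorten the work done in one run.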

 

 

Test Cases 

Besides the nature of the flow (stateless or stateful), I also tested the flow with and without sessions enabled.

 

1-Stateless without session enabled.


Mohammed_Barqawi_7-1702388476597.jpeg

The diagram shows that batches are sometimes processed in parallel, but not on the same worker.

2-Stateless with one Session enabled


Mohammed_Barqawi_8-1702388477230.jpeg

There are no overlaps in batch execution, even though multiple workers are handling runs.

3-Stateless with multiple sessions and maxMessageBatchSize=1


Mohammed_Barqawi_9-1702388477271.jpeg

There were two sessions in the queue, and messages were handled in parallel, but only across different sessions.

4-Stateless without session, maxMessageBatchSize=1 and Maximum Burst set to one


Mohammed_Barqawi_10-1702388477162.jpeg

 All the message handling is done by one worker.

5-Stateful with session enabled.


Mohammed_Barqawi_11-1702388477250.jpeg

 There are overlaps, so the next run will not wait for the previous run to complete.

 

6-Stateful without session.


Mohammed_Barqawi_12-1702388477285.jpeg

 There are overlaps, so the next run will not wait for the previous run to complete. 
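
The overlap check used in the test cases can also be done programmatically. The sketch below is one possible way to do it, assuming the rows written to the table have been exported as dictionaries with the `RowKey`, `triggerStart` and `end` fields used in this article. The function names, the sample rows, and the truncation of timestamps to whole seconds are assumptions made for illustration.

```python
from datetime import datetime
from typing import Dict, List, Tuple


def parse_ts(value: str) -> datetime:
    """Parse a UTC timestamp, keeping only second resolution.

    Only the first 19 characters (YYYY-MM-DDTHH:MM:SS) are used, so any
    fractional seconds and the trailing 'Z' are ignored.
    """
    return datetime.strptime(value[:19], "%Y-%m-%dT%H:%M:%S")


def find_overlaps(rows: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Return pairs of run names whose [triggerStart, end) intervals overlap."""
    runs = sorted(
        (parse_ts(r["triggerStart"]), parse_ts(r["end"]), r["RowKey"])
        for r in rows
    )
    overlaps = []
    for i, (_start_a, end_a, key_a) in enumerate(runs):
        for start_b, _end_b, key_b in runs[i + 1:]:
            if start_b >= end_a:
                break  # runs are sorted by start, so later runs cannot overlap
            overlaps.append((key_a, key_b))
    return overlaps


# Hypothetical sample rows; real rows would come from the results table.
sample_rows = [
    {"RowKey": "run-1", "triggerStart": "2023-12-12T05:00:00Z", "end": "2023-12-12T05:00:10Z"},
    {"RowKey": "run-2", "triggerStart": "2023-12-12T05:00:05Z", "end": "2023-12-12T05:00:15Z"},
    {"RowKey": "run-3", "triggerStart": "2023-12-12T05:00:20Z", "end": "2023-12-12T05:00:30Z"},
]

print(find_overlaps(sample_rows))
```

An empty result corresponds to the "no overlaps" cases, and a non-empty result corresponds to the cases where the next run did not wait for the previous one. Grouping the rows by `COMPUTERNAME` or `Session` before calling the function would narrow the check to one worker or one session.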


Below is the logic app code that I used for testing:

 

 

 

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "HTTP": {
                "inputs": {
                    "method": "GET",
                    "retryPolicy": {
                        "type": "none"
                    },
                    "uri": "****"
                },
                "operationOptions": "DisableAsyncPattern",
                "runAfter": {
                    "Initialize_COMPUTER_NAME": [
                        "SUCCEEDED"
                    ]
                },
                "type": "Http"
            },
            "Initialize_COMPUTER_NAME": {
                "inputs": {
                    "variables": [
                        {
                            "name": "COMPUTERNAME",
                            "type": "string",
                            "value": "@{appsetting('COMPUTERNAME')}"
                        }
                    ]
                },
                "runAfter": {
                    "Initialize_count": [
                        "SUCCEEDED"
                    ]
                },
                "type": "InitializeVariable"
            },
            "Initialize_count": {
                "inputs": {
                    "variables": [
                        {
                            "name": "countt",
                            "type": "integer",
                            "value": "@length(triggerBody())"
                        }
                    ]
                },
                "runAfter": {
                    "Initialize_sessionID": [
                        "SUCCEEDED"
                    ]
                },
                "type": "InitializeVariable"
            },
            "Initialize_sessionID": {
                "inputs": {
                    "variables": [
                        {
                            "name": "sessionID",
                            "type": "string",
                            "value": "@{triggerBody()?[0]?['sessionId']}"
                        }
                    ]
                },
                "runAfter": {},
                "type": "InitializeVariable"
            },
            "Insert_or_Update_Entity": {
                "inputs": {
                    "parameters": {
                        "entity": {
                            "COMPUTERNAME": "@{variables('COMPUTERNAME')}",
                            "MessageCount": "@{variables('countt')}",
                            "PartitionKey": "name",
                            "RowKey": "@{workflow()?['run']?['name']}",
                            "Session": "@{variables('sessionID')}",
                            "end": "@{utcNow()}",
                            "kind": "Stateless",
                            "triggerStart": "@{trigger()['startTime']}"
                        },
                        "tableName": "LASBlog"
                    },
                    "serviceProviderConfiguration": {
                        "connectionName": "azureTables",
                        "operationId": "upsertEntity",
                        "serviceProviderId": "/serviceProviders/azureTables"
                    }
                },
                "runAfter": {
                    "HTTP": [
                        "SUCCEEDED"
                    ]
                },
                "type": "ServiceProvider"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "triggers": {
            "When_messages_are_available_in_a_queue": {
                "inputs": {
                    "parameters": {
                        "isSessionsEnabled": true,
                        "queueName": "session"
                    },
                    "serviceProviderConfiguration": {
                        "connectionName": "serviceBus",
                        "operationId": "receiveQueueMessages",
                        "serviceProviderId": "/serviceProviders/serviceBus"
                    }
                },
                "type": "ServiceProvider"
            }
        }
    },
    "kind": "Stateless"
}

 

 

 

 

 

Last update: Dec 12 2023 05:44 AM