/ Azure

Service Bus and large files - the easy way

Azure Service Bus has a message size limit of 256 KB for the standard offering, and 1 MB for the premium tier (the quotas can be found here). While this may seem small, that is actually quite large for something that is designed to be a messaging system. Occasionally however, you may need to send a larger file as part of the message.

I am currently working on a project that uses Azure Service Fabric in combination with Service Bus. We use Service Bus as a buffer for message ingestion and processing, similar to the recomendations here, from the Azure Customer Advisory Team. This architecture works well, as it allows us to buffer incoming requests, providing elasticity during high traffic times.

One problem that we have had with this design is the limitation of message size in Service Bus. At times, we have large datasets to pass between services, so 1 MB isn't quite enough room for us. So, we turned to the design pattern of Claim Check (mentioned about 2/3 of the way down on the page) to help us out. While this is a common pattern for this scenario, it can be a daunting task to implement for those who don't have too much experience in the area.

Having worked with Sean Feldman before (not this guy, but this guy), I knew of a library he had created called ServiceBus.AttachmentPlugin. This library implements claim check in a very simple, and easy to understand way. The library also takes advantage of a feature in the new Service Bus .NET Standard Library; Plugin support!

To get a better idea of what this pattern looks like, and how to implement it using the ServiceBus.AttachmentPlugin, lets write our own sample. We will do so using .NET Core, the command prompt, and Visual Studio Code (so you can do this on any platform).

  1. Start with a new .NET Core console application.
mkdir SbStoragePlugin
cd SbStoragePlugin
dotnet new console
dotnet add package ServiceBus.AttachmentPlugin
code .
  1. Optional: Add the following to the .csproj file, under the first PropertyGroup node:
<LangVersion>latest</LangVersion>

It should then look like this:

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp2.0</TargetFramework>
    <LangVersion>latest</LangVersion>
  </PropertyGroup>

I'm adding the above because I want to use one of C#'s latest features; Async Mains.

  1. Create a Service Bus namespace if you don't already have one.

Since the Main method is now async, I'm just going to throw all of our code in there, for the sake of brevity.

  1. Start by creating a QueueClient, just as you would normally, and then create an AzureStorageAttachmentConfiguration object.
var storageConfiguration = new AzureStorageAttachmentConfiguration(StorageAccountConnectionString);
var queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
  1. Register the AzureStorageAttachmentConfiguration with the QueueClient.
queueClient.RegisterAzureStorageAttachmentPlugin(storageConfiguration);
  1. Then we just use Service Bus as we normally would, except without worrying about messages over 256 KB. Here is what my full solution looks like:
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using ServiceBus.AttachmentPlugin;

namespace SbStoragePlugin
{
    public class Program
    {
        public const string QueueName = "queue1";
        public const string ServiceBusConnectionString = "{SB_CONNECTION_STRING}";
        public const string StorageAccountConnectionString = "{STORAGE_CONNECTION_STRING}";

        public static async Task Main(string[] args)
        {
            var storageConfiguration = new AzureStorageAttachmentConfiguration(StorageAccountConnectionString, "claimcheck");
            var queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
            queueClient.RegisterAzureStorageAttachmentPlugin(storageConfiguration);

            Task OnMessageHandlerExceptions(ExceptionReceivedEventArgs exceptionArgs)
            {
                Console.WriteLine($"Exception recieved in the message handler: {exceptionArgs.Exception.Message}");
                return Task.CompletedTask;
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnMessageHandlerExceptions)
            {
                AutoComplete = false
            };

            queueClient.RegisterMessageHandler(async (message, token) =>
            {
                Console.WriteLine($"Received message: {message.MessageId}");
                await queueClient.CompleteAsync(message.SystemProperties.LockToken);
            }, messageHandlerOptions);

            var random = new Random();
            for (var i = 0; i < 10; i++)
            {
                var messageBytes = new byte[300 * 1024];
                random.NextBytes(messageBytes);

                var message = new Message(messageBytes)
                {
                    MessageId = Guid.NewGuid().ToString()
                };

                await queueClient.SendAsync(message);
                Console.WriteLine($"Sent message: {message.MessageId}");
            }

            Console.ReadKey();
            await queueClient.CloseAsync();
        }
    }
}

One thing that you will have to keep in mind with this plugin, is that you will need some sort of process to clean up stale blobs. This isn't something that is built into the library because every Service Bus application has its own usage pattern which will dictate how long files should stick around for. One solution for this, might be an Azure Function on a scheduled timer.

Also, there are some options that you can set in this library. The most interesting one for me, is that you can provide a Func<Message, bool> (takes a message as a parameter and returns a bool) to the AzureStorageAttachmentConfiguration in order to calculate your own message size. The Func returns a bool, which simply determines whether or not the message should be claim-checked. This is especially useful in case you use both Standard and Premium Service Bus messaging, considering the different message sizes. For example, you could do something like the following to detemine the maximum message size allowed, based off Environment Variables:

Task<bool> ShouldUseStorage(Message message)
{
    var currentEnvironment = Environment.GetEnvironmentVariable("CurrentEnvironment") ?? "DEV";
    var isProd = currentEnvironment.Equals("PROD", StringComparison.InvariantCultureIgnoreCase);
    
    if (isProd)
    {
        // If we are in production and using Premium messaging, we should allow a message to be up to 1000 KB before using claim check.
        return message.Body.Length > 1000 * 1024;
    }
    // Otherwise, we let a message up to 200 KB go through Service Bus without using claim check.
    return message.Body.Length > 200 * 1024;
}

var config = AzureStorageAttachmentConfiguration(
				connectionString,
				Settings.ServiceBus.StorageContainerForAttachments,
				Settings.ServiceBus.MessagePropertyWithAttachments,
				ShouldUseStorage);

So anyway, I really like the library, and I know Sean would appreciate any feedback that anyone has. Let me know if there are any questions, or if I can dig deeper into any part of this.

-- John