Rate limiting an Azure queue

In my previous post (“Singleton” Azure Functions) I offered a tip on how to set up an Azure Function to serve as a system-wide “singleton”. One of the use cases I had in mind was a service acting as a “broker” between two Service Bus queues. I will not go into details; suffice it to say, there is a requirement to allow only, let’s say, 100 messages per second to pass through the queue.

As I am a strong proponent of “less talk, more code”, here is a rough version that gets the job done:

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private static readonly QueueClient _outputQueueClient = new QueueClient("{connection string}", "output-queue");
private static double _nextDispatchAt;
private const double _rate = 100; // the rate!

/* Method creates a task scheduled to run approximately 
   at a given "time" */
private static Task MoveMessageToOutputQueue(Message message) 
{ 
    Contract.Assert(message != null);

    /* Get a lock to make sure _nextDispatchAt is modified 
       atomically. This is a lazy implementation but it does the job. */
    _semaphore.Wait(); 
    try 
    { 
        /* Calculate the next "click". */
        var now = DateTime.UtcNow.Ticks; 
        var delta = _nextDispatchAt - now; 
        var click = TimeSpan.TicksPerMillisecond * (1000 / _rate); 
        if (delta > 0) 
        { 
            /* We need to wait until the next dispatch. */ 
            var millisToWait = delta / TimeSpan.TicksPerMillisecond;
            _nextDispatchAt += click;

            /* If the number of ticks is below the millisecond 
               threshold, execute directly. */
            return millisToWait > 0 ?
                Task.Delay((int)millisToWait).ContinueWith(t => _outputQueueClient.SendAsync(message)).Unwrap() :
                _outputQueueClient.SendAsync(message);
        }
        else 
        {
            /* The execution is direct as there is no waiting time. */ 
            _nextDispatchAt = now + click; 
            return _outputQueueClient.SendAsync(message);
        } 
    } 
    finally 
    { 
        _semaphore.Release();
    }
}
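
The pacing above is essentially a virtual-clock scheduler: under the lock, each message either claims the current slot or reserves the next “click” and waits until it comes around. To make the arithmetic easy to test outside of Azure, here is a minimal sketch of the same logic in Python (all names are hypothetical, not part of the original code):

```python
import threading
import time


class Pacer:
    """Sketch of the virtual-clock pacing used above: at most `rate`
    dispatches per second, each reserving the next free "click"."""

    def __init__(self, rate):
        self._click = 1.0 / rate       # seconds between dispatches
        self._next_dispatch_at = 0.0   # virtual clock, in seconds
        self._lock = threading.Lock()

    def reserve(self, now=None):
        """Return how long the caller should sleep before dispatching."""
        now = time.monotonic() if now is None else now
        with self._lock:
            delta = self._next_dispatch_at - now
            if delta > 0:
                # Slot already claimed; reserve the next click and wait.
                self._next_dispatch_at += self._click
                return delta
            # No backlog: dispatch immediately, claim the next click.
            self._next_dispatch_at = now + self._click
            return 0.0
```

Three back-to-back calls at the same instant yield waits of 0 s, 10 ms and 20 ms for a rate of 100/s, which matches the C# behaviour.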

/* This is the actual Azure function which is set up to listen on the "input-queue" */
[FunctionName(nameof(MoveMessageFromInputToOutput))]
public static async Task MoveMessageFromInputToOutput(
[ServiceBusTrigger("input-queue")] Microsoft.ServiceBus.Messaging.BrokeredMessage message, 
TraceWriter log)
{
    Contract.Assert(message != null);
    Contract.Assert(log != null);
    log.Verbose($"Moving incoming message with label \"{message.Label}\" ...");

    /* Make a copy of the message (just the body and whatever fields are interesting). */
    Message outputMessage;
    using (var ms = new MemoryStream())
    {
        await message.GetBody<Stream>().CopyToAsync(ms).ConfigureAwait(false);
        outputMessage = new Message(ms.ToArray())
        {
            Label = message.Label,
            ContentType = message.ContentType
        };
    }

    await MoveMessageToOutputQueue(outputMessage).ConfigureAwait(false);
}

To make sure higher rates are handled, and that the function does not starve, one can update host.json as follows:

{
    "serviceBus": {
        "maxConcurrentCalls": 32, // Threads will mostly be awaiting
        "prefetchCount": 256 // The more the merrier
    }
}
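
For context on how these numbers interact: with `_rate = 100`, the “click” computed in the code works out to 10 ms, so even with 32 concurrent calls most of them are simply parked in `Task.Delay`. A quick sanity check of the arithmetic, using .NET’s tick constant:

```python
TICKS_PER_MS = 10_000                    # .NET's TimeSpan.TicksPerMillisecond
rate = 100                               # messages per second
click = TICKS_PER_MS * (1000 / rate)     # ticks between two dispatches
print(click / TICKS_PER_MS)              # 10.0 -> one message every 10 ms
```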

After some testing, it turns out that the rate limiter is almost perfect at “low rates” such as 100 messages/second. Due to limitations of Service Bus and other factors, this code will not surpass ~600 messages/second. At higher rates, multiple instances of the rate limiter should be deployed, dividing the load between them.
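
One simple way of dividing the load between such instances — a sketch, not something from the code above — is stable hash-based partitioning, routing each message to one of N limiter instances (all names here are hypothetical):

```python
import hashlib


def choose_limiter(message_id: str, instance_count: int) -> int:
    """Route a message to one of N rate-limiter instances by a stable
    hash of its id, so each instance sees roughly 1/N of the traffic."""
    digest = hashlib.sha256(message_id.encode("utf-8")).digest()
    # Use the first 4 bytes of the digest as an unsigned integer.
    return int.from_bytes(digest[:4], "big") % instance_count
```

With, say, four instances each capped at ~600 messages/second, the combined throughput reaches ~2,400 messages/second, at the cost of only per-instance (rather than global) ordering.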

About the Author: Alexandru Ciobanu
