Efficient Data Transfer With Azure Queue Storage

Author: Ivan Gechev

In an era driven by data, the seamless and efficient transfer of information has become the essence of modern applications and services. No matter if you're a developer working on a globally distributed application or a service provider committed to delivering the best performance to your customers, data transfer efficiency is a vital concern.

Enter Azure Queue Storage, a powerful solution in Microsoft's Azure cloud ecosystem that empowers us to optimize data transfer within our applications and services. Azure Storage Queues offer a robust and scalable framework for asynchronous communication, enabling efficient data transfer without the constraints of synchronous interactions.

If you're ready to optimize your data transfer workflows and supercharge your cloud-based systems, let's delve into the world of Azure Queue Storage and discover how it can revolutionize the way you move data within your digital ecosystem.

Understanding Azure Storage Queues

Azure Queue Storage uses cloud-based queues to facilitate communication among the components of a distributed application. Within these queues, we can store messages produced by a sender component and subsequently processed by a receiver component. Because the queue decouples the sender from the receiver, each side can scale independently as demand grows.

We can consider using Azure Queue Storage if our application needs to store a large number of messages (each message can be up to 64 KB in size), needs to track the progress of message processing, and requires server-side logs of all transactions executed against our queues.

Queues have a strict URL format:

https://<storage account name>.queue.core.windows.net/<queue name>

Depending on your choice of authentication, you will either need the full URL or just the queue name. In this post, I'll opt for a connection string plus the queue's name to authenticate.
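To make the URL format concrete, here is a minimal sketch that builds a queue URL from its two parts. The QueueAddress helper and the account name used with it are hypothetical and not part of the Azure SDK. The name check reflects the documented queue naming rules (3 to 63 characters, lowercase letters, digits and single hyphens, starting and ending with a letter or digit).

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of the Azure SDK.
public static class QueueAddress
{
    // 3-63 characters, lowercase letters, digits and single hyphens,
    // starting and ending with a letter or digit.
    private static readonly Regex ValidName =
        new(@"^(?=.{3,63}\z)[a-z0-9]+(-[a-z0-9]+)*\z", RegexOptions.Compiled);

    public static bool IsValidQueueName(string name) =>
        name is not null && ValidName.IsMatch(name);

    public static Uri Build(string accountName, string queueName)
    {
        if (!IsValidQueueName(queueName))
        {
            throw new ArgumentException(
                $"'{queueName}' is not a valid queue name.", nameof(queueName));
        }

        // https://<storage account name>.queue.core.windows.net/<queue name>
        return new Uri($"https://{accountName}.queue.core.windows.net/{queueName}");
    }
}
```

With an assumed account called myaccount, QueueAddress.Build("myaccount", "orders") yields https://myaccount.queue.core.windows.net/orders, while a name such as "Orders" is rejected because of the uppercase letter.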

Use Cases for Efficient Data Transfer

Let's imagine a microservices application that has one service for placing orders and another for shipping them. The two services need a way to communicate with each other, and we will use Azure Storage Queues for that purpose.

Prerequisites

Before we start, we need to install a single NuGet package.

We can use the .NET CLI:

CLI
dotnet add package Azure.Storage.Queues

Or the Package Manager inside Visual Studio:

PackageManager
Install-Package Azure.Storage.Queues

Once we have the package installed, we can move on to our services.

Creating the Queue Service

Let's create the interface for our Queue service:

IQueueService.cs
public interface IQueueService
{
    QueueClient QueueClient { get; }

    Task<SendReceipt> AddMessageAsync(
        string message,
        CancellationToken cancellationToken = default);

    Task<QueueMessage> ReceiveMessageAsync(
        CancellationToken cancellationToken = default);

    Task<bool> DeleteMessageAsync(
        QueueMessage message,
        CancellationToken cancellationToken = default);
}

We create the IQueueService interface and add one property called QueueClient that will hold the Azure Queue Client. Then we add three methods that we will use to add, retrieve and delete messages from the Azure Queue.

The AddMessageAsync() method takes in a string representing the message and returns a Task that holds an object of SendReceipt type. Next comes the ReceiveMessageAsync() method. It takes no parameters except the optional cancellation token and returns a QueueMessage object wrapped in a Task. Finally, we have the DeleteMessageAsync() method, which takes in a QueueMessage object and returns a boolean representing whether the message was deleted successfully.

Next, we start implementing the QueueService itself:

QueueService.cs
public class QueueService : IQueueService
{
    public QueueClient QueueClient { get; }

    public QueueService()
    {
        QueueClient = new QueueClient(
            Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING"),
            Environment.GetEnvironmentVariable("AZURE_STORAGE_QUEUE_NAME"));
    }
}

In the constructor, we initialize the QueueClient property by creating a new QueueClient instance. To its constructor, we pass the connection string and the queue name, both stored as environment variables.
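Environment.GetEnvironmentVariable() returns null when a variable is missing, so it can help to fail early with a clear message. The following is only a sketch of one way to do that. The QueueSettings record is a hypothetical helper, while the two variable names are the ones used above.

```csharp
using System;

// Hypothetical helper that reads and validates the two settings.
public sealed record QueueSettings(string ConnectionString, string QueueName)
{
    public static QueueSettings FromEnvironment() => new(
        Require("AZURE_STORAGE_CONNECTION_STRING"),
        Require("AZURE_STORAGE_QUEUE_NAME"));

    // Throws if the variable is missing or blank instead of returning null.
    private static string Require(string variable)
    {
        var value = Environment.GetEnvironmentVariable(variable);

        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Environment variable '{variable}' is not set.");
        }

        return value;
    }
}
```

The QueueService constructor could then pass settings.ConnectionString and settings.QueueName to the QueueClient constructor instead of reading the variables directly.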

After this is done, we move to the AddMessageAsync() method:

QueueService.cs
public async Task<SendReceipt> AddMessageAsync(
    string message,
    CancellationToken cancellationToken = default)
{
    return await QueueClient.SendMessageAsync(
        messageText: message,
        visibilityTimeout: TimeSpan.FromSeconds(10),
        timeToLive: TimeSpan.FromDays(1),
        cancellationToken: cancellationToken);
}

In it, we await and return the result of the QueueClient's SendMessageAsync() method. The method has several overloads, and passing only the message would be enough, but we opt for the one that gives us the most control. The overload we choose takes the message first, then the visibilityTimeout, which represents how long the message stays hidden from consumers after it is added to the queue, then the timeToLive, which represents how long the message stays in the queue before it is deleted, and finally the cancellation token.
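To illustrate how the two time spans play out, the sketch below turns them into concrete timestamps. It is plain arithmetic and does not call the Azure SDK. The MessageTiming helper and its names are hypothetical, and special cases such as a message that never expires are left out.

```csharp
using System;

// Hypothetical helper: models when a sent message becomes visible and when it expires.
public static class MessageTiming
{
    public static (DateTimeOffset VisibleOn, DateTimeOffset ExpiresOn) Schedule(
        DateTimeOffset insertedOn,
        TimeSpan visibilityTimeout,
        TimeSpan timeToLive)
    {
        if (visibilityTimeout < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(visibilityTimeout));
        }

        // Both intervals are counted from the moment the message is inserted.
        return (insertedOn + visibilityTimeout, insertedOn + timeToLive);
    }
}
```

With the values used in AddMessageAsync(), a message inserted at 12:00:00 UTC on 1 January 2024 would become visible to consumers at 12:00:10 and would expire at 12:00:00 on 2 January if nobody deleted it first.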

Next, we move on to retrieving messages:

QueueService.cs
public async Task<QueueMessage> ReceiveMessageAsync(
    CancellationToken cancellationToken = default)
{
    var response = await QueueClient
        .ReceiveMessageAsync(cancellationToken: cancellationToken);

    return response.Value;
}

To receive messages from the queue, we use the ReceiveMessageAsync() method of the QueueClient. Then we return response.Value, which holds the received QueueMessage object, or null if the queue currently has no visible messages.
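Since a receive call can come back empty, a consumer usually retries after a short pause. Below is a minimal polling sketch under that assumption. QueuePolling and PollAsync() are hypothetical names, and the helper is written against a plain delegate so it does not depend on the Azure SDK.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: retries a receive delegate until it yields an item.
public static class QueuePolling
{
    public static async Task<T?> PollAsync<T>(
        Func<CancellationToken, Task<T?>> receive,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default)
        where T : class
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var item = await receive(cancellationToken);

            if (item is not null)
            {
                return item;
            }

            // Wait before the next attempt, but not after the last one.
            if (attempt < maxAttempts)
            {
                await Task.Delay(delay, cancellationToken);
            }
        }

        // Every attempt found the queue empty.
        return null;
    }
}
```

With the service from this post, the delegate could be as simple as ct => queueService.ReceiveMessageAsync(ct). The fixed delay is a design choice made for brevity, and a growing delay may be kinder to a queue that stays empty for long periods.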

Finally, we implement the logic for deleting messages:

QueueService.cs
public async Task<bool> DeleteMessageAsync(
    QueueMessage message,
    CancellationToken cancellationToken = default)
{
    var result = await QueueClient
        .DeleteMessageAsync(
            message.MessageId,
            message.PopReceipt,
            cancellationToken);

    return !result.IsError;
}

This method takes in a QueueMessage object, from which we extract the parameters needed to delete the message. Inside the method, we call the DeleteMessageAsync() method of the QueueClient, passing it the MessageId and the PopReceipt from the message object, as well as the cancellation token. Finally, we return the negated IsError property of the response: if IsError is true, an error occurred and the message was not deleted, so we return false. Keep in mind that, by default, the client throws a RequestFailedException for failed requests, so callers should be prepared to handle that exception as well.

Utilizing the Queue Service

After our QueueService is done, we move on to the OrderService:

OrderService.cs
public class OrderService : IOrderService
{
    private readonly IQueueService _queueService;

    public OrderService(IQueueService queueService)
    {
        ArgumentNullException.ThrowIfNull(queueService, nameof(queueService));

        _queueService = queueService;
    }

    public async Task PlaceOrderAsync(
        Order order,
        CancellationToken cancellationToken = default)
    {
        var json = JsonSerializer.Serialize(order);

        await _queueService
            .AddMessageAsync(
                json,
                cancellationToken);
    }
}

We create a very simple OrderService class. It has a constructor that takes in an IQueueService object and one method called PlaceOrderAsync(). The method takes in an object of type Order (this can be any piece of data you want to transfer between services). Inside it, we serialize the order to JSON and pass the result to the AddMessageAsync() method of our injected IQueueService instance.
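The Order type itself is not shown in this post, so here is one possible shape for it, together with a rough guard for the 64 KB message limit mentioned earlier. Apart from Id, the properties are assumptions made for illustration, and the OrderMessage helper is hypothetical.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Assumed shape of the order; Product and Quantity are illustrative only.
public record Order(Guid Id, string Product, int Quantity);

// Hypothetical helper that converts orders to and from queue message text.
public static class OrderMessage
{
    public const int MaxMessageBytes = 64 * 1024;

    public static string ToMessage(Order order)
    {
        var json = JsonSerializer.Serialize(order);
        var size = Encoding.UTF8.GetByteCount(json);

        // Reject payloads that would not fit into a single queue message.
        if (size > MaxMessageBytes)
        {
            throw new InvalidOperationException(
                $"Serialized order is {size} bytes, which exceeds {MaxMessageBytes} bytes.");
        }

        return json;
    }

    public static Order? FromMessage(string json) =>
        JsonSerializer.Deserialize<Order>(json);
}
```

Because Order is a record, an instance that goes through ToMessage() and FromMessage() compares equal to the original, which makes the round trip easy to check in a unit test.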

With sending messages covered, we move on to consuming them:

ShippingService.cs
public class ShippingService : IShippingService
{
    private readonly IQueueService _queueService;

    public ShippingService(IQueueService queueService)
    {
        ArgumentNullException.ThrowIfNull(queueService, nameof(queueService));

        _queueService = queueService;
    }

    public async Task GetOrderForShippingAsync(
        CancellationToken cancellationToken = default)
    {
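        var orderMessage = await _queueService
            .ReceiveMessageAsync(cancellationToken);

        // The queue service returns null when there is no visible message.
        if (orderMessage is null)
        {
            return;
        }

        var order = JsonSerializer.Deserialize<Order>(orderMessage.Body);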

        Console.WriteLine($"Order with ID: {order?.Id} has been shipped!");

        await _queueService
            .DeleteMessageAsync(
                orderMessage,
                cancellationToken);
    }
}

We create the ShippingService class, whose constructor is identical to that of our OrderService. It also has the GetOrderForShippingAsync() method, which takes only an optional cancellation token. Inside it, we get the first message in our Azure Queue by using the ReceiveMessageAsync() method. Then we deserialize the Body of the message, print a line to the console signifying that we have shipped the order, and finally call the DeleteMessageAsync() method to delete the processed message from the queue.
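Deleting the message only after processing matters: receiving a message merely hides it for a while, and it reappears if it is never deleted. The toy model below imitates that behaviour in memory to make the sequence easier to follow. It is not the Azure SDK, all names in it are hypothetical, and it is a simplified sketch of the idea rather than a faithful copy of the service.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of receive, hide and delete. Not the Azure SDK.
public sealed class InMemoryQueue
{
    private sealed class Entry
    {
        public Entry(string body) => Body = body;

        public string Id { get; } = Guid.NewGuid().ToString();
        public string Body { get; }
        public string PopReceipt { get; set; } = string.Empty;
        public DateTimeOffset NextVisibleOn { get; set; } = DateTimeOffset.MinValue;
    }

    private readonly List<Entry> _entries = new();

    public void Send(string body) => _entries.Add(new Entry(body));

    // Returns the first visible message and hides it for 'visibilityTimeout'.
    public (string Id, string PopReceipt, string Body)? Receive(
        DateTimeOffset now,
        TimeSpan visibilityTimeout)
    {
        var entry = _entries.FirstOrDefault(e => e.NextVisibleOn <= now);

        if (entry is null)
        {
            return null;
        }

        // Every receive issues a fresh pop receipt and moves the visibility time.
        entry.PopReceipt = Guid.NewGuid().ToString();
        entry.NextVisibleOn = now + visibilityTimeout;

        return (entry.Id, entry.PopReceipt, entry.Body);
    }

    // Removes the message only if the pop receipt is the most recent one.
    public bool Delete(string id, string popReceipt) =>
        _entries.RemoveAll(e => e.Id == id && e.PopReceipt == popReceipt) > 0;
}
```

In this model, a consumer that crashes between Receive() and Delete() does not lose the order: once the visibility timeout passes, another Receive() call returns the same body with a new pop receipt, and the old receipt can no longer delete it.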

Conclusion

Azure Queue Storage is a formidable component of Microsoft's Azure cloud ecosystem, empowering us to optimize data transfer within our applications and services. With Azure Storage Queues, we gain access to a robust and scalable framework for asynchronous communication, liberating data transfer from the shackles of synchronous interactions. If you're ready to supercharge your cloud-based systems and streamline your data transfer workflows, you should definitely consider using Azure Queue Storage.