In this article, we are going to demonstrate how the microservices architecture handles a long-running task execution. If you want to recap a similar task execution in a monolithic application, you can refer to this previous article.

To download the source code for this article, you can visit our GitHub repository.

Let’s dive into it.

Support Code Maze on Patreon to get rid of ads and get the best discounts on our products!
Become a patron at Patreon!

Microservices Implementation of the Checkout Process

Let’s rearchitect the shopping cart monolithic application and decompose it into multiple microservices based on business capability.

As a result, we now have five microservices catering to the checkout process:

  • ShoppingCartApi – Exposes the /checkout endpoint to UI client and triggers checkout process
  • StockValidatorService – Validates the availability of stocks for all the line items
  • TaxCalculatorService – Calculates the tax based online items and customer address
  • PaymentProcessingService – Processes the payment based on the credit card details, line items, and calculated tax
  • ReceptGeneratorService – Generates and saves the receipt for the purchase. Also, responsible for email communication with the customer

For this demo, the microservices do not contain any business logic. They simulate a process flow that may take a few seconds.

How Microservices Handle a Long-Running Task

In a normal scenario, the ShoppingCartApi would have communicated using a synchronous pattern like REST over HTTP with the StockValidatorService and so on. However, in that case, the end-user had to wait for a response till the ReceptGeneratorService would have generated a receipt or returned an error response. This would have resulted in the endpoint taking more than 6s to complete the request. So, this is not an intuitive design from the user experience perspective.

Here, our microservices solve this problem by communicating using an asynchronous communication pattern between them by utilizing RabbitMq queues for the long-running checkout process:

microservices architecture

In our demo application, the /checkout endpoint returns a status code of OK(200). The ReceptGeneratorService sends an email with the order status to the end-user after completion of the order processing. However, we can design the /checkout endpoint to return an HTTP status code Accepted(202) instead of OK(200) with another endpoint where the result will be available at the end of processing. The client application then may choose to call this endpoint till a response is available.

The Checkout Process Initiation

The ShoppingCartApi has a /checkout endpoint similar to the implementation we’ve already introduced. It initiates the long-running tasks related to the checkout process.

Now, let’s see how the CheckoutProcessor class implements the ProcessCheckoutAsync() method:

public class CheckoutProcessor : ICheckoutProcessor
{
    public BlockingCollection<CheckoutItem> CheckoutQueue { get; }

    public CheckoutProcessor()
    {
        CheckoutQueue = new BlockingCollection<CheckoutItem>(new ConcurrentQueue<CheckoutItem>());
    }

    public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request)
    {
        var response = new CheckoutResponse
        {
            OrderId = Guid.NewGuid(),
            OrderStatus = OrderStatus.Inprogress,
            Message = "Your order is in progress," 
                         + "you will receive an email with all details."
        };

        var item = new CheckoutItem
        {
           OrderId = response.OrderId,
           Request = request
        };

        CheckoutQueue.Add(item);

        return Task.FromResult(response);
    }
}

Just like in the monolithic implementation, we create an instance of the CheckoutResponse class with order status Inprogress, a relevant message, and, a new order id. The end-user receives this response.

Before returning the response to the end-user, the code creates an instance of a CheckoutItem class with the generated order id and other request properties. Now, a blocking collection enqueues this instance. The BlockingCollection<CheckoutItem> is exposed as a public property of the CheckoutProcessor class.

The CheckoutItem instance represents the message body for communication with the other microservices:

public class CheckoutItem
{
   public Guid OrderId { get; set; }

   public CheckoutRequest Request { get; set; }
}

The CheckoutBackroundWorker inherits from .NET’s BackgroundService class and sends the message to a RabbitMq message queue for further processing:

public CheckoutBackgroundWorker(IMessageProducer<CheckoutItem> producer,
                                ICheckoutProcessor checkoutProcessor)
{
    _producer = producer;
    _checkoutProcessor = checkoutProcessor;
           
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await Task.Factory.StartNew(() =>  Process(stoppingToken), TaskCreationOptions.LongRunning);  
}

private void Process(CancellationToken stoppingToken)
{
    foreach (var item in _checkoutProcessor.CheckoutQueue.GetConsumingEnumerable(stoppingToken))
    {
        _producer.SendMessage(item);
    }
}

Here, we use the BlockingCollection.GetConsumingEnumerable on the CheckoutQueue property of the CheckoutProcessor class. Each item from the queue is published to a message queue using the CheckoutItemProducer class.

You may wonder why a BlockingCollection is used here instead of directly publishing the message on invocation of the /checkout endpoint. The publishing of the message to the queue blocks the response to the caller. To be more precise, the end-user does not receive a response till the code publishes the message to the queue. There is no need for this blocking in this scenario as the message publishing task can happen in parallel while the end-user receives the response.

The same can also be achieved using System.Threading.Channel.

The CheckoutItemProducer class implements the ProducerBase abstract class and the IMessageProducer interfaces.

Asynchronous Communication Between Microservices

This business case dictates that the checkout processes like stock validation, payment processing, etc. can run asynchronously. Hence, we’ll be using the asynchronous mode of communication between the microservices using the RabbitMq message broker.

The ShoppingCart Web Api

Invoking the /checkout endpoint triggers the checkout process. Further communication with the other microservices takes place using message queues:

public class CheckoutItemProducer : ProducerBase<CheckoutItem>
{
    public CheckoutItemProducer(ConnectionFactory connectionFactory,
                                ILogger<RabbitMqClientBase> logger,
                                ILogger<ProducerBase<CheckoutItem>> producerBaseLogger) 
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "stock-validator";
}  

 First, the ShoppingCartApi sends the CheckoutItem message to the stock-validator queue using the CheckoutItemProducer class.

The Stock Validator Service Functionality

The StockValidatorService consumes the message from the stock-validator queue and executes stock validation logic:

public class StockValidatorConsumer : ConsumerBase, IHostedService
{
    private readonly IMessageProducer<Failure> _failureProducer;
    private readonly IMessageProducer<CheckoutItem> _successProducer;
    private readonly IValidator _validator;
    private readonly ILogger<StockValidatorConsumer> _logger;

    public StockValidatorConsumer(ConnectionFactory connectionFactory, 
                                  ILogger<RabbitMqClientBase> rabbitMqClientBaseLogger,
                                  ILogger<ConsumerBase> consumerBaseLogger,
                                  IMessageProducer<Failure> failureProducer,
                                  IMessageProducer<CheckoutItem> successProducer,
                                  IValidator validator,
                                  ILogger<StockValidatorConsumer> logger)
        : base(connectionFactory, rabbitMqClientBaseLogger, consumerBaseLogger)
    {
        _failureProducer = failureProducer;
        _successProducer = successProducer;
        _validator = validator;
        _logger = logger;
    }

    protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
    {
        try
        {
            var body = Encoding.UTF8.GetString(@event.Body.ToArray());
            var checkoutItem = JsonConvert.DeserializeObject<CheckoutItem>(body);

            if (await _validator.ValidateAsync(checkoutItem.Request.LineItems))
            {
                _successProducer.SendMessage(checkoutItem);
                return;
            }

            var failureMessage = new Failure
            {
                CustomerId = checkoutItem.Request.CustomerId,
                OrderId = checkoutItem.OrderId,
                Message = "Item not available in stock",
                Source = typeof(Program).Assembly.GetName().Name ?? string.Empty
            };

            _failureProducer.SendMessage(failureMessage);
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error while retrieving message from queue.");
        }
        finally
        {
            Channel.BasicAck(@event.DeliveryTag, false);
        }
    }

    protected override string QueueName => "stock-validator";
    ...
    ...
        
}

The StockValidatorConsumer class inherits from the ConsumerBase and is registered as a concrete implementation for the IHostedService interface.

builder.Services.AddHostedService<StockValidatorConsumer>();

This ensures that the StockValidatorService application executes the ConsumerBase constructor code at the application startup. The constructor code contains message subscription logic.

All microservices in this demo use the same strategy for all background workers that implement the ConsumerBase.

So, in case of successful stock validation we publish the CheckoutItem message to the stock-validation-successful queue:

public class StockValidationSuccessfulProducer : ProducerBase<CheckoutItem>
{        
    public StockValidationSuccessfulProducer(ConnectionFactory connectionFactory,
                                             ILogger<RabbitMqClientBase> logger,
                                             ILogger<ProducerBase<CheckoutItem>> producerBaseLogger) 
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "stock-validation-successful";
}

The StockValidationSuccessfulProducer inherits the ProducerBase abstract class. This class contains the logic for publishing the messages. All microservices in this demo use the same strategy for message publishing by implementing the ProducerBase class.

But, on failure, we publish the Failure message to the stock-validation-failure queue:

public class Failure
{
    public Guid OrderId { get; set; }
        
    public Guid CustomerId { get; set; }

    public string Message { get; set; }

    public string Source { get; set; }
}

public class StockValidationFailureProducer : ProducerBase<Failure>
{
    public StockValidationFailureProducer(ConnectionFactory connectionFactory, 
                                          ILogger<RabbitMqClientBase> logger, 
                                          ILogger<ProducerBase<Failure>> producerBaseLogger)
            : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "stock-validation-failure";
}

The StockValidationFailureProducer class is responsible for publishing the failure message.

The Tax Calculator Service Functionality

The TaxCalculatorService contains a background worker class StockValidationSuccessConsumer that is the consumer for the stock-validation-successful queue messages:

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
        var body = Encoding.UTF8.GetString(@event.Body.ToArray());
        var checkoutItem = JsonConvert.DeserializeObject<CheckoutItem>(body);

        var tax = await _calculator.CalculateTaxAsync(checkoutItem.Request.CustomerId, 
                                                      checkoutItem.Request.LineItems);

        var taxMessage = new Tax
        {
            OrderId = checkoutItem.OrderI
            Request = checkoutItem.Request,
            TaxAmount = tax
        };

        _taxProducer.SendMessage(taxMessage);
               
     }
     catch (Exception ex)
     {
        _logger.LogCritical(ex, "Error while retrieving message from queue.");
     }
     finally
     {
        Channel.BasicAck(@event.DeliveryTag, false);
     }
}
 
protected override string QueueName => "stock-validation-successful";

On receiving a message it calculates the tax and publishes the Tax message containing the tax amount to the tax queue:

public class Tax : CheckoutItem
{
   public int TaxAmount { get; set; }
}

public class TaxProducer : ProducerBase<Tax>
{
    public TaxProducer(ConnectionFactory connectionFactory, 
                       ILogger<RabbitMqClientBase> logger, 
                       ILogger<ProducerBase<Tax>> producerBaseLogger) 
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "tax";       
}

Here, the Tax message class derives from CheckoutItem and has an additional property TaxAmount. The TaxProducer class is responsible for publishing this message to the queue. 

The Payment Processing Service Functionality

The PaymentProcessingService listens for messages from the tax queue using the background worker TaxConsumer class. On arrival of a message, it consumes it, calculates the total amount, and tries to process the payment:

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
        var body = Encoding.UTF8.GetString(@event.Body.ToArray());
        var taxMessage = JsonConvert.DeserializeObject<Tax>(body);
               
        var amount = taxMessage.Request.LineItems.Sum(li => li.Quantity * li.Price) + taxMessage.TaxAmount;
                
        if (await _paymentProcessor.ProcessAsync(taxMessage.Request.CustomerId, 
                                                 taxMessage.Request.PaymentInfo, 
                                                 amount))
        {
            var paymentSuccessMessage = new PaymentSuccess
            {
                 OrderId = taxMessage.OrderId,
                 Request = taxMessage.Request,
                 Amount = amount
            };
            _successProducer.SendMessage(paymentSuccessMessage);
            return;
         }

         var failureMessage = new Failure
         {
            CustomerId = taxMessage.Request.CustomerId,
            OrderId = taxMessage.OrderId,
            Message = "Payment failure",
            Source = typeof(Program).Assembly.GetName().Name ?? string.Empty
         };

         _failureProducer.SendMessage(failureMessage);
     }
     catch (Exception ex)
     {
         _logger.LogCritical(ex, "Error while retrieving message from queue.");
     }
     finally
     {
          Channel.BasicAck(@event.DeliveryTag, false);
     }
}
 
protected override string QueueName => "tax";

Now, in case of successful payment processing, we publish the PaymentSuccess message containing the amount to the payment-successful queue:

public class PaymentSuccess : CheckoutItem
{
    public int Amount { get; set; }
}

public class PaymentSuccessfulProducer : ProducerBase<PaymentSuccess>
{       
    public PaymentSuccessfulProducer(ConnectionFactory connectionFactory,
                                     ILogger<RabbitMqClientBase> logger,
                                     ILogger<ProducerBase<PaymentSuccess>> producerBaseLogger
            : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "payment-successful";
}

The PaymentSuccess message class inherits from CheckoutItem and has an additional property Amount. The PaymentSuccessfulProducer publishes the payment successful message to the assigned queue.

However, on payment failure, we publish the Failure message  to the payment-failure queue:

public class PaymentFailureProducer : ProducerBase<Failure>
{
    public PaymentFailureProducer(ConnectionFactory connectionFactory,
                                  ILogger<RabbitMqClientBase> logger,
                                  ILogger<ProducerBase<Failure>> producerBaseLogger)
        : base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string QueueName => "payment-failure";
}

The PaymentFailureProducer class publishes the failure message to the payment-failure queue.

The Receipt Generator Service Functionality

The ReceiptGeneratorService is a consumer to three queues. They are, stock-validation-failure, payment-failure, and payment-successful:

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
        var body = Encoding.UTF8.GetString(@event.Body.ToArray());
        var failureMessage = JsonConvert.DeserializeObject<Failure>(body);

        await _generator.ProcessFailuresAsync(failureMessage.CustomerId, 
                                              failureMessage.OrderId, 
                                              failureMessage.Message);               
    }
    catch (Exception ex)
    {
       _logger.LogCritical(ex, "Error while retrieving message from queue.");
    }
    finally
    {
       Channel.BasicAck(@event.DeliveryTag, false);
    }
}

So, this is the common code for failure scenarios implemented by the PaymentFailureConsumer and StockValidationFailureConsumer classes. In this case, the ProcessFailuresAsync() method from ReceiptGenerator class is executed. This method simulates storing the reason for failure and sending an email to the user informing the status of the order.

Let’s explore the PaymentSuccessfulConsumer class now:

protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event)
{
    try
    {
         var body = Encoding.UTF8.GetString(@event.Body.ToArray());
         var successMessage = JsonConvert.DeserializeObject<PaymentSuccess>(body);

         await _generator.GenerateAsync(successMessage.Request.CustomerId, 
                                        successMessage.OrderId, 
                                        successMessage.Amount);

    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Error while retrieving message from queue.");
    }
    finally
    {
        Channel.BasicAck(@event.DeliveryTag, false);
    }
}

protected override string QueueName => "payment-successful";

At last, if the payment is successful, we execute the GenerateAsync() method from the ReciptGenerator class. This method simulates the generation and storing of the receipt and also sends an email with the necessary details to the user.

The producer and consumer classes in the microservices derive from respective base classes which abstract most of the RabbitMq interaction. Now, let’s explore them in detail.

Using RabbitMq for Asynchronous Message Processing

We are using the NuGet package RabbitMQ.Client for interfacing with RabbitMq:

PM> Install-Package RabbitMQ.Client -Version 6.2.4

All the microservices implement the RabbitMq producer and consumer code. To avoid code duplication, we have created some base classes.

The RabbitMqClientBase class encapsulates the logic for connecting to the RabbitMq server:

protected RabbitMqClientBase(ConnectionFactory connectionFactory, 
                             ILogger<RabbitMqClientBase> logger)
{
    _connectionFactory = connectionFactor
    _logger = logger
    Connect();
}

private void Connect()
{
    if (_connection == null || _connection.IsOpen == false)
    {
         _connection = _connectionFactory.CreateConnection();
    }

    if (Channel == null || Channel.IsOpen == false)
    {
         Channel = _connection.CreateModel();                
         Channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false);                
    }
}

The constructor invokes the Connect() method and it checks if the connection is already established. If it is not, it creates a connection. This method also creates the queue if it is already not present in the RabbitMq server.

We register the ConnectionFactory as a singleton in the DI container and inject it via constructor to the RabbitMqClientBase class:

builder.Services.AddSingleton(sp =>
{
    var uri = new Uri("URL FOR RABBITMQ SERVER");
    return new ConnectionFactory
    {
        Uri = uri
    };
});

Replace the placeholder RabbitMq server URL with an original endpoint depending on the RabbitMq server/cluster installation.

The ProducerBase Code

Now, Let’s find out how we implement the ProducerBase class that is inherited by all message producers.

public interface IMessageProducer<in T>
{
    void SendMessage(T message);
}

public abstract class ProducerBase<T> : RabbitMqClientBase, IMessageProducer<T>
{
    private readonly ILogger<ProducerBase<T>> _logger;

    protected ProducerBase(ConnectionFactory connectionFactory,
                           ILogger<RabbitMqClientBase> logger,
                           ILogger<ProducerBase<T>> producerBaseLogger) 
       : base(connectionFactory, logger)
    {
        _logger = producerBaseLogger;
    }

    public virtual void SendMessage(T message)
    {
        try
        {
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
            var properties = Channel.CreateBasicProperties();                
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
            Channel.BasicPublish(exchange: "", routingKey: QueueName, body: body, basicProperties: properties);
         }
         catch (Exception ex)
         {
            _logger.LogCritical(ex, "Error while publishing");
         }
    }
}

The ProducerBase class inherits from the RabbitMqClientBase class. It ensures that the application establishes a connection with the RabbitMQ server before sending any messages.

This class also implements the SendMessage() method from the IMessageProducer interface. This method converts the message to a byte[] from POCO and sets some metadata properties for the message. Then, it publishes the message to the queue represented by the QueueName property from RabbitMqClientBase class.

As seen in the previous section, The producer classes in the microservices projects simply need to inherit from the ProducerBase class and override the QueueName property to publish to the intended queue.

The ConsumerBase Code

Let’s explore the ConsumerBase class that is inherited by all message consumers:

public abstract class ConsumerBase : RabbitMqClientBase
{ 
    protected ConsumerBase(ConnectionFactory connectionFactory,
                           ILogger<RabbitMqClientBase> logger,
                           ILogger<ConsumerBase> consumerLogger) 
       : base(connectionFactory, logger)
    {            
          
       try
       {
           var consumer = new AsyncEventingBasicConsumer(Channel);
           consumer.Received += OnMessageReceived;
           Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        }
        catch (Exception ex)
        {
           consumerLogger.LogCritical(ex, "Error while consuming message");
        }
   }
}
  

The ConsumerBase class inherits from the RabbitMqClientBase class as well. The constructor instantiates an AsyncEventingBasicConsumer instance and registers the OnMessageReceived() method as the callback to message arrival. The OnMessageReceived() method is defined as abstract here. Finally, It calls the BasicConsume() method on the queue represented by the QueueName from RabbitMqClientBase.

As seen in the previous section. The consumer classes in the individual projects inherit from this ConsumerBase class and override the OnMessageReceived() method as per business logic. They also override the QueueName property to consume from the intended queue.

We have already seen that the consumers implement the IHostedService from .NET so that the application startup executes the code in the constructor. This also ensures that the RabbitMq connection is closed gracefully by calling the Dispose() method from the RabbitMqClientBase class when the application stops.

public void Dispose()
{
    try
    {
        Channel?.Close();
        Channel?.Dispose();
        Channel = null;

        _connection?.Close();
        _connection?.Dispose();
        _connection = null;
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Cannot dispose RabbitMQ channel or connection");
    }
}

All the consumers invoke this method from the StopAsync() method from the IHostedService interface.

The User Experience

Now, let’s send the same request from the previous article to the /checkout endpoint. This indicates the checkout of two cart items.

The Web API endpoint returns a successful response:

{
  "orderId": "82db8a24-57d0-469d-a4e2-33525f4a8679",
  "orderStatus": "Inprogress",
  "message": "Your order is in progress, you will receive an email with all details."
}

Request duration
128 MS

Now, we can see that the endpoint takes much less to return a response to the end-user. The rest of the checkout processing happens at each responsible microservice, independent of the response sent to the user. This is evident from the console logs entries from individual microservices:

info: Microservice.StockValidatorService.Validator[0]
      Stock is validated.

info: Microservice.TaxCalculatorService.Calculator[0]
      Customer lookup completed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6.
info: Microservice.TaxCalculatorService.Calculator[0]
      Tax value calculated for customer is 36.

info: Microservice.PaymentProcessingService.PaymentProcessor[0]
      Payment of 196 has been processed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6

info: Microservice.ReceiptGeneratorService.ReceiptGenerator[0]
      Receipt Generated and Order Status persisted in DB.
info: Microservice.ReceiptGeneratorService.ReceiptGenerator[0]
      Email is sent with Order Status and receipt.

Conclusion

The demo microservice used in this article ignores some best practices and business logic for brevity and focuses primarily on the long-running task execution. However, we have structured most of the code here keeping in mind the robustness associated with enterprise-level applications.

Liked it? Take a second to support Code Maze on Patreon and get the ad free reading experience!
Become a patron at Patreon!