In this article, we are going to demonstrate how the microservices architecture handles a long-running task execution. If you want to recap a similar task execution in a monolithic application, you can refer to this previous article.
Let’s dive into it.
Microservices Implementation of the Checkout Process
Let’s rearchitect the shopping cart monolithic application and decompose it into multiple microservices based on business capability.
As a result, we now have five microservices catering to the checkout process:
ShoppingCartApi
– Exposes the/checkout
endpoint to UI client and triggers checkout processStockValidatorService
– Validates the availability of stocks for all the line itemsTaxCalculatorService
– Calculates the tax based online items and customer addressPaymentProcessingService
– Processes the payment based on the credit card details, line items, and calculated taxReceptGeneratorService
– Generates and saves the receipt for the purchase. Also, responsible for email communication with the customer
For this demo, the microservices do not contain any business logic. They simulate a process flow that may take a few seconds.
How Microservices Handle a Long-Running Task
In a normal scenario, the ShoppingCartApi
would have communicated using a synchronous pattern like REST over HTTP with the StockValidatorService
and so on. However, in that case, the end-user had to wait for a response till the ReceptGeneratorService
would have generated a receipt or returned an error response. This would have resulted in the endpoint taking more than 6s to complete the request. So, this is not an intuitive design from the user experience perspective.
Here, our microservices solve this problem by communicating using an asynchronous communication pattern between them by utilizing RabbitMq queues for the long-running checkout process:
In our demo application, the /checkout
endpoint returns a status code of OK(200)
. The ReceptGeneratorService
sends an email with the order status to the end-user after completion of the order processing. However, we can design the /checkout
endpoint to return an HTTP status code Accepted(202)
instead of OK(200)
with another endpoint where the result will be available at the end of processing. The client application then may choose to call this endpoint till a response is available.
The Checkout Process Initiation
The ShoppingCartApi
has a /checkout
endpoint similar to the implementation we’ve already introduced. It initiates the long-running tasks related to the checkout process.
Now, let’s see how the CheckoutProcessor
class implements the ProcessCheckoutAsync()
method:
public class CheckoutProcessor : ICheckoutProcessor { public BlockingCollection<CheckoutItem> CheckoutQueue { get; } public CheckoutProcessor() { CheckoutQueue = new BlockingCollection<CheckoutItem>(new ConcurrentQueue<CheckoutItem>()); } public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request) { var response = new CheckoutResponse { OrderId = Guid.NewGuid(), OrderStatus = OrderStatus.Inprogress, Message = "Your order is in progress," + "you will receive an email with all details." }; var item = new CheckoutItem { OrderId = response.OrderId, Request = request }; CheckoutQueue.Add(item); return Task.FromResult(response); } }
Just like in the monolithic implementation, we create an instance of the CheckoutResponse
class with order status Inprogress
, a relevant message, and, a new order id. The end-user receives this response.
Before returning the response to the end-user, the code creates an instance of a CheckoutItem
class with the generated order id and other request properties. Now, a blocking collection enqueues this instance. The BlockingCollection<CheckoutItem>
is exposed as a public property of the CheckoutProcessor
class.
The CheckoutItem
instance represents the message body for communication with the other microservices:
public class CheckoutItem { public Guid OrderId { get; set; } public CheckoutRequest Request { get; set; } }
The CheckoutBackroundWorker
inherits from .NET’s BackgroundService
class and sends the message to a RabbitMq message queue for further processing:
public CheckoutBackgroundWorker(IMessageProducer<CheckoutItem> producer, ICheckoutProcessor checkoutProcessor) { _producer = producer; _checkoutProcessor = checkoutProcessor; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await Task.Factory.StartNew(() => Process(stoppingToken), TaskCreationOptions.LongRunning); } private void Process(CancellationToken stoppingToken) { foreach (var item in _checkoutProcessor.CheckoutQueue.GetConsumingEnumerable(stoppingToken)) { _producer.SendMessage(item); } }
Here, we use the BlockingCollection.GetConsumingEnumerable
on the CheckoutQueue
property of the CheckoutProcessor
class. Each item from the queue is published to a message queue using the CheckoutItemProducer
class.
You may wonder why a BlockingCollection
is used here instead of directly publishing the message on invocation of the /checkout
endpoint. The publishing of the message to the queue blocks the response to the caller. To be more precise, the end-user does not receive a response till the code publishes the message to the queue. There is no need for this blocking in this scenario as the message publishing task can happen in parallel while the end-user receives the response.
The same can also be achieved using System.Threading.Channel
.
The CheckoutItemProducer
class implements the ProducerBase
abstract class and the IMessageProducer
interfaces.
Asynchronous Communication Between Microservices
This business case dictates that the checkout processes like stock validation, payment processing, etc. can run asynchronously. Hence, we’ll be using the asynchronous mode of communication between the microservices using the RabbitMq message broker.
The ShoppingCart Web Api
Invoking the /checkout
endpoint triggers the checkout process. Further communication with the other microservices takes place using message queues:
public class CheckoutItemProducer : ProducerBase<CheckoutItem> { public CheckoutItemProducer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<CheckoutItem>> producerBaseLogger) : base(connectionFactory, logger, producerBaseLogger) { } protected override string QueueName => "stock-validator"; }
First, the ShoppingCartApi
sends the CheckoutItem
message to the stock-validator
queue using the CheckoutItemProducer
class.
The Stock Validator Service Functionality
The StockValidatorService
consumes the message from the stock-validator
queue and executes stock validation logic:
public class StockValidatorConsumer : ConsumerBase, IHostedService { private readonly IMessageProducer<Failure> _failureProducer; private readonly IMessageProducer<CheckoutItem> _successProducer; private readonly IValidator _validator; private readonly ILogger<StockValidatorConsumer> _logger; public StockValidatorConsumer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> rabbitMqClientBaseLogger, ILogger<ConsumerBase> consumerBaseLogger, IMessageProducer<Failure> failureProducer, IMessageProducer<CheckoutItem> successProducer, IValidator validator, ILogger<StockValidatorConsumer> logger) : base(connectionFactory, rabbitMqClientBaseLogger, consumerBaseLogger) { _failureProducer = failureProducer; _successProducer = successProducer; _validator = validator; _logger = logger; } protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event) { try { var body = Encoding.UTF8.GetString(@event.Body.ToArray()); var checkoutItem = JsonConvert.DeserializeObject<CheckoutItem>(body); if (await _validator.ValidateAsync(checkoutItem.Request.LineItems)) { _successProducer.SendMessage(checkoutItem); return; } var failureMessage = new Failure { CustomerId = checkoutItem.Request.CustomerId, OrderId = checkoutItem.OrderId, Message = "Item not available in stock", Source = typeof(Program).Assembly.GetName().Name ?? string.Empty }; _failureProducer.SendMessage(failureMessage); } catch (Exception ex) { _logger.LogCritical(ex, "Error while retrieving message from queue."); } finally { Channel.BasicAck(@event.DeliveryTag, false); } } protected override string QueueName => "stock-validator"; ... ... }
The StockValidatorConsumer
class inherits from the ConsumerBase
and is registered as a concrete implementation for the IHostedService
interface.
builder.Services.AddHostedService<StockValidatorConsumer>();
This ensures that the StockValidatorService
application executes the ConsumerBase
constructor code at the application startup. The constructor code contains message subscription logic.
All microservices in this demo use the same strategy for all background workers that implement the ConsumerBase
.
So, in case of successful stock validation we publish the CheckoutItem
message to the stock-validation-successful
queue:
public class StockValidationSuccessfulProducer : ProducerBase<CheckoutItem> { public StockValidationSuccessfulProducer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<CheckoutItem>> producerBaseLogger) : base(connectionFactory, logger, producerBaseLogger) { } protected override string QueueName => "stock-validation-successful"; }
The StockValidationSuccessfulProducer
inherits the ProducerBase
abstract class. This class contains the logic for publishing the messages. All microservices in this demo use the same strategy for message publishing by implementing the ProducerBase
class.
But, on failure, we publish the Failure
message to the stock-validation-failure
queue:
public class Failure { public Guid OrderId { get; set; } public Guid CustomerId { get; set; } public string Message { get; set; } public string Source { get; set; } } public class StockValidationFailureProducer : ProducerBase<Failure> { public StockValidationFailureProducer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<Failure>> producerBaseLogger) : base(connectionFactory, logger, producerBaseLogger) { } protected override string QueueName => "stock-validation-failure"; }
The StockValidationFailureProducer
class is responsible for publishing the failure message.
The Tax Calculator Service Functionality
The TaxCalculatorService
contains a background worker class StockValidationSuccessConsumer
that is the consumer for the stock-validation-successful
queue messages:
protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event) { try { var body = Encoding.UTF8.GetString(@event.Body.ToArray()); var checkoutItem = JsonConvert.DeserializeObject<CheckoutItem>(body); var tax = await _calculator.CalculateTaxAsync(checkoutItem.Request.CustomerId, checkoutItem.Request.LineItems); var taxMessage = new Tax { OrderId = checkoutItem.OrderI Request = checkoutItem.Request, TaxAmount = tax }; _taxProducer.SendMessage(taxMessage); } catch (Exception ex) { _logger.LogCritical(ex, "Error while retrieving message from queue."); } finally { Channel.BasicAck(@event.DeliveryTag, false); } } protected override string QueueName => "stock-validation-successful";
On receiving a message it calculates the tax and publishes the Tax
message containing the tax amount to the tax
queue:
public class Tax : CheckoutItem { public int TaxAmount { get; set; } } public class TaxProducer : ProducerBase<Tax> { public TaxProducer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<Tax>> producerBaseLogger) : base(connectionFactory, logger, producerBaseLogger) { } protected override string QueueName => "tax"; }
Here, the Tax
message class derives from CheckoutItem
and has an additional property TaxAmount
. The TaxProducer
class is responsible for publishing this message to the queue.
The Payment Processing Service Functionality
The PaymentProcessingService
listens for messages from the tax
queue using the background worker TaxConsumer
class. On arrival of a message, it consumes it, calculates the total amount, and tries to process the payment:
protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event) { try { var body = Encoding.UTF8.GetString(@event.Body.ToArray()); var taxMessage = JsonConvert.DeserializeObject<Tax>(body); var amount = taxMessage.Request.LineItems.Sum(li => li.Quantity * li.Price) + taxMessage.TaxAmount; if (await _paymentProcessor.ProcessAsync(taxMessage.Request.CustomerId, taxMessage.Request.PaymentInfo, amount)) { var paymentSuccessMessage = new PaymentSuccess { OrderId = taxMessage.OrderId, Request = taxMessage.Request, Amount = amount }; _successProducer.SendMessage(paymentSuccessMessage); return; } var failureMessage = new Failure { CustomerId = taxMessage.Request.CustomerId, OrderId = taxMessage.OrderId, Message = "Payment failure", Source = typeof(Program).Assembly.GetName().Name ?? string.Empty }; _failureProducer.SendMessage(failureMessage); } catch (Exception ex) { _logger.LogCritical(ex, "Error while retrieving message from queue."); } finally { Channel.BasicAck(@event.DeliveryTag, false); } } protected override string QueueName => "tax";
Now, in case of successful payment processing, we publish the PaymentSuccess
message containing the amount to the payment-successful
queue:
public class PaymentSuccess : CheckoutItem { public int Amount { get; set; } } public class PaymentSuccessfulProducer : ProducerBase<PaymentSuccess> { public PaymentSuccessfulProducer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<PaymentSuccess>> producerBaseLogger : base(connectionFactory, logger, producerBaseLogger) { } protected override string QueueName => "payment-successful"; }
The PaymentSuccess
message class inherits from CheckoutItem
and has an additional property Amount
. The PaymentSuccessfulProducer
publishes the payment successful message to the assigned queue.
However, on payment failure, we publish the Failure
message to the payment-failure
queue:
public class PaymentFailureProducer : ProducerBase<Failure> { public PaymentFailureProducer(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<Failure>> producerBaseLogger) : base(connectionFactory, logger, producerBaseLogger) { } protected override string QueueName => "payment-failure"; }
The PaymentFailureProducer
class publishes the failure message to the payment-failure
queue.
The Receipt Generator Service Functionality
The ReceiptGeneratorService
is a consumer to three queues. They are, stock-validation-failure
, payment-failure
, and payment-successful
:
protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event) { try { var body = Encoding.UTF8.GetString(@event.Body.ToArray()); var failureMessage = JsonConvert.DeserializeObject<Failure>(body); await _generator.ProcessFailuresAsync(failureMessage.CustomerId, failureMessage.OrderId, failureMessage.Message); } catch (Exception ex) { _logger.LogCritical(ex, "Error while retrieving message from queue."); } finally { Channel.BasicAck(@event.DeliveryTag, false); } }
So, this is the common code for failure scenarios implemented by the PaymentFailureConsumer
and StockValidationFailureConsumer
classes. In this case, the ProcessFailuresAsync()
method from ReceiptGenerator
class is executed. This method simulates storing the reason for failure and sending an email to the user informing the status of the order.
Let’s explore the PaymentSuccessfulConsumer
class now:
protected override async Task OnMessageReceived(object? sender, BasicDeliverEventArgs @event) { try { var body = Encoding.UTF8.GetString(@event.Body.ToArray()); var successMessage = JsonConvert.DeserializeObject<PaymentSuccess>(body); await _generator.GenerateAsync(successMessage.Request.CustomerId, successMessage.OrderId, successMessage.Amount); } catch (Exception ex) { _logger.LogCritical(ex, "Error while retrieving message from queue."); } finally { Channel.BasicAck(@event.DeliveryTag, false); } } protected override string QueueName => "payment-successful";
At last, if the payment is successful, we execute the GenerateAsync()
method from the ReciptGenerator
class. This method simulates the generation and storing of the receipt and also sends an email with the necessary details to the user.
The producer and consumer classes in the microservices derive from respective base classes which abstract most of the RabbitMq interaction. Now, let’s explore them in detail.
Using RabbitMq for Asynchronous Message Processing
We are using the NuGet package RabbitMQ.Client
for interfacing with RabbitMq:
PM> Install-Package RabbitMQ.Client -Version 6.2.4
All the microservices implement the RabbitMq producer and consumer code. To avoid code duplication, we have created some base classes.
The RabbitMqClientBase
class encapsulates the logic for connecting to the RabbitMq server:
protected RabbitMqClientBase(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger) { _connectionFactory = connectionFactor _logger = logger Connect(); } private void Connect() { if (_connection == null || _connection.IsOpen == false) { _connection = _connectionFactory.CreateConnection(); } if (Channel == null || Channel.IsOpen == false) { Channel = _connection.CreateModel(); Channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false); } }
The constructor invokes the Connect()
method and it checks if the connection is already established. If it is not, it creates a connection. This method also creates the queue if it is already not present in the RabbitMq server.
We register the ConnectionFactory
as a singleton in the DI container and inject it via constructor to the RabbitMqClientBase
class:
builder.Services.AddSingleton(sp => { var uri = new Uri("URL FOR RABBITMQ SERVER"); return new ConnectionFactory { Uri = uri }; });
Replace the placeholder RabbitMq server URL with an original endpoint depending on the RabbitMq server/cluster installation.
The ProducerBase Code
Now, Let’s find out how we implement the ProducerBase
class that is inherited by all message producers.
public interface IMessageProducer<in T> { void SendMessage(T message); } public abstract class ProducerBase<T> : RabbitMqClientBase, IMessageProducer<T> { private readonly ILogger<ProducerBase<T>> _logger; protected ProducerBase(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ProducerBase<T>> producerBaseLogger) : base(connectionFactory, logger) { _logger = producerBaseLogger; } public virtual void SendMessage(T message) { try { var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); var properties = Channel.CreateBasicProperties(); properties.ContentType = "application/json"; properties.DeliveryMode = 1; // Doesn't persist to disk properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()); Channel.BasicPublish(exchange: "", routingKey: QueueName, body: body, basicProperties: properties); } catch (Exception ex) { _logger.LogCritical(ex, "Error while publishing"); } } }
The ProducerBase
class inherits from the RabbitMqClientBase
class. It ensures that the application establishes a connection with the RabbitMQ server before sending any messages.
This class also implements the SendMessage()
method from the IMessageProducer
interface. This method converts the message to a byte[]
from POCO and sets some metadata properties for the message. Then, it publishes the message to the queue represented by the QueueName
property from RabbitMqClientBase
class.
As seen in the previous section, The producer classes in the microservices projects simply need to inherit from the ProducerBase
class and override the QueueName
property to publish to the intended queue.
The ConsumerBase Code
Let’s explore the ConsumerBase
class that is inherited by all message consumers:
public abstract class ConsumerBase : RabbitMqClientBase { protected ConsumerBase(ConnectionFactory connectionFactory, ILogger<RabbitMqClientBase> logger, ILogger<ConsumerBase> consumerLogger) : base(connectionFactory, logger) { try { var consumer = new AsyncEventingBasicConsumer(Channel); consumer.Received += OnMessageReceived; Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer); } catch (Exception ex) { consumerLogger.LogCritical(ex, "Error while consuming message"); } } }
The ConsumerBase
class inherits from the RabbitMqClientBase
class as well. The constructor instantiates an AsyncEventingBasicConsumer
instance and registers the OnMessageReceived()
method as the callback to message arrival. The OnMessageReceived()
method is defined as abstract here. Finally, It calls the BasicConsume()
method on the queue represented by the QueueName
from RabbitMqClientBase
.
As seen in the previous section. The consumer classes in the individual projects inherit from this ConsumerBase
class and override the OnMessageReceived()
method as per business logic. They also override the QueueName
property to consume from the intended queue.
We have already seen that the consumers implement the IHostedService
from .NET so that the application startup executes the code in the constructor. This also ensures that the RabbitMq connection is closed gracefully by calling the Dispose()
method from the RabbitMqClientBase
class when the application stops.
public void Dispose() { try { Channel?.Close(); Channel?.Dispose(); Channel = null; _connection?.Close(); _connection?.Dispose(); _connection = null; } catch (Exception ex) { _logger.LogCritical(ex, "Cannot dispose RabbitMQ channel or connection"); } }
All the consumers invoke this method from the StopAsync()
method from the IHostedService
interface.
The User Experience
Now, let’s send the same request from the previous article to the /checkout
endpoint. This indicates the checkout of two cart items.
The Web API endpoint returns a successful response:
{ "orderId": "82db8a24-57d0-469d-a4e2-33525f4a8679", "orderStatus": "Inprogress", "message": "Your order is in progress, you will receive an email with all details." } Request duration 128 MS
Now, we can see that the endpoint takes much less to return a response to the end-user. The rest of the checkout processing happens at each responsible microservice, independent of the response sent to the user. This is evident from the console logs entries from individual microservices:
info: Microservice.StockValidatorService.Validator[0] Stock is validated. info: Microservice.TaxCalculatorService.Calculator[0] Customer lookup completed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6. info: Microservice.TaxCalculatorService.Calculator[0] Tax value calculated for customer is 36. info: Microservice.PaymentProcessingService.PaymentProcessor[0] Payment of 196 has been processed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6 info: Microservice.ReceiptGeneratorService.ReceiptGenerator[0] Receipt Generated and Order Status persisted in DB. info: Microservice.ReceiptGeneratorService.ReceiptGenerator[0] Email is sent with Order Status and receipt.
Conclusion
The demo microservice used in this article ignores some best practices and business logic for brevity and focuses primarily on the long-running task execution. However, we have structured most of the code here keeping in mind the robustness associated with enterprise-level applications.