Often we come across scenarios where invoking a Web API endpoint triggers long-running tasks. In this article, we’re going to explore the different ways we can implement such a long-running task in an ASP.NET Core Web API.
Let’s dive into it.
Long-Running Tasks Use Case
Let’s take an example from the E-Commerce domain. We normally find a checkout functionality in a shopping cart. This is potentially a long-running task. As the user clicks on the checkout button in UI, the system triggers several steps:
- Stock availability check for the cart items
- Tax calculation based on customer details
- Payment processing through a third party payment gateway
- Receipt generation and final email communication
This business process flow may fail in the stock check and payment processing steps. The system handles such failures and communicates the same to the end-user through email.
Overview of the Monolithic Application
We have prepared the sample monolith application that we are going to use in this article. We strongly suggest opening the source code while reading this article since it will help you to understand the entire implementation.
The Web API controller has a /checkout
endpoint. The UI client application invokes this endpoint:
[HttpPost("checkout")] [ProducesResponseType(typeof(CheckoutResponse), StatusCodes.Status200OK)] public async Task<IActionResult> Checkout(CheckoutRequest request) => Ok(await _checkoutCoordinator.ProcessCheckoutAsync(request));
The action method sends an instance of the CheckoutRequest
class to an implementation of the ICheckoutCoordinaor
.
Now, let’s see the registration code for ICheckoutCoordinaor
with the DI container:
builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV1>(); //builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV2>(); //builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV3>(); //builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV4>();
We register four versions of the CheckoutCoordinator
with the DI container. The different versions of the CheckoutCoordinator
contain the different approaches to implementing a long-running task. Only one version is active at any given time.
The CheckCoordinatorV1
class implements the ICheckoutCoordinator
interface. This implementation executes the checkout tasks one after the other. After all the processing is complete, the end-user receives the final response :
public async Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request) { var response = new CheckoutResponse { OrderId = Guid.NewGuid() }; if (!await _stockValidator.ValidateAsync(request.LineItems)) { response.OrderStatus = OrderStatus.Failure; response.Message = "Item not available in stock"; await _receiptGenerator.ProcessFailuresAsync(request.CustomerId, response); return response; } var tax = await _taxCalculator.CalculateTaxAsync(request.CustomerId, request.LineItems); var amount = request.LineItems.Sum(li => li.Quantity * li.Price) + tax; if (!await _paymentProcessor.ProcessAsync(request.CustomerId, request.PaymentInfo, amount)) { response.OrderStatus = OrderStatus.Failure; response.Message = "Payment failure"; await _receiptGenerator.ProcessFailuresAsync(request.CustomerId, response); return response; } response.OrderStatus = OrderStatus.Successful; response.Message = "Order was successfully placed. You will receive the receipt in email"; await _receiptGenerator.GenerateAsync(request.CustomerId, response, amount); return response; }
This approach is blocking in nature as it makes the user wait for a long period to get a response till all the processing is complete.
Please note, that for this demo the StockValidator
, TaxCalculator
, PaymentProcessor,
and ReceiptGenerator
classes inside the Services
folder do not contain any business logic. They only simulate a process flow that may take a few seconds (long-running tasks):
public class TaxCalculator : ITaxCalculator { private Random _random = new Random(); private readonly ILogger _logger; public TaxCalculator(ILogger<TaxCalculator> logger) { _logger = logger; } public async Task<int> CalculateTaxAsync(Guid CustomerId, IEnumerable<OrderLineItem> orderLineItems) { await Task.Delay(500); _logger.LogInformation($"Customer lookup completed for customer {CustomerId}."); await Task.Delay(500); var tax = _random.Next(1, 200); _logger.LogInformation($"Tax value calculated for customer is {tax}."); return tax; } }
Here, the first Task.Delay
simulates customer lookup from a database/service. The customer address can be used for tax calculation. Similarly, the second delay simulates complex tax calculations for all line items for the customer. If you want, you can inspect all the other classes in the mentioned folder.
Let’s send the request to the /checkout
endpoint which indicates the checkout of two cart items:
curl -X 'POST' \ 'https://localhost:7074/api/ShoppingCart/checkout' \ -H 'accept: text/plain' \ -H 'Content-Type: application/json' \ -d '{ "customerId": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "lineItems": [ { "productId": "3fa85f64-5717-4562-b3fc-2c963f66afb4", "productName": "Pen", "price": 20, "quantity": 2 }, { "productId": "3fa85f64-5717-4562-b3fc-2c963f66afa5", "productName": "Bag", "price": 120, "quantity": 1 } ], "paymentInfo": { "creditCardNumber": "411111111111111", "creditCardType": 1, "cvv": 233 } }'
The Web API endpoint returns a successful response after a certain period:
{ "orderId": "b4af9e92-7272-46b2-8bd2-b1e663118091", "orderStatus": "Successful", "message": "Order was successfully placed. You will receive the receipt in an email" } Request duration 6289 ms
As a result, we can see that the endpoint took more than 6s to complete the request. So, this is not an intuitive design from the user experience perspective.
Now, there are different solutions to this problem. Let’s look into the first approach.
Process Long-Running Tasks using Blocking Collection and TPL
The CheckoutCoordinatorV2
class uses the BlockingCollection<T>
class for long-running task processing:
public class CheckoutCoordinatorV2 : ICheckoutCoordinator { private BlockingCollection<QueueItem> _checkoutQueue; ... ... public CheckoutCoordinatorV2(IStockValidator stockValidator, ITaxCalculator taxCalculator, IPaymentProcessor paymentProcessor, IReceiptGenerator receiptGenerator) { ... _checkoutQueue = CreateCheckoutQueue(); } private BlockingCollection<QueueItem> CreateCheckoutQueue() { var queue = new BlockingCollection<QueueItem>(new ConcurrentQueue<QueueItem>()); Task.Factory.StartNew(async ()=> await ProcessAsync(queue), TaskCreationOptions.LongRunning); return queue; } ... ... }
The BlockingCollection<T> encapsulates producer-consumer collections like ConcurrentQueue
, ConcurrentBag
etc. It provides blocking and bounding capabilities.
The CheckoutCoordinatorV2
class constructor invokes the CreateCheckoutQueue()
method to initialize the BlockingCollection
instance. Internally, this BlockingCollection
uses a ConcurrentQueue
instance. It also offloads the queue item processing in a separate long-running task.
Furthermore, the Task.Factory.StartNew()
method from TPL offloads the processing to a separate Task
. The TaskCreationOptions.LongRunning
parameter hints to the task scheduler that an additional thread might be required for the task so that it does not block the forward progress of other threads or work items on the local thread-pool queue.
Now, let’s implement the ProcessCheckoutAsync()
method from the ICheckoutCoordinator
interface:
public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request) { var response = new CheckoutResponse { OrderId = Guid.NewGuid(), OrderStatus = OrderStatus.Inprogress, Message = "Your order is in progress and you will receive an email with all details once the processing completes." }; var queueItem = new QueueItem { OrderId = response.OrderId, Request = request }; _checkoutQueue.Add(queueItem); return Task.FromResult(response); }
Here, we create an instance of the CheckoutResponse
class with an order status of Inprogress
, a relevant message, and, a new order id.
Before returning the response to the end-user, we create an instance of the QueueItem
model class with the generated order id and other request properties. Then, the blocking collection enqueues the instance. This class instance acts as intermediate storage for order id and request data. This ensures that there is no data loss when the long-running background task processes the queue items in the blocking collection.
Additionally, let’s inspect CheckoutResponse
and the QueueItem
classes:
public class CheckoutResponse { public Guid OrderId { get; set; } public OrderStatus OrderStatus { get; set; } public string Message { get; set; } } public class QueueItem { public Guid OrderId { get; set; } public CheckoutRequest Request { get; set; } }
Asynchronous Processing in a Background Task
Now, let’s inspect the logic that we offload to the background task:
private async Task ProcessAsync(BlockingCollection<QueueItem> queue) { foreach (var item in queue.GetConsumingEnumerable()) { await ProcessEachQueueItemAsync(item); } }
We use the BlockingCollection<T>.GetConsumingEnumerable()
in the ProcessAsync()
method to remove items until adding is completed and the collection is empty. This is called a mutating enumeration or consuming enumeration because unlike a typical foreach
loop, this enumerator modifies the source collection by removing items.
We can also see that we call the ProcessEachQueueItemAsync()
method inside the ProcessAsync()
method:
private async Task ProcessEachQueueItemAsync(QueueItem item) { var response = new CheckoutResponse { OrderId = item.OrderId }; if (!await _stockValidator.ValidateAsync(item.Request.LineItems)) { response.OrderStatus = OrderStatus.Failure; response.Message = "Item not available in stock"; await _receiptGenerator.ProcessFailuresAsync(item.Request.CustomerId, response); return; } var tax = await _taxCalculator.CalculateTaxAsync(item.Request.CustomerId, item.Request.LineItems); var amount = item.Request.LineItems.Sum(li => li.Quantity * li.Price) + tax; if (!await _paymentProcessor.ProcessAsync(item.Request.CustomerId, item.Request.PaymentInfo, amount)) { response.OrderStatus = OrderStatus.Failure; response.Message = "Payment failure"; await _receiptGenerator.ProcessFailuresAsync(item.Request.CustomerId, response); return; } response.OrderStatus = OrderStatus.Successful; response.Message = "Order was successfully placed. You will receive the receipt in email"; await _receiptGenerator.GenerateAsync(item.Request.CustomerId, response, amount); }
In the ProcessEachQueueItemAsync()
method the checkout process continues one after the other as per the business rules. But now, this processing completely happens in a non-blocking way without making the end-user wait for a response till the processing completes. On completion of processing, the ReceiptGenerator
sends an email to the end-user with all the relevant details.
The controller action can also return an HTTP status code Accepted(202)
instead of OK(200)
with an endpoint where the result may be available at the end of processing. The client application then may choose to call this endpoint till a response is available.
Process Long-Running Task using Reactive Extensions
The CheckoutCoordinatorV3
class uses a Subject
instance to implement the observer pattern, and we need to register the required services in the Program
class:
builder.Services.AddSingleton<ReplaySubject<QueueItem>>(); builder.Services.AddSingleton<IObservable<QueueItem>>(x => x.GetRequiredService<ReplaySubject<QueueItem>>()); builder.Services.AddSingleton<IObserver<QueueItem>>(x => x.GetRequiredService<ReplaySubject<QueueItem>>());
Here, we register a singleton instance of the ReplaySubject<T>
with the DI container as the concrete implementation for both the IObserver<T>
and IObservable<T>
interfaces. This ensures that the same instance of the ReplaySubject<T>
is injected as both the observer and observable components on the resolution of dependency.
Now, let’s inspect the CheckoutCoordinatorV3
class:
public CheckoutCoordinatorV3(IObserver<QueueItem> checkoutStream) { _checkoutStream = checkoutStream; } public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request) { ... ... _checkoutStream.OnNext(queueItem); return Task.FromResult(response); }
We inject IObserver<QueueItem>
through the constructor with the ReplaySubject<QueueItem>
as an implementation. The CheckoutCoordinaorV3
class adds new data to the observable stream on getting a new request from the controller.
We also need an observer that will react to the arrival of new data in the data stream. The ObserverBackgroundWorker
which is implemented as a .NET Core BackgroundService
acts as the observer in this case:
public class ObserverBackgroundWorker : BackgroundService { private readonly IStockValidator _stockValidator; private readonly ITaxCalculator _taxCalculator; private readonly IPaymentProcessor _paymentProcessor; private readonly IReceiptGenerator _receiptGenerator; private readonly IObservable<QueueItem> _checkoutStream; private IDisposable? _subscription; public ObserverBackgroundWorker(IStockValidator stockValidator, ITaxCalculator taxCalculator, IPaymentProcessor paymentProcessor, IReceiptGenerator receiptGenerator, IObservable<QueueItem> checkoutStream) { _stockValidator = stockValidator; _taxCalculator = taxCalculator; _paymentProcessor = paymentProcessor; _receiptGenerator = receiptGenerator; _checkoutStream = checkoutStream; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { _subscription = _checkoutStream.Subscribe(async item => await ProcessItemAsync(item)); return Task.CompletedTask; } ... ... }
The ExecuteAsync()
method from the BackgroundService
subscribes to the ReplaySubject<QueueItem>
instance. The DI container injects the ReplaySubject<QueueItem>
instance as IObservable<QueueItem>
.
The ProcessItemAsync()
gets executed as a callback when new data is added to the data stream from the CheckoutCoordinatorV3
class. This method continues the checkout processes one after the other as per the business rule in a non-blocking way similar to the ProcessEachQueueItemAsync()
method from the previous section.
Note that we register the ObserverBackgroundWorker
class in the DI container as the implementation of the IHostedService
interface:
builder.Services.AddHostedService<ObserverBackgroundWorker>();
Process Long-Running Task using System.Threading.Channel
This section uses a System.Threading.Channel
to implement a producer-consumer pattern. The CheckoutCoordinatorV4
class acts as the producer here:
public CheckoutCoordinatorV4(ICheckoutProcessingChannel checkoutProcessingChannel) { _checkoutProcessingChannel = checkoutProcessingChannel; } public async Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request) { ... ... await _checkoutProcessingChannel.AddQueueItemAsync(queueItem); return response; }
Similar to the previous two sections, here also, the ProcessCheckoutAsync()
method returns a response with in-progress status to the end-user. But, before sending the response, we invoke the AddQueueItemAsync()
method from the CheckoutProcessingChannel
class.
Now, let’s see how we implement the channel communication in the CheckoutProcessingChannel
class:
public class CheckoutProcessingChannel : ICheckoutProcessingChannel { private const int MaxMessagesInChannel = 100; private readonly Channel<QueueItem> _channel; private readonly ILogger<CheckoutProcessingChannel> _logger; public CheckoutProcessingChannel(ILogger<CheckoutProcessingChannel> logger) { _logger = logger; var options = new BoundedChannelOptions(MaxMessagesInChannel) { SingleWriter = false, SingleReader = true }; _channel = Channel.CreateBounded<QueueItem>(options); } public async Task<bool> AddQueueItemAsync(QueueItem item, CancellationToken ct = default) { while (await _channel.Writer.WaitToWriteAsync(ct) && !ct.IsCancellationRequested) { if (_channel.Writer.TryWrite(item)) { _logger.LogInformation($"{item.OrderId} added to channel."); return true; } } return false; } public IAsyncEnumerable<QueueItem> ReadAllAsync(CancellationToken ct = default) => _channel.Reader.ReadAllAsync(ct); public bool TryCompleteWriter(Exception? ex = null) => _channel.Writer.TryComplete(ex); }
In the constructor, we create a bounded channel of the QueueItem
type. The channel allows multiple producers to support multiple concurrent checkout requests to the Web API endpoint. The bounded channel limits the capacity of the channel such that producers will have to wait if the channel is full. The AddQueueItemAsync()
method adds the item to the channel as long as there is a capacity to do so. Otherwise, it will asynchronously wait for space to be available.
The CheckoutProcessingChannel
class is a singleton wrapper to a System.Threading.Channel
instance that we use for communication between the producer and consumer:
builder.Services.AddSingleton<ICheckoutProcessingChannel, CheckoutProcessingChannel>();
A channel is a synchronization concept that supports passing data concurrently between producers and consumers. One of many producers can write data into the channel. Similarly, one or more consumers can read the same data from the channel.
Now, let’s inspect the ExecuteAsync()
method from the ChannelBackgroundWorker
class:
protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await foreach (var item in _checkoutProcessingChannel.ReadAllAsync()) { await ProcessItemAsync(item); } }
We read the data from the channel in the ExecuteAsync()
method. We use a new feature introduced in C# 8.0 called async streams.
So, the async streams feature allows awaiting an asynchronous foreach loop. The ReadAllAsync()
method in the CheckoutProcessingChannel
wrapper class exposes an IAsyncEnumerable
, which supports the await foreach
syntax.
This allows awaiting asynchronously for the availability of new data from the channel. So, every time a new QueueItem
instance is available in the channel, the AsyncEnumerator
will trigger the foreach
loop to run. Hence, the ProcessItemAsync()
is executed every time a new queue item is available in the channel. This method implementation is the same as the last two previous sections.
Here also, we register the ChannelBackgroundWorker
class in the DI container as the implementation of the IHostedService
interface:
builder.Services.AddHostedService<ChannelBackgroundWorker>();
The Updated User Experience
Now, let’s send the same request as earlier to the /checkout
endpoint which indicates the checkout of two cart items.
The Web API endpoint returns a successful response:
{ "orderId": "c3b73ebf-f309-44b3-a5cb-a6530f1ce13b", "orderStatus": "Inprogress", "message": "Your order is in progress and you will receive an email with all details once the processing completes." } Request Duration 115 ms
Here, we can see that the endpoint takes less time to return a response to the end-user. Of course, this means that our app handles long-running tasks much faster now. So, the rest of the checkout processing happens at its own pace in the background. This is evident from the console logs entries:
info: Monolith.ShoppingCartApi.Services.StockValidator[0] Stock is validated. info: Monolith.ShoppingCartApi.Services.TaxCalculator[0] Customer lookup completed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6. info: Monolith.ShoppingCartApi.Services.TaxCalculator[0] Tax value calculated for customer is 32. info: Monolith.ShoppingCartApi.Services.PaymentProcessor[0] Payment of 192 has been processed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6 info: Monolith.ShoppingCartApi.Services.ReceiptGenerator[0] Receipt Generated and Order Status persisted in DB. info: Monolith.ShoppingCartApi.Services.ReceiptGenerator[0] Email is sent with Order Status and receipt.
Conclusion
This article covers the different ways to process long-running tasks in an ASP.NET Core Monolithic application. However, in the next article, we will see the processing of a long-running task in a microservices architecture using a messaging platform like RabbitMQ.