Often we come across scenarios where invoking a Web API endpoint triggers long-running tasks. In this article, we’re going to explore the different ways we can implement such a long-running task in an ASP.NET Core Web API.

To download the source code for this article, you can visit our GitHub repository.

Let’s dive into it.

Long-Running Tasks Use Case

Let’s take an example from the e-commerce domain. A shopping cart normally has a checkout functionality, and this is potentially a long-running task. When the user clicks the checkout button in the UI, the system triggers several steps:

  • Stock availability check for the cart items
  • Tax calculation based on customer details
  • Payment processing through a third-party payment gateway
  • Receipt generation and final email communication

This business process flow may fail in the stock check and payment processing steps. The system handles such failures and notifies the end-user about them by email.

Overview of the Monolithic Application

We have prepared a sample monolithic application that we are going to use in this article. We strongly suggest opening the source code while reading this article since it will help you understand the entire implementation.

The Web API controller has a /checkout endpoint. The UI client application invokes this endpoint:

[HttpPost("checkout")]
[ProducesResponseType(typeof(CheckoutResponse), StatusCodes.Status200OK)]
public async Task<IActionResult> Checkout(CheckoutRequest request) 
    => Ok(await _checkoutCoordinator.ProcessCheckoutAsync(request));

The action method sends an instance of the CheckoutRequest class to an implementation of the ICheckoutCoordinator interface.

Now, let’s see the registration code for ICheckoutCoordinator with the DI container:

builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV1>();
//builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV2>();
//builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV3>();
//builder.Services.AddSingleton<ICheckoutCoordinator, CheckoutCoordinatorV4>();

The sample contains four versions of the checkout coordinator, CheckoutCoordinatorV1 through CheckoutCoordinatorV4, and each one demonstrates a different approach to implementing a long-running task. Only one registration is active at any given time. To switch versions, we comment out the active line and uncomment another one.

The CheckoutCoordinatorV1 class implements the ICheckoutCoordinator interface. This implementation executes the checkout tasks one after the other. Only after all the processing is complete does the end-user receive the final response:

public async Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request)
{
    var response = new CheckoutResponse
    {
        OrderId = Guid.NewGuid()
    };

    if (!await _stockValidator.ValidateAsync(request.LineItems))
    {
        response.OrderStatus = OrderStatus.Failure;
        response.Message = "Item not available in stock";

        await _receiptGenerator.ProcessFailuresAsync(request.CustomerId, response);

        return response;
    }

    var tax = await _taxCalculator.CalculateTaxAsync(request.CustomerId, request.LineItems);
    var amount = request.LineItems.Sum(li => li.Quantity * li.Price) + tax;

    if (!await _paymentProcessor.ProcessAsync(request.CustomerId, request.PaymentInfo, amount))
    {
        response.OrderStatus = OrderStatus.Failure;
        response.Message = "Payment failure";

        await _receiptGenerator.ProcessFailuresAsync(request.CustomerId, response);

        return response;
    }

    response.OrderStatus = OrderStatus.Successful;
    response.Message = "Order was successfully placed. You will receive the receipt in email";
    await _receiptGenerator.GenerateAsync(request.CustomerId, response, amount);

    return response;
}

This approach is blocking in nature: the user has to wait until all the processing is complete before getting a response.

Please note that for this demo, the StockValidator, TaxCalculator, PaymentProcessor, and ReceiptGenerator classes inside the Services folder do not contain any business logic. They only simulate a process flow that may take a few seconds (long-running tasks):

public class TaxCalculator : ITaxCalculator
{
    private readonly Random _random = new Random();
    private readonly ILogger _logger;

    public TaxCalculator(ILogger<TaxCalculator> logger)
    {
        _logger = logger;
    }

    public async Task<int> CalculateTaxAsync(Guid customerId, IEnumerable<OrderLineItem> orderLineItems)
    {
        // Simulates the customer lookup from a database/service.
        await Task.Delay(500);
        _logger.LogInformation("Customer lookup completed for customer {CustomerId}.", customerId);

        // Simulates a complex tax calculation for all line items.
        await Task.Delay(500);
        var tax = _random.Next(1, 200);

        _logger.LogInformation("Tax value calculated for customer is {Tax}.", tax);

        return tax;
    }
}

Here, the first Task.Delay simulates customer lookup from a database/service. The customer address can be used for tax calculation. Similarly, the second delay simulates complex tax calculations for all line items for the customer. If you want, you can inspect all the other classes in the mentioned folder.

Let’s send the request to the /checkout endpoint which indicates the checkout of two cart items:

curl -X 'POST' \
  'https://localhost:7074/api/ShoppingCart/checkout' \
  -H 'accept: text/plain' \
  -H 'Content-Type: application/json' \
  -d '{
  "customerId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "lineItems": [
    {
      "productId": "3fa85f64-5717-4562-b3fc-2c963f66afb4",
      "productName": "Pen",
      "price": 20,
      "quantity": 2
    },
    {
      "productId": "3fa85f64-5717-4562-b3fc-2c963f66afa5",
      "productName": "Bag",
      "price": 120,
      "quantity": 1
    }
  ],
  "paymentInfo": {
    "creditCardNumber": "411111111111111",
    "creditCardType": 1,
    "cvv": 233
  }
}'

The Web API endpoint returns a successful response after a certain period:

{
  "orderId": "b4af9e92-7272-46b2-8bd2-b1e663118091",
  "orderStatus": "Successful",
  "message": "Order was successfully placed. You will receive the receipt in an email"
}

Request duration
6289 ms

As a result, we can see that the endpoint took more than 6s to complete the request. Making the user wait that long for a response is poor design from the user experience perspective.

Now, there are different solutions to this problem. Let’s look into the first approach.

Process Long-Running Tasks using Blocking Collection and TPL

The CheckoutCoordinatorV2 class uses the BlockingCollection<T> class for long-running task processing:

public class CheckoutCoordinatorV2 : ICheckoutCoordinator
{
    private BlockingCollection<QueueItem> _checkoutQueue;        
    ...
    ...
    
    public CheckoutCoordinatorV2(IStockValidator stockValidator,
                                 ITaxCalculator taxCalculator,
                                 IPaymentProcessor paymentProcessor,
                                 IReceiptGenerator receiptGenerator)
    {
       ...      
            
        _checkoutQueue = CreateCheckoutQueue();
    }

    private BlockingCollection<QueueItem> CreateCheckoutQueue()
    {
        var queue = new BlockingCollection<QueueItem>(new ConcurrentQueue<QueueItem>());

        Task.Factory.StartNew(async () => await ProcessAsync(queue), TaskCreationOptions.LongRunning);

        return queue;
    }
    ...
    ...
}

The BlockingCollection<T> class wraps any IProducerConsumerCollection<T> implementation, such as ConcurrentQueue<T>, ConcurrentStack<T>, or ConcurrentBag<T>, and adds blocking and bounding capabilities on top of it.

The CheckoutCoordinatorV2 class constructor invokes the CreateCheckoutQueue() method to initialize the BlockingCollection<QueueItem> instance, which uses a ConcurrentQueue<QueueItem> instance internally. The same method also starts the background task that processes the queue items.

The Task.Factory.StartNew() method from the TPL offloads that processing to a separate Task. The TaskCreationOptions.LongRunning option hints to the task scheduler that an additional thread might be required for the task so that it does not block the forward progress of other threads or work items on the local thread-pool queue. Note that the hint applies only to the synchronous part of the delegate: once the ProcessAsync() method reaches its first await that does not complete synchronously, the rest of the method continues on a thread-pool thread.
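To make the blocking and bounding behavior more concrete, here is a minimal console sketch that is not part of the sample application. It assumes a capacity of two and plain int items instead of QueueItem instances, and it uses a synchronous consumer so that the LongRunning hint covers the whole loop:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Backed by a ConcurrentQueue (FIFO) and bounded to two items (an assumed capacity).
var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), boundedCapacity: 2);

bool added1 = queue.TryAdd(1); // true
bool added2 = queue.TryAdd(2); // true
bool added3 = queue.TryAdd(3); // false: the queue is full and TryAdd does not wait

// A synchronous consumer loop started with the LongRunning hint.
var consumed = new List<int>();
var consumer = Task.Factory.StartNew(() =>
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        consumed.Add(item);
    }
}, TaskCreationOptions.LongRunning);

queue.Add(4);           // Add blocks until the consumer has freed a slot
queue.CompleteAdding(); // lets the consumer loop end once the queue is empty
await consumer;

Console.WriteLine($"{added1} {added2} {added3}"); // True True False
Console.WriteLine(string.Join(", ", consumed));    // 1, 2, 4
```

The sample application creates its queue without a capacity, so the Add() method never blocks there. A bounded queue is worth considering when producers can outpace the consumer.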

Now, let’s implement the ProcessCheckoutAsync() method from the ICheckoutCoordinator interface:

public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request)
{
    var response = new CheckoutResponse
    {
        OrderId = Guid.NewGuid(),
        OrderStatus = OrderStatus.Inprogress,
        Message = "Your order is in progress and you will receive an email with all details once the processing completes."
    };

    var queueItem = new QueueItem
    {
        OrderId = response.OrderId,
        Request = request
    };

    _checkoutQueue.Add(queueItem);

    return Task.FromResult(response);
}

Here, we create an instance of the CheckoutResponse class with an order status of Inprogress, a relevant message, and a new order id.

Before returning the response to the end-user, we create an instance of the QueueItem model class with the generated order id and the original request, and add it to the blocking collection. The QueueItem instance carries everything the background task needs, so we can return the response immediately without losing any request data. Keep in mind that the queue lives in memory, so items that are still waiting in it are lost if the application stops.

Additionally, let’s inspect CheckoutResponse and the QueueItem classes:

public class CheckoutResponse
{
    public Guid OrderId { get; set; }
             
    public OrderStatus OrderStatus { get; set; }

    public string Message { get; set; }
}

public class QueueItem
{
    public Guid OrderId { get; set; }

    public CheckoutRequest Request { get; set; }
}

Asynchronous Processing in a Background Task

Now, let’s inspect the logic that we offload to the background task:

private async Task ProcessAsync(BlockingCollection<QueueItem> queue)
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        await ProcessEachQueueItemAsync(item);
    }
}

We use the BlockingCollection<T>.GetConsumingEnumerable() in the ProcessAsync() method to remove items until adding is completed and the collection is empty. This is called a mutating enumeration or consuming enumeration because unlike a typical foreach loop, this enumerator modifies the source collection by removing items.
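The following console sketch, which is separate from the sample application, illustrates how a consuming enumeration drains the collection. The string order ids are made up for the illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

var queue = new BlockingCollection<string>();
queue.Add("order-1");
queue.Add("order-2");
queue.CompleteAdding(); // without this, the loop below would block waiting for more items

int countBefore = queue.Count; // 2

var lines = new List<string>();
foreach (var orderId in queue.GetConsumingEnumerable())
{
    // Each iteration removes the item from the collection before handing it to us.
    lines.Add($"{orderId}, remaining: {queue.Count}");
}

Console.WriteLine(countBefore);                   // 2
Console.WriteLine(string.Join(" | ", lines));     // order-1, remaining: 1 | order-2, remaining: 0
Console.WriteLine(queue.IsCompleted);             // True
```

In the sample application, nothing calls the CompleteAdding() method, so the loop in the ProcessAsync() method keeps waiting for new items for as long as the application runs.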

We can also see that we call the ProcessEachQueueItemAsync() method inside the ProcessAsync() method:

private async Task ProcessEachQueueItemAsync(QueueItem item)
{
    var response = new CheckoutResponse
    {
        OrderId = item.OrderId
    };

    if (!await _stockValidator.ValidateAsync(item.Request.LineItems))
    {
        response.OrderStatus = OrderStatus.Failure;
        response.Message = "Item not available in stock";

        await _receiptGenerator.ProcessFailuresAsync(item.Request.CustomerId, response);

        return;
    }

    var tax = await _taxCalculator.CalculateTaxAsync(item.Request.CustomerId, item.Request.LineItems);
    var amount = item.Request.LineItems.Sum(li => li.Quantity * li.Price) + tax;

    if (!await _paymentProcessor.ProcessAsync(item.Request.CustomerId, item.Request.PaymentInfo, amount))
    {
        response.OrderStatus = OrderStatus.Failure;
        response.Message = "Payment failure";

        await _receiptGenerator.ProcessFailuresAsync(item.Request.CustomerId, response);

        return;
    }

    response.OrderStatus = OrderStatus.Successful;
    response.Message = "Order was successfully placed. You will receive the receipt in email";
    await _receiptGenerator.GenerateAsync(item.Request.CustomerId, response, amount);
}

In the ProcessEachQueueItemAsync() method, the checkout steps still run one after the other as per the business rules. But now, the processing happens entirely in the background, so the end-user doesn’t have to wait for it to complete before getting a response. When the processing completes, the ReceiptGenerator sends an email to the end-user with all the relevant details.

The controller action can also return the HTTP status code 202 (Accepted) instead of 200 (OK), together with the location of an endpoint where the result will be available once the processing ends. The client application may then poll this endpoint until the result is available.
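The polling idea can be sketched without any web framework. The snippet below is only an illustration under stated assumptions: the statuses dictionary, the Submit() function, and the GetStatus() function are hypothetical names that do not exist in the sample application, and a 200 ms delay stands in for the checkout steps. In a real controller, Submit() would correspond to the POST action returning 202 (Accepted), and GetStatus() to a GET action that the client polls:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory status store shared by the endpoint and the background work.
var statuses = new ConcurrentDictionary<Guid, string>();

// What the POST action would do: record the order, start the work, return at once.
Guid Submit()
{
    var orderId = Guid.NewGuid();
    statuses[orderId] = "Inprogress";

    _ = Task.Run(async () =>
    {
        await Task.Delay(200); // stands in for the long-running checkout steps
        statuses[orderId] = "Successful";
    });

    return orderId;
}

// What the GET status action would read.
string GetStatus(Guid orderId) =>
    statuses.TryGetValue(orderId, out var status) ? status : "Unknown";

var id = Submit();

// The client polls until the order leaves the in-progress state.
while (GetStatus(id) == "Inprogress")
{
    await Task.Delay(50);
}

var finalStatus = GetStatus(id);
Console.WriteLine(finalStatus); // Successful
```

A production version would also need to persist the status and expire old entries, which this sketch leaves out.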

Process Long-Running Task using Reactive Extensions

The CheckoutCoordinatorV3 class uses a ReplaySubject<T> instance from Reactive Extensions (Rx) to implement the observer pattern. First, we need to register the required services in the Program class:

builder.Services.AddSingleton<ReplaySubject<QueueItem>>();
builder.Services.AddSingleton<IObservable<QueueItem>>(x => x.GetRequiredService<ReplaySubject<QueueItem>>());
builder.Services.AddSingleton<IObserver<QueueItem>>(x => x.GetRequiredService<ReplaySubject<QueueItem>>());

Here, we register a singleton instance of the ReplaySubject<T> with the DI container as the concrete implementation for both the IObserver<T> and IObservable<T> interfaces. This ensures that the same instance of the ReplaySubject<T> is injected as both the observer and observable components on the resolution of dependency.

Now, let’s inspect the CheckoutCoordinatorV3 class:

public CheckoutCoordinatorV3(IObserver<QueueItem> checkoutStream)
{
     _checkoutStream = checkoutStream;
}

public Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request)
{
    ...
    ...
    _checkoutStream.OnNext(queueItem);

    return Task.FromResult(response);
}

We inject IObserver<QueueItem> through the constructor, with the ReplaySubject<QueueItem> as the implementation. The CheckoutCoordinatorV3 class pushes new data to the observable stream whenever it gets a new request from the controller.

We also need an observer that reacts to the arrival of new data in the data stream. The ObserverBackgroundWorker class, which is implemented as a .NET Core BackgroundService, acts as the observer in this case:

public class ObserverBackgroundWorker : BackgroundService
{
    private readonly IStockValidator _stockValidator;
    private readonly ITaxCalculator _taxCalculator;
    private readonly IPaymentProcessor _paymentProcessor;
    private readonly IReceiptGenerator _receiptGenerator;
    private readonly IObservable<QueueItem> _checkoutStream;        
    private IDisposable? _subscription;

    public ObserverBackgroundWorker(IStockValidator stockValidator, ITaxCalculator taxCalculator,
        IPaymentProcessor paymentProcessor, IReceiptGenerator receiptGenerator,
        IObservable<QueueItem> checkoutStream)
    {
        _stockValidator = stockValidator;
        _taxCalculator = taxCalculator;
        _paymentProcessor = paymentProcessor;
        _receiptGenerator = receiptGenerator;
        _checkoutStream = checkoutStream;            
    }       

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _subscription = _checkoutStream.Subscribe(async item => await ProcessItemAsync(item));
        return Task.CompletedTask;            
    }
    ...
    ...      
}

The ExecuteAsync() method from the BackgroundService subscribes to the ReplaySubject<QueueItem> instance. The DI container injects the ReplaySubject<QueueItem> instance as IObservable<QueueItem>.

The ProcessItemAsync() method gets executed as a callback when the CheckoutCoordinatorV3 class adds new data to the data stream. This method runs the checkout steps one after the other as per the business rules in a non-blocking way, similar to the ProcessEachQueueItemAsync() method from the previous section.

Note that we register the ObserverBackgroundWorker class in the DI container as the implementation of the IHostedService interface:

builder.Services.AddHostedService<ObserverBackgroundWorker>();

Process Long-Running Task using System.Threading.Channels

This section uses a Channel<T> instance from the System.Threading.Channels namespace to implement a producer-consumer pattern. The CheckoutCoordinatorV4 class acts as the producer here:

public CheckoutCoordinatorV4(ICheckoutProcessingChannel checkoutProcessingChannel)
{
    _checkoutProcessingChannel = checkoutProcessingChannel; 
}

public async Task<CheckoutResponse> ProcessCheckoutAsync(CheckoutRequest request)
{
    ...
    ...
  
    await _checkoutProcessingChannel.AddQueueItemAsync(queueItem);  
            
    return response;
}

Similar to the previous two sections, here also, the ProcessCheckoutAsync() method returns a response with in-progress status to the end-user. But, before sending the response, we invoke the AddQueueItemAsync() method from the CheckoutProcessingChannel class.

Now, let’s see how we implement the channel communication in the CheckoutProcessingChannel class:

public class CheckoutProcessingChannel : ICheckoutProcessingChannel
{
    private const int MaxMessagesInChannel = 100;
    private readonly Channel<QueueItem> _channel;
    private readonly ILogger<CheckoutProcessingChannel> _logger;

    public CheckoutProcessingChannel(ILogger<CheckoutProcessingChannel> logger)
    {
        _logger = logger;

        var options = new BoundedChannelOptions(MaxMessagesInChannel)
        {
           SingleWriter = false,
           SingleReader = true
        };

        _channel = Channel.CreateBounded<QueueItem>(options);            
    }

    public async Task<bool> AddQueueItemAsync(QueueItem item, CancellationToken ct = default)
    {
        while (await _channel.Writer.WaitToWriteAsync(ct) && !ct.IsCancellationRequested)
        {
           if (_channel.Writer.TryWrite(item))
           {
              _logger.LogInformation($"{item.OrderId} added to channel.");
              return true;
           }
        }

        return false;
    }

    public IAsyncEnumerable<QueueItem> ReadAllAsync(CancellationToken ct = default) 
       => _channel.Reader.ReadAllAsync(ct);

    public bool TryCompleteWriter(Exception? ex = null) 
       => _channel.Writer.TryComplete(ex);
}

In the constructor, we create a bounded channel of the QueueItem type. The channel allows multiple producers to support multiple concurrent checkout requests to the Web API endpoint. The bounded channel limits the capacity of the channel such that producers will have to wait if the channel is full. The AddQueueItemAsync() method adds the item to the channel as long as there is a capacity to do so. Otherwise, it will asynchronously wait for space to be available.

The CheckoutProcessingChannel class is a singleton wrapper around a Channel<QueueItem> instance that we use for communication between the producer and the consumer:

builder.Services.AddSingleton<ICheckoutProcessingChannel, CheckoutProcessingChannel>();

A channel is a synchronization construct that supports passing data between producers and consumers concurrently. One or many producers can write data into the channel. Similarly, one or more consumers can read from the channel, with each item going to only one of them.
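To see how a bounded channel pushes back on producers, we can try a small console sketch that is independent of the sample application. It assumes a capacity of two instead of the 100 used above, and int items instead of QueueItem instances:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    SingleWriter = false,
    SingleReader = true
});

bool wrote1 = channel.Writer.TryWrite(1); // true
bool wrote2 = channel.Writer.TryWrite(2); // true
bool wrote3 = channel.Writer.TryWrite(3); // false: the channel is full (default FullMode is Wait)

// While the channel is full, WaitToWriteAsync stays pending instead of blocking a thread.
var waitForSpace = channel.Writer.WaitToWriteAsync().AsTask();
bool pendingWhileFull = !waitForSpace.IsCompleted; // true

int firstItem = await channel.Reader.ReadAsync();  // reading frees a slot
bool spaceAvailable = await waitForSpace;          // true
bool wroteAfterRead = channel.Writer.TryWrite(3);  // true

Console.WriteLine($"{wrote1} {wrote2} {wrote3}");                       // True True False
Console.WriteLine($"{pendingWhileFull} {firstItem} {wroteAfterRead}"); // True 1 True
```

This is the same wait-then-write sequence that the AddQueueItemAsync() method runs in its loop.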

Now, let’s inspect the ExecuteAsync() method from the ChannelBackgroundWorker class:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Passing the stopping token lets the loop end when the host shuts down.
    await foreach (var item in _checkoutProcessingChannel.ReadAllAsync(stoppingToken))
    {
        await ProcessItemAsync(item);
    }
}

We read the data from the channel in the ExecuteAsync() method, using the async streams feature introduced in C# 8.0.

The async streams feature allows us to iterate over an asynchronous sequence with the await foreach syntax. The ReadAllAsync() method in the CheckoutProcessingChannel wrapper class exposes an IAsyncEnumerable<QueueItem>, which supports this syntax.

This lets the loop wait asynchronously for new data in the channel. Every time a new QueueItem instance becomes available, the loop body runs and executes the ProcessItemAsync() method. The implementation of this method is the same as in the previous two sections.
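The sketch below is not taken from the sample application. It shows the two ways such an await foreach loop can end: the writer completes the channel, or the cancellation token passed to the ReadAllAsync() method is canceled. The unbounded channels and the string order ids are assumptions made for brevity:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// 1. The loop ends normally after the writer completes and the channel is drained.
var channel = Channel.CreateUnbounded<string>();
var processed = new List<string>();

var consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        processed.Add(item);
    }
});

await channel.Writer.WriteAsync("order-1");
await channel.Writer.WriteAsync("order-2");
channel.Writer.Complete();
await consumer;

Console.WriteLine(string.Join(", ", processed)); // order-1, order-2

// 2. The loop ends with an OperationCanceledException when the token is canceled.
using var cts = new CancellationTokenSource();
var idleChannel = Channel.CreateUnbounded<string>();
bool canceled = false;

var worker = Task.Run(async () =>
{
    try
    {
        await foreach (var ignored in idleChannel.Reader.ReadAllAsync(cts.Token)) { }
    }
    catch (OperationCanceledException)
    {
        canceled = true;
    }
});

cts.Cancel();
await worker;

Console.WriteLine(canceled); // True
```

In a hosted service, the stoppingToken parameter of the ExecuteAsync() method plays the role of cts.Token here.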

Here also, we register the ChannelBackgroundWorker class in the DI container as the implementation of the IHostedService interface:

builder.Services.AddHostedService<ChannelBackgroundWorker>();

The Updated User Experience

Now, let’s send the same request as earlier to the /checkout endpoint which indicates the checkout of two cart items.

The Web API endpoint returns a successful response:

{
  "orderId": "c3b73ebf-f309-44b3-a5cb-a6530f1ce13b",
  "orderStatus": "Inprogress",
  "message": "Your order is in progress and you will receive an email with all details once the processing completes."
}

Request Duration
115 ms

Here, we can see that the endpoint takes far less time to return a response to the end-user. Of course, this doesn’t mean that the checkout itself is any faster. The endpoint only acknowledges the request, and the rest of the checkout processing happens at its own pace in the background. This is evident from the console log entries:

info: Monolith.ShoppingCartApi.Services.StockValidator[0]
      Stock is validated.
info: Monolith.ShoppingCartApi.Services.TaxCalculator[0]
      Customer lookup completed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6.
info: Monolith.ShoppingCartApi.Services.TaxCalculator[0]
      Tax value calculated for customer is 32.
info: Monolith.ShoppingCartApi.Services.PaymentProcessor[0]
      Payment of 192 has been processed for customer 3fa85f64-5717-4562-b3fc-2c963f66afa6
info: Monolith.ShoppingCartApi.Services.ReceiptGenerator[0]
      Receipt Generated and Order Status persisted in DB.
info: Monolith.ShoppingCartApi.Services.ReceiptGenerator[0]
      Email is sent with Order Status and receipt.

Conclusion

This article covers the different ways to process long-running tasks in an ASP.NET Core monolithic application. In the next article, we will see the processing of a long-running task in a microservices architecture using a messaging platform like RabbitMQ.
