When building applications, we often need long-running background tasks whose work can be handed off to one or more background processes. This is known as the producer/consumer pattern. For these scenarios, we need something more robust than a simple first-in, first-out queue, but we don’t need the complexity and infrastructure of a message broker such as RabbitMQ. This is where Channels come in.
Let’s dive into Channels!
What are Channels?
Channels are part of the .NET Core base library, so they are available without any additional package dependencies. A Channel is a data structure that stores data from a Producer, which can then be consumed by one or more Consumers.
It is important to note that, unlike a Publisher/Subscriber (Pub/Sub) model, where a produced message can be received by several Subscribers concurrently, a Channel delivers each message to exactly one Consumer.
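To make the single-delivery behaviour concrete, here is a minimal sketch, assuming .NET 6 or later with top-level statements. The consumer names "A" and "B" and the ConsumeAsync helper are hypothetical and exist only for this illustration. Two consumers read from the same channel, and each item ends up with exactly one of them:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Write five items, then mark the channel as complete.
for (var i = 1; i <= 5; i++)
{
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete();

// Records which (hypothetical) consumer handled each item.
var handled = new ConcurrentBag<(string Consumer, int Item)>();

async Task ConsumeAsync(string name)
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        handled.Add((name, item));
        await Task.Delay(10); // simulate a little work
    }
}

// Two consumers compete for the same items.
await Task.WhenAll(ConsumeAsync("A"), ConsumeAsync("B"));

// Every item was delivered once, never to both consumers.
Console.WriteLine(handled.Count);                                   // 5
Console.WriteLine(handled.Select(h => h.Item).Distinct().Count()); // 5
```

How the five items are split between "A" and "B" can vary from run to run, but the totals do not.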
Let’s visualize this better with a diagram:
With this basic understanding of channels in mind, let’s look at the classes provided in the library, along with some of the features.
Bounded and Unbounded Channels
There are two variations we can specify when creating a channel, bounded and unbounded:
public static class Channel
{
    public static Channel<T> CreateBounded<T>(int capacity);
    public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);
    public static Channel<T> CreateUnbounded<T>();
    public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
}
The CreateBounded() method takes a parameter, capacity, which determines the number of items that can be in the channel at any one time. There is an overload for this method that takes a BoundedChannelOptions parameter. This allows us to define further constraints on the channel, such as what we want to happen when the channel is full. We can choose to drop an item or request that the producer waits until the channel has capacity before writing its item.
Unlike the bounded Channel, the CreateUnbounded() method provides us with a channel with no capacity limits. This gives us more flexibility but can cause issues if we are publishing a large number of items, as the channel could run into memory constraints.
There is also an overload to this method that takes an UnboundedChannelOptions parameter. With this, we can specify if we want only a single producer to write to the channel, and similarly if we only want a single consumer of the items.
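As a rough illustration of both options classes, the sketch below creates one channel of each kind. The capacity of 100 and the particular SingleWriter/SingleReader settings are arbitrary choices for this example, not recommendations:

```csharp
using System;
using System.Threading.Channels;

// Bounded: at most 100 items; writers wait when the channel is full.
var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleWriter = true,  // we promise that only one producer writes
    SingleReader = false  // several consumers may read
});

// Unbounded: no capacity limit; one producer and one consumer.
var unbounded = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
    SingleWriter = true,
    SingleReader = true
});

Console.WriteLine(bounded.Writer.TryWrite(1));   // True
Console.WriteLine(unbounded.Writer.TryWrite(1)); // True
```

SingleWriter and SingleReader are promises we make to the channel about how we will use it, so they should only be set to true when they really hold.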
ChannelWriter (Producer)
The ChannelWriter<T> abstract class is used by Producers to send messages into a channel to be consumed. Let’s look at some of the methods provided by the class:
public abstract class ChannelWriter<T>
{
    public abstract bool TryWrite(T item);
    public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);
    public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
    public virtual bool TryComplete(Exception? error = null);
}
The first and simplest method, TryWrite(), attempts to add the item passed as the parameter to the channel. Note, however, that it returns a bool. If we call the TryWrite() method and the channel cannot accept the item, for example because it is a bounded channel that is already full, the item will not be added and the method will return false.
Next, we have the WaitToWriteAsync() method. This is an asynchronous method that we can use to determine whether the channel is safe to write to. We would use this in scenarios where it is expensive to create the item the producer sends to the channel.
The WriteAsync() method is similar to the TryWrite() method. However, it is asynchronous, meaning the producer can await the result of adding an item to the channel before continuing. Unlike the other two methods, this is marked as virtual, meaning there is a default implementation if we don’t wish to provide our own.
Finally, we’ll look at the TryComplete() method. This attempts to mark the channel as complete, signalling that no more items will be written. The channel does not close immediately. Instead, it closes once consumers have read all the remaining items.
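The sketch below shows one way these writer methods might fit together. BuildExpensiveReport is a hypothetical stand-in for an item that is costly to create, and the capacity of 2 is arbitrary:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(2);
var writer = channel.Writer;

// Hypothetical stand-in for an item that is expensive to build.
static string BuildExpensiveReport(int id) => $"report-{id}";

var written = 0;
for (var id = 1; id <= 2; id++)
{
    // Wait until there is room; false means the channel was completed.
    if (!await writer.WaitToWriteAsync())
    {
        break;
    }

    // Only now do we pay the cost of creating the item.
    var report = BuildExpensiveReport(id);
    if (writer.TryWrite(report))
    {
        written++;
    }
}

Console.WriteLine(written);                 // 2
Console.WriteLine(writer.TryComplete());    // True: the channel is now marked complete
Console.WriteLine(writer.TryComplete());    // False: it was already complete
Console.WriteLine(writer.TryWrite("late")); // False: no writes after completion
```

With several producers, another writer could fill the free slot between WaitToWriteAsync() and TryWrite(), which is why the result of TryWrite() is still checked.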
ChannelReader (Consumer)
ChannelReader<T> is the class Consumers use to receive items from the channel. Most of the methods are the read variants of those on the ChannelWriter class, but there are a couple of differences worth mentioning:
public abstract class ChannelReader<T>
{
    public virtual bool TryPeek(out T item);
    public abstract bool TryRead(out T item);
    public virtual IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default);
}
Instead of providing just a TryRead() method, there is also the TryPeek() method. This allows us to view an item from the channel if it exists but doesn’t consume and remove the item from the channel.
The ReadAllAsync() method, as the name suggests, allows us to read all the items from the channel. This returns an IAsyncEnumerable<T>, which means we can stream the items from the channel asynchronously.
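The difference between peeking and reading can be sketched as follows. This assumes .NET 6 or later, where TryPeek() and the CanPeek property are available. The item values are placeholders:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("first");
channel.Writer.TryWrite("second");

var reader = channel.Reader;

// Peeking looks at the next item without removing it.
if (reader.CanPeek && reader.TryPeek(out var peeked))
{
    Console.WriteLine($"Peeked: {peeked}"); // Peeked: first
}

// Reading removes the item, so the same item comes back first.
reader.TryRead(out var firstRead);
Console.WriteLine($"Read: {firstRead}");    // Read: first

reader.TryRead(out var secondRead);
Console.WriteLine($"Read: {secondRead}");   // Read: second

// The channel is now empty, so TryRead reports failure.
var gotThird = reader.TryRead(out _);
Console.WriteLine(gotThird);                // False
```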
Now that we understand the basics of writing to and reading from a channel, let’s create an application to see channels in action.
Create a Producer-Consumer Application with Channels
We’ll start with a simple console application, using either Visual Studio or dotnet new console to scaffold the application.
Create Unbounded Channel
We’ll start with an unbounded channel. We don’t have to worry about capacity or other constraints with this channel type.
In the Program class, let’s create our channel:
var unboundedChannel = Channel.CreateUnbounded<string>();
That was simple!
To-Do Producer
Next, we’ll create our Producer class:
public class Producer
{
    private static readonly List<string> _todoItems = new()
    {
        "Make a coffee",
        "Read CodeMaze articles",
        "Go for a run",
        "Eat lunch"
    };

    private readonly ChannelWriter<string> _channelWriter;

    public Producer(ChannelWriter<string> channelWriter)
    {
        _channelWriter = channelWriter;
    }

    public async Task ProduceWorkAsync()
    {
        foreach (var todo in _todoItems)
        {
            if (_channelWriter.TryWrite(todo))
            {
                Console.WriteLine($"Added todo: '{todo}' to channel");
            }

            await Task.Delay(500);
        }

        _channelWriter.Complete();
    }
}
The Producer class takes a ChannelWriter<string> in the constructor, which we’ll use to publish new items to our unbounded channel.
The ProduceWorkAsync() method is where we write our _todoItems list to the channel by calling the TryWrite() method. We wrap this call in an if block because, depending on the constraints of the channel, we may not be able to successfully write the item.
When all our items are published, we call the Complete() method to let the channel know we have no more items to publish.
To-Do Consumer
Next up, the consumer:
public class Consumer
{
    private readonly ChannelReader<string> _channelReader;

    public Consumer(ChannelReader<string> channelReader)
    {
        _channelReader = channelReader;
    }

    public async Task ConsumeWorkAsync()
    {
        try
        {
            while (true)
            {
                var todo = await _channelReader.ReadAsync();
                Console.WriteLine($"Completing todo: {todo}");
                await Task.Delay(1500);
            }
        }
        catch (ChannelClosedException)
        {
            Console.WriteLine("Channel was closed");
        }
    }
}
Our Consumer class takes a ChannelReader<string> in the constructor. The ConsumeWorkAsync() method is where we listen for and process the to-do items from the channel. We make use of the ReadAsync() method to retrieve items from the channel. We must wrap our logic in a try-catch block and catch a ChannelClosedException, as ReadAsync() throws this exception once the producer has completed the channel and there are no items left to read.
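To see why the try-catch block is needed, the following minimal sketch completes a channel that still holds one item. It is separate from the application above and assumes top-level statements. The item already in the channel can still be read, and only the next ReadAsync() call throws:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("Make a coffee");
channel.Writer.Complete();

// Items written before Complete() remain readable.
var todo = await channel.Reader.ReadAsync();
Console.WriteLine($"Completing todo: {todo}"); // Completing todo: Make a coffee

var closed = false;
try
{
    // The channel is complete and empty, so this read cannot succeed.
    await channel.Reader.ReadAsync();
}
catch (ChannelClosedException)
{
    closed = true;
    Console.WriteLine("Channel was closed");   // Channel was closed
}
```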
Back in the Program class, let’s inject the ChannelWriter and ChannelReader parameters, and produce some items:
var producer = new Producer(unboundedChannel.Writer);
var consumer = new Consumer(unboundedChannel.Reader);

_ = Task.Factory.StartNew(async () =>
{
    await producer.ProduceWorkAsync();
});

await consumer.ConsumeWorkAsync();
Here, we run our producer on a background thread using Task.Factory.StartNew, and await our consumer on the main thread.
Running our application, we see our producer adding the to-do items to the channel, and our consumer subsequently processing them:
Added todo: 'Make a coffee' to channel
Completing todo: Make a coffee
Added todo: 'Read CodeMaze articles' to channel
Completing todo: Read CodeMaze articles
Added todo: 'Go for a run' to channel
Added todo: 'Eat lunch' to channel
Completing todo: Go for a run
Completing todo: Eat lunch
Channel was closed
As we have run our producer on a background thread, our consumer can asynchronously read items from the channel.
Now that we have a grasp on unbounded channels, producers, and consumers, let’s look at using a bounded channel.
Bounded Channel Alternative
Creating a bounded channel is very similar to an unbounded channel:
var boundedChannel = Channel.CreateBounded<string>(1);
Here, we create a bounded channel with a capacity of 1. With 4 items on our to-do list, let’s look at what happens now. We must remember to inject our bounded writer and reader classes into our producer and consumer:
var producer = new Producer(boundedChannel.Writer);
var consumer = new Consumer(boundedChannel.Reader);
Running our application, we see that our producer doesn’t write all items to the channel:
Added todo: 'Make a coffee' to channel
Completing todo: Make a coffee
Added todo: 'Read CodeMaze articles' to channel
Completing todo: Read CodeMaze articles
Channel was closed
Why is this?
There are two aspects to this. First and foremost, we set the capacity to 1. This means we can only ever have 1 item sitting in the channel. If we call the TryWrite() method when there is an item in the channel, it will not be written and false will be returned.
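This behaviour can be reproduced in isolation. The sketch below is a standalone snippet, not part of the application, and does not need a consumer at all. It writes to a channel with a capacity of 1 and prints what TryWrite() returns each time:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<string>(1);

// The channel is empty, so the first write succeeds.
var first = channel.Writer.TryWrite("Make a coffee");
Console.WriteLine(first);  // True

// The single slot is taken, so the second write is rejected.
var second = channel.Writer.TryWrite("Go for a run");
Console.WriteLine(second); // False

// Reading frees the slot again.
channel.Reader.TryRead(out var read);

var third = channel.Writer.TryWrite("Eat lunch");
Console.WriteLine(third);  // True
```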
Furthermore, earlier we discussed the BoundedChannelOptions class, which can be passed as a parameter when creating a bounded channel. Let’s look at this class next.
BoundedChannelOptions
The BoundedChannelOptions class includes a property, FullMode, which is an enum of type BoundedChannelFullMode:
public enum BoundedChannelFullMode
{
    Wait,
    DropNewest,
    DropOldest,
    DropWrite
}
By default, when we create a bounded channel, it sets FullMode to BoundedChannelFullMode.Wait. This means a producer will wait for capacity to become available before adding an item to the channel. However, as we use the TryWrite() method in our producer, which never waits, we do not wait for capacity to be freed up before attempting to write to the channel.
Let’s change the FullMode property and see how that affects our application:
var boundedChannel = Channel.CreateBounded<string>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.DropOldest
});
We still define a capacity as a constructor parameter on the BoundedChannelOptions class. By setting the FullMode property to DropOldest, we tell the channel to drop the oldest item from the channel when a producer attempts to write a new item.
Let’s see how this changes our application:
Added todo: 'Make a coffee' to channel
Completing todo: Make a coffee
Added todo: 'Read CodeMaze articles' to channel
Added todo: 'Go for a run' to channel
Added todo: 'Eat lunch' to channel
Completing todo: Eat lunch
Channel was closed
As expected, our producer writes all items to the channel. However, our consumer doesn’t receive all the items as our producer is adding items to the channel faster than our consumer reads them. We’d end up with similar outcomes, with some items never reaching the consumer, if we chose DropNewest or DropWrite instead.
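To compare the modes side by side, the sketch below writes the numbers 1, 2, and 3 to a channel with a capacity of 2 and then drains it. It assumes .NET 5 or later for Enum.GetValues<T>(), and the FillAndDrain helper is hypothetical, written only for this comparison:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Writes 1, 2, 3 to a channel of capacity 2, then reads back what is left.
static (bool ThirdWrite, List<int> Items) FillAndDrain(BoundedChannelFullMode mode)
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
    {
        FullMode = mode
    });

    channel.Writer.TryWrite(1);
    channel.Writer.TryWrite(2);
    var thirdWrite = channel.Writer.TryWrite(3); // the channel is full here

    var items = new List<int>();
    while (channel.Reader.TryRead(out var item))
    {
        items.Add(item);
    }

    return (thirdWrite, items);
}

foreach (var mode in Enum.GetValues<BoundedChannelFullMode>())
{
    var (thirdWrite, items) = FillAndDrain(mode);
    Console.WriteLine($"{mode}: TryWrite(3) = {thirdWrite}, items = [{string.Join(", ", items)}]");
}

// Wait: TryWrite(3) = False, items = [1, 2]
// DropNewest: TryWrite(3) = True, items = [1, 3]
// DropOldest: TryWrite(3) = True, items = [2, 3]
// DropWrite: TryWrite(3) = True, items = [1, 2]
```

Note that in the three drop modes TryWrite() returns true even though an item is discarded, so the return value alone does not tell us that data was lost.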
We can refactor our code so we no longer see these outcomes, so let’s look at that next.
Refactor Our Producer and Consumer
Fortunately, we can refactor a lot of our current code to make it easier to understand and maintain. Let’s start with our producer once again:
public async Task ProduceWorkAsync()
{
    foreach (var todo in _todoItems)
    {
        await _channelWriter.WriteAsync(todo);
        Console.WriteLine($"Added todo: '{todo}' to channel");
        await Task.Delay(500);
    }

    _channelWriter.Complete();
}
We simply use the WriteAsync() method, which waits until the item can safely be written to the channel. Its default implementation relies on the WaitToWriteAsync() method under the hood.
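The waiting behaviour can be observed directly. In this minimal sketch, which is a standalone snippet and not part of the application, the second WriteAsync() call does not complete until a read frees up the single slot:

```csharp
using System;
using System.Threading.Channels;

// FullMode defaults to Wait for a bounded channel.
var channel = Channel.CreateBounded<string>(1);

// The channel is empty, so this write completes straight away.
await channel.Writer.WriteAsync("Make a coffee");

// The channel is now full, so this write is left pending.
var pending = channel.Writer.WriteAsync("Go for a run");
var completedWhileFull = pending.IsCompleted;
Console.WriteLine(completedWhileFull); // False

// Reading an item frees the slot and lets the pending write finish.
channel.Reader.TryRead(out var first);
await pending;

channel.Reader.TryRead(out var second);
Console.WriteLine(first);  // Make a coffee
Console.WriteLine(second); // Go for a run
```

No item is dropped here: the producer is simply slowed down to the pace of the consumer.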
Next up, our consumer. We can refactor our consumer logic using the ReadAllAsync() method:
public async Task ConsumeWorkAsync()
{
    await foreach (var todo in _channelReader.ReadAllAsync())
    {
        Console.WriteLine($"Completing todo: {todo}");
        await Task.Delay(1500);
    }

    Console.WriteLine("All items read");
}
Using the ReadAllAsync() method allows us to make use of async streams, meaning we no longer need a while loop or the try-catch block. When the channel is closed by the producer and all remaining items have been read, the consumer will exit the await foreach loop and write to the console that all items have been read.
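For intuition, reading with ReadAllAsync() behaves much like a manual loop built from WaitToReadAsync() and TryRead(). The sketch below is an illustration of that idea rather than the library’s exact implementation, and CreateFilledChannel is a hypothetical helper. It drains two identical channels, one in each style:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper: a completed channel holding two to-do items.
static Channel<string> CreateFilledChannel()
{
    var channel = Channel.CreateUnbounded<string>();
    channel.Writer.TryWrite("Make a coffee");
    channel.Writer.TryWrite("Go for a run");
    channel.Writer.Complete();
    return channel;
}

// Manual style: wait for data, then drain whatever is available.
var manual = new List<string>();
var reader = CreateFilledChannel().Reader;
while (await reader.WaitToReadAsync()) // false once complete and empty
{
    while (reader.TryRead(out var item))
    {
        manual.Add(item);
    }
}

// Async stream style: the same result with less code.
var streamed = new List<string>();
await foreach (var todo in CreateFilledChannel().Reader.ReadAllAsync())
{
    streamed.Add(todo);
}

Console.WriteLine(string.Join(", ", manual));   // Make a coffee, Go for a run
Console.WriteLine(string.Join(", ", streamed)); // Make a coffee, Go for a run
```

Neither loop throws when the channel closes, because both stop as soon as the channel is complete and empty.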
This is much cleaner and easier to maintain. Let’s set our bounded channel back to the default mode of Wait:
var boundedChannel = Channel.CreateBounded<string>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait
});
Now when we run our application, we see all our to-dos processed as expected:
Added todo: 'Make a coffee' to channel
Completing todo: Make a coffee
Added todo: 'Read CodeMaze articles' to channel
Completing todo: Read CodeMaze articles
Added todo: 'Go for a run' to channel
Completing todo: Go for a run
Added todo: 'Eat lunch' to channel
Completing todo: Eat lunch
All items read
Also, we no longer have to deal with any exceptions when the channel closes.
Conclusion
Great! We covered quite a lot of detail about building producer/consumer applications using channels in .NET. We looked at the two different types, unbounded and bounded, as well as writing to and reading from these channel types. Finally, we improved our code to be more maintainable and to handle the constraints of bounded channels. Channels are a great feature that allows us to create lightweight producer/consumer applications without needing external infrastructure.