We’ve already written about a regular Dictionary in C#. We’ve also explored the basics of ConcurrentDictionary as one of many Concurrent Collections in C#. In this article, we will go into more detail and explore the use cases in which ConcurrentDictionary can be useful to us.
Without further ado, let’s start slowly and work our way to more complex scenarios.
Basic Scenario
Even the simplest examples of real-life concurrent solutions are usually pretty complex and threaten to shift the focus to parallel programming itself. To avoid this, we will concentrate on the critical components and reduce everything else.
So, our examples will have:
- shared state in the dictionary
- concurrent processing steps that update the shared state
Let’s first try the naive approach:
public class NaiveExample
{
    public const int MaxIterations = 100;
    public const int MaxStateEntries = 10;
    public const int ProcessingSteps = 40;

    private Dictionary<int, int> _sharedState = new();

    public void Run()
    {
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
        PrintState();
    }

    private void ProcessingStep(int stepNumber)
    {
        for (int iteration = 0; iteration < MaxIterations; iteration++)
        {
            var entryKey = iteration % MaxStateEntries;
            if (!_sharedState.TryGetValue(entryKey, out int entryValue))
            {
                entryValue = 0;
            }
            _sharedState[entryKey] = entryValue + 1;
        }
    }

    private void PrintState()
    {
        foreach (var entry in _sharedState)
        {
            Console.WriteLine($"{entry.Key}\t{entry.Value}");
        }
    }
}
This class has an internal state represented by a simple Dictionary. Its processing requires multiple steps which can execute concurrently. Each step needs to modify the state based on its current value (in this example, it increments the entry value).
When we execute this example, there is a high probability that we’ll see this exception:
Unhandled exception. System.AggregateException: One or more errors occurred. (Destination array was not long enough. Check the destination index, length, and the array's lower bounds. (Parameter 'destinationArray')) ...
A little later in that very long string, we can also see the call stack of the inner exception:
System.ArgumentException: Destination array was not long enough. Check the destination index, length, and the array's lower bounds. (Parameter 'destinationArray')
   at System.Array.Copy(Array sourceArray, Int32 sourceIndex, Array destinationArray, Int32 destinationIndex, Int32 length, Boolean reliable)
   at System.Array.Copy(Array sourceArray, Array destinationArray, Int32 length)
   at System.Collections.Generic.Dictionary`2.Resize(Int32 newSize, Boolean forceNewHashCodes)
   at System.Collections.Generic.Dictionary`2.Resize()
   at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
   at System.Collections.Generic.Dictionary`2.set_Item(TKey key, TValue value)
So what happens here is that we try to insert an item into the dictionary, and if there’s no room for another entry, the dictionary needs to resize. However, in the middle of the resize operation, some other task gets CPU time and finds some of the internal structures already updated. This results in inconsistent internal array bounds.
Can We Avoid Resizing ConcurrentDictionary?
We may be tempted to size the dictionary properly to avoid this situation in the first place. Let’s change just the field initialization in our previous example:
private Dictionary<int, int> _sharedState = new(MaxStateEntries);
And execute to see the output similar to this:
0    361
1    355
2    350
3    351
4    350
5    352
6    355
7    340
8    344
9    354
Now, since we have 40 processing steps that increment each of the 10 entries 10 times, we would expect each entry to end at 400 (40 x 10 = 400).
However, not one of the entries has that value.
We suspect that between the time the value is read from the dictionary (_sharedState.TryGetValue(entryKey, out int entryValue)) and the time the new value is calculated and stored under the same key (_sharedState[entryKey] = entryValue + 1), another processing step manages to update the value, and that change is lost.
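To make this lost update concrete, here is one possible interleaving of two processing steps (an illustrative timeline, not a captured trace):

// Step A: _sharedState.TryGetValue(3, out entryValue)  // reads 41
// Step B: _sharedState.TryGetValue(3, out entryValue)  // also reads 41
// Step A: _sharedState[3] = 41 + 1;                    // writes 42
// Step B: _sharedState[3] = 41 + 1;                    // writes 42 - Step A's increment is lost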
What Else Can We Try?
Surely we could avoid this problem if we reduce the time between reading and updating the value.
Let’s change our example to use the increment operator (++) to update it. We’ll even initialize the entries to reduce the chance of overwriting when the processing steps add the initial value:
public class SecondExample
{
    public const int MaxIterations = 100;
    public const int MaxStateEntries = 10;
    public const int ProcessingSteps = 40;

    private Dictionary<int, int> _sharedState = new(MaxStateEntries)
    {
        { 0, 0 }, { 1, 0 }, { 2, 0 }, { 3, 0 }, { 4, 0 },
        { 5, 0 }, { 6, 0 }, { 7, 0 }, { 8, 0 }, { 9, 0 },
    };

    public void Run()
    {
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
        PrintState();
    }

    private void ProcessingStep(int stepNumber)
    {
        for (int iteration = 0; iteration < MaxIterations; iteration++)
        {
            var entryKey = iteration % MaxStateEntries;
            _sharedState[entryKey]++;
            Thread.Sleep(1);
        }
    }

    private void PrintState()
    {
        foreach (var entry in _sharedState)
        {
            Console.WriteLine($"{entry.Key}\t{entry.Value}");
        }
    }
}
Alas, this approach didn’t help either: we still get a similar output, where all or most of the entries have a value of less than 400.
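The reason is that the increment operator on an indexer is not atomic. It is roughly equivalent to a separate read and write (a sketch, not the exact compiler output):

int temp = _sharedState[entryKey];  // read
_sharedState[entryKey] = temp + 1;  // write - another step can run in between

So even with a single statement, the window for a lost update remains.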
ConcurrentDictionary to the Rescue
But we are not naive. We know that these attempts are futile. The right solution is either to synchronize the access to the dictionary or to use ConcurrentDictionary:
public class BetterApproach
{
    public const int MaxIterations = 100;
    public const int MaxStateEntries = 10;
    public const int ProcessingSteps = 40;

    private ConcurrentDictionary<int, int> _sharedState = new();

    public void Run()
    {
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
        PrintState();
    }

    private void ProcessingStep(int stepNumber)
    {
        for (int iteration = 0; iteration < MaxIterations; iteration++)
        {
            var entryKey = iteration % MaxStateEntries;
            _sharedState.AddOrUpdate(
                entryKey,      // the key that we want to update
                key => 1,      // initial value factory, in case the key is not found
                (key, currentValue) => currentValue + 1); // updated value factory, in case the key already exists
        }
    }

    private void PrintState()
    {
        foreach (var entry in _sharedState)
        {
            Console.WriteLine($"{entry.Key}\t{entry.Value}");
        }
    }
}
We see that we needed to adjust how we use the shared state. The if statement that decides when to initialize and when to update has moved inside the ConcurrentDictionary – now we provide the initial value factory and the updated value factory. We don’t even set the initial size.
This time around, however, we get consistent results each time:
0    400
1    400
2    400
3    400
4    400
5    400
6    400
7    400
8    400
9    400
Every entry has the expected value which is just what we wanted to accomplish.
How the ConcurrentDictionary Ensures Internal Consistency
Now let’s see how ConcurrentDictionary achieves this. The obvious solution would be to use a lock to synchronize access to the entire dictionary. However, this would cause high contention on that lock and effectively reduce our example to a very complicated single-threaded solution.
Luckily, its creators introduced multiple locks that protect access to various buckets and minimize locking scope. If different threads access buckets that map to different locks, they can work simultaneously.
The frequency of these situations (without blocking) depends on the ratio of buckets to locks, which can change during the lifetime of the ConcurrentDictionary. If we do not set it explicitly via the concurrencyLevel constructor parameter, it will create the same number of locks as there are processors on the system. Every time the dictionary decides to resize, it will double the number of locks until it reaches its maximum (1024).
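If we know both the expected parallelism and the number of entries up front, we can set them explicitly. Here is a small sketch that reuses MaxStateEntries from our examples and the (int concurrencyLevel, int capacity) constructor overload:

// Sketch: pre-sizing both the lock count and the capacity
private ConcurrentDictionary<int, int> _sharedState =
    new(concurrencyLevel: Environment.ProcessorCount, capacity: MaxStateEntries);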
By locking one or more of these locks, ConcurrentDictionary ensures the consistency of its state and internal structures, but still allows a certain number of concurrent operations where possible.
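To make the idea concrete, here is a minimal, hypothetical sketch of lock striping – our simplification, not the actual ConcurrentDictionary source:

public class StripedMap
{
    private readonly object[] _locks;
    private readonly Dictionary<int, int>[] _buckets;

    public StripedMap(int stripes)
    {
        _locks = new object[stripes];
        _buckets = new Dictionary<int, int>[stripes];
        for (int i = 0; i < stripes; i++)
        {
            _locks[i] = new object();
            _buckets[i] = new Dictionary<int, int>();
        }
    }

    public void Increment(int key)
    {
        // Map the key to a stripe; threads that hit different stripes
        // never block each other
        int stripe = (key.GetHashCode() & 0x7FFFFFFF) % _locks.Length;
        lock (_locks[stripe])
        {
            _buckets[stripe].TryGetValue(key, out int value);
            _buckets[stripe][key] = value + 1;
        }
    }
}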
But, enough of these details. If you want to read more about it, head over to the reference source. Armed with this knowledge, let’s explore antipatterns for using a ConcurrentDictionary.
Volatile State
Now that we know how ConcurrentDictionary protects its inner state, we can also think of some scenarios in which it can’t help us. Any action that would take two operations on a ConcurrentDictionary will still suffer from the same issues.
Let’s pretend we didn’t follow through, and that we still use the same method to update values:
public class AntipatternExample
{
    public const int MaxIterations = 100;
    public const int MaxStateEntries = 10;
    public const int ProcessingSteps = 40;

    private ConcurrentDictionary<int, int> _sharedState = new();

    public void Run()
    {
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
        PrintState();
    }

    private void ProcessingStep(int stepNumber)
    {
        for (int iteration = 0; iteration < MaxIterations; iteration++)
        {
            var entryKey = iteration % MaxStateEntries;
            if (!_sharedState.TryGetValue(entryKey, out int entryValue))
            {
                _sharedState[entryKey] = 0;
            }
            _sharedState[entryKey]++;
            Thread.Sleep(1);
        }
    }

    private void PrintState()
    {
        foreach (var entry in _sharedState)
        {
            Console.WriteLine($"{entry.Key}\t{entry.Value}");
        }
    }
}
The output will demonstrate that some updates get overwritten:
0    303
1    279
2    310
3    309
4    308
5    318
6    287
7    306
8    286
9    288
A similar thing happens in other scenarios that may not be so obvious. For example, if we first call Count and assert it is greater than 0, a subsequent call to First() may still throw an exception. Another example is calling ContainsKey() for a particular key: even if it returns true, there is still a possibility that getting the value with the same key using the indexer (dictionary[key]) will throw an exception.
This is why a ConcurrentDictionary offers methods that take this into account, such as TryRemove() and TryUpdate(), whose names underline the possibility that the given key may no longer be present. This is also why it sometimes needs to take all of its locks for some operations.
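As a sketch (reusing _sharedState and entryKey from the examples above), here is how those single-call methods replace the racy check-then-act sequences:

// TryUpdate writes only if the value still equals the one we read,
// so a concurrent change makes it return false instead of losing data
if (_sharedState.TryGetValue(entryKey, out int currentValue))
{
    bool updated = _sharedState.TryUpdate(entryKey, currentValue + 1, currentValue);
}

// TryRemove reports whether the key was still present and hands back its value
if (_sharedState.TryRemove(entryKey, out int removedValue))
{
    Console.WriteLine($"Removed {entryKey} -> {removedValue}");
}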
ConcurrentDictionary Lock Escalation
Bearing in mind that the main advantage of a ConcurrentDictionary is its finer-grained locks that still allow concurrent access to data in different buckets, we should consider the scenarios in which it still needs all the locks to ensure consistency.
Let’s imagine that we need to detect which processing step finds the shared state empty. There are many ways to do this, but let’s suppose we’ve chosen the LINQ method Any():
public class ContentionExample
{
    public const int MaxIterations = 100000;
    public const int MaxStateEntries = 10;
    public const int ProcessingSteps = 20;

    public IEnumerable<int> State => _sharedState.Values;

    private ConcurrentDictionary<int, int> _sharedState = new();
    private Func<bool> CheckStateIsEmpty = () => false;

    public void Run()
    {
        var stopwatch = Stopwatch.StartNew();
        RunWithEntryCheck();
        Console.WriteLine($"\nChecking entries processing took {stopwatch.Elapsed.TotalSeconds:N2} s.\n");

        stopwatch.Restart();
        RunWithKeysCheck();
        Console.WriteLine($"\nChecking keys processing took {stopwatch.Elapsed.TotalSeconds:N2} s.\n");
    }

    public void RunWithKeysCheck()
    {
        _sharedState = new();
        CheckStateIsEmpty = () => !_sharedState.Keys.Any();
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
    }

    public void RunWithEntryCheck()
    {
        _sharedState = new();
        CheckStateIsEmpty = () => !_sharedState.Any();
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
    }

    private void ProcessingStep(int stepNumber)
    {
        int emptyHits = 0;
        for (int iteration = 0; iteration < MaxIterations; iteration++)
        {
            var entryKey = iteration % MaxStateEntries;
            if (CheckStateIsEmpty())
            {
                emptyHits++;
            }
            _sharedState.AddOrUpdate(
                entryKey,
                key => 1,
                (key, currentValue) => currentValue + 1);
        }
    }
}
The most important changes: we’ve increased the number of iterations, and we now measure and print the running times of two very similar approaches. One approach checks whether there are Any() entries in the dictionary, and the other checks for Any() keys.
We can expect an output similar to this:
Checking entries processing took 1.44 s.

Checking keys processing took 1.72 s.
Results may vary, but most of the time we can see that the approach that checks the keys collection takes more time.
If we inspect the reference source and compare the two implementations (GetEnumerator() and Keys), we will spot one big difference – a call to the AcquireAllLocks() method. Investigating further, we find other members that also take all the locks: Clear(), CopyTo(array), ToArray(), Count, IsEmpty, Keys, and Values. One of them (IsEmpty) even offers a much faster way of checking whether the dictionary is empty than the approach we used in this example, even with all the locks it takes (but that’s another topic).
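For instance, the empty check in our ContentionExample could be written like this (a sketch of that faster variant):

// IsEmpty answers without materializing an enumerator, which is why it beats
// the Any() approaches above even though it also takes the locks
CheckStateIsEmpty = () => _sharedState.IsEmpty;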
As with any other contention scenario, the slowdown this can cause greatly depends on the number of threads competing for the dictionary. So before you make any decisions about using or avoiding some of these methods, make sure you thoroughly test them under conditions similar to those you expect in production.
Possible ConcurrentDictionary Memory Leak
As we have already seen, to fully use the ConcurrentDictionary, we should rely on its compound AddOrUpdate() method. It takes into account that, until the very last moment, it is not certain whether it will use the initial value or a new value derived from the existing one. That’s why it expects us to provide a factory method for both scenarios.
Let’s explore how we can use these factories in practice:
public class MemoryLeakExample
{
    public const int MaxIterations = 100;
    public const int MaxStateEntries = 10;
    public const int ProcessingSteps = 20;

    public IEnumerable<int> State => _sharedState.Values;
    public int[] NewValueCreated { get; private set; } = new int[MaxStateEntries];
    public int[] UpdateValueCreated { get; private set; } = new int[MaxStateEntries];

    private ConcurrentDictionary<int, int> _sharedState = new();

    public void Run()
    {
        Parallel.ForEach(Enumerable.Range(0, ProcessingSteps), ProcessingStep);
        PrintState();
    }

    private void ProcessingStep(int stepNumber)
    {
        for (int iteration = 0; iteration < MaxIterations; iteration++)
        {
            var entryKey = iteration % MaxStateEntries;
            _sharedState.AddOrUpdate(
                entryKey,
                InitialValueFactory,
                UpdatedValueFactory);
        }
    }

    private int UpdatedValueFactory(int key, int currentValue)
    {
        Thread.Sleep(3);
        Interlocked.Increment(ref UpdateValueCreated[key]);
        return currentValue + 1;
    }

    private int InitialValueFactory(int key)
    {
        Thread.Sleep(3);
        Interlocked.Increment(ref NewValueCreated[key]);
        return 1;
    }

    private void PrintState()
    {
        foreach (var entry in _sharedState)
        {
            Console.WriteLine($"{entry.Key}\t{entry.Value}");
        }
        Console.WriteLine(new string('-', 20));
        for (int key = 0; key < MaxStateEntries; key++)
        {
            Console.WriteLine($@"Initial value for key [{key}] created {NewValueCreated[key]} times.");
        }
        Console.WriteLine(new string('-', 20));
        for (int key = 0; key < MaxStateEntries; key++)
        {
            Console.WriteLine($@"Update value for key [{key}] created {UpdateValueCreated[key]} times.");
        }
    }
}
We’ve extracted these factory methods and added counters to track the number of created values. We also added Thread.Sleep(3) to simulate some work required to construct the value. If you are not familiar with the Interlocked.Increment() function, don’t worry – it increments the given value atomically, without any locking.
When we run this example, we can see a lot more output:
0    200
1    200
2    200
3    200
4    200
5    200
6    200
7    200
8    200
9    200
--------------------
Initial value for key [0] created 20 times.
Initial value for key [1] created 1 times.
Initial value for key [2] created 2 times.
Initial value for key [3] created 1 times.
Initial value for key [4] created 1 times.
Initial value for key [5] created 1 times.
Initial value for key [6] created 1 times.
Initial value for key [7] created 1 times.
Initial value for key [8] created 1 times.
Initial value for key [9] created 1 times.
--------------------
Update value for key [0] created 615 times.
Update value for key [1] created 424 times.
Update value for key [2] created 441 times.
Update value for key [3] created 453 times.
Update value for key [4] created 434 times.
Update value for key [5] created 449 times.
Update value for key [6] created 424 times.
Update value for key [7] created 432 times.
Update value for key [8] created 454 times.
Update value for key [9] created 449 times.
The first block shows us that the processing steps still complete all updates as expected. The second block, however, shows us that they invoked the initial value factory method more than once for some keys. Finally, the third block is the most surprising. We can see that the processing steps created more than twice the number of update values we expected.
In an example such as this, that’s not a problem, since we create initial values cheaply and unused values do not require additional work for disposal. But in more complex examples, in which we need to create unmanaged resources, we need to pay additional attention to properly dispose of unused values created by these factory methods. Otherwise, we can end up with memory (or handle) leaks.
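One common mitigation (our addition, not part of the example above) is to store Lazy<T> values via GetOrAdd(), so that even if several wrappers get created concurrently, only the winning one ever constructs the real resource. ExpensiveResource here is a hypothetical disposable type:

// Hypothetical expensive, disposable resource – illustration only
public class ExpensiveResource : IDisposable
{
    public ExpensiveResource(int key) { /* acquire an unmanaged handle here */ }
    public void Dispose() { /* release the handle */ }
}

public class LazyFactoryExample
{
    private readonly ConcurrentDictionary<int, Lazy<ExpensiveResource>> _resources = new();

    public ExpensiveResource GetResource(int key) =>
        _resources.GetOrAdd(
            key,
            k => new Lazy<ExpensiveResource>(() => new ExpensiveResource(k)))
        .Value; // several Lazy wrappers may be created and discarded, but only
                // one value factory runs, so only one resource is ever allocated
}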
Conclusion
As we have seen, ConcurrentDictionary in C# is a very straightforward alternative to a classic Dictionary when there is a shared state that we access from multiple concurrent threads. It minimizes the locking scope to allow greater concurrency while still keeping its internal state consistent. However, it does not solve all issues that arise in concurrent programming.
That’s why, in some cases, it may be simpler to protect the classic Dictionary by some other means, be it a mutex, a ReaderWriterLockSlim, or something else.
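For illustration, here is a minimal sketch of that lock-based option (class and member names are ours, not from any library):

public class LockedState
{
    private readonly object _gate = new();
    private readonly Dictionary<int, int> _state = new();

    public void Increment(int key)
    {
        // A single lock serializes every read-modify-write,
        // trading concurrency for simplicity
        lock (_gate)
        {
            _state.TryGetValue(key, out int value);
            _state[key] = value + 1;
        }
    }
}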