In this article, we are going to explore PLINQ, the parallel implementation of LINQ. We will focus on when and how to use it efficiently.
If you need a refresher on LINQ, you can read our LINQ Basic Concepts in C# article as well.
Let’s start!
What is Parallel LINQ (PLINQ)?
We can safely say that PLINQ is an extension of LINQ, developed to improve our queries’ performance. With LINQ, we execute our queries in a sequential mode. This means that we process the elements in the source set one after the other. While this is typically not a performance issue, it may become one when the processing involves many CPU-bound operations. PLINQ comes to the rescue and allows us to parallelize the work on the available processors in our system.
Why doesn’t LINQ parallelize the query execution natively?
Parallel programming is a double-edged sword. If we use it incorrectly, we could do more harm than good. For this reason, choosing whether to parallelize the execution of a query is our responsibility. If we choose to enable PLINQ, we must be aware that it doesn’t come for free.
For the work to be parallelized, the source set has to be split into partitions. A partitioning algorithm assigns each data partition to a thread. When all the elements in the source set have been processed, results may be merged. Data partitioning and merging the results considerably impact the overall execution time.
Moreover, the parallel execution of a query is safe only under the assumption of delegates’ independence. PLINQ cannot replace LINQ when the delegate reads and modifies an external variable, for example. In this case, to use PLINQ, we need to make our code thread-safe. With LINQ, this is unnecessary because we have the guarantee that each element is processed sequentially.
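To illustrate, here is a minimal sketch, with a hypothetical numbers list, of a delegate that depends on an external variable, together with a thread-safe fix based on the Interlocked class:

var numbers = Enumerable.Range(0, 1000).ToList();

var processed = 0;
var doubled = numbers.AsParallel().Select(n =>
{
    processed++; // not thread-safe: concurrent increments can be lost

    return n * 2;
}).ToList();

// Thread-safe version: the increment becomes an atomic operation
var safelyProcessed = 0;
var safelyDoubled = numbers.AsParallel().Select(n =>
{
    Interlocked.Increment(ref safelyProcessed);

    return n * 2;
}).ToList();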
Let’s see how to use it!
A Simple Example of PLINQ in Action
Let’s imagine we work in a company that sells computers. We have just built a bunch of new models that we want to put on the market. Let’s create a new Computer class that represents them:

public class Computer
{
    public int GbRam { get; init; }
    public int GbSecondaryMemory { get; init; }
    public int MIPS { get; set; }
}
Before a computer leaves the company, we need to perform a certain number of tests on it. Each test process returns the number of successful tests, but could take a lot of time:
public int Test(int testCount)
{
    if (testCount > 0)
        Thread.Sleep(testCount);

    return new Random().Next() % (testCount + 1);
}
With 50 computers and 20 tests for each one, this LINQ query takes at least 1 second:
computers.Sum(comp => comp.Test(20));
With PLINQ, we can execute it in much less time by parallelizing the delegate of the Sum() method and the sum itself:
computers.AsParallel().Sum(comp => comp.Test(20));
To enable PLINQ, we just need to call the AsParallel() method on an IEnumerable. This object is converted to a ParallelQuery object, which will parallelize the sum execution. We can also observe that PLINQ APIs are fully compatible with those of LINQ. This also means that a parallel query grants deferred execution, just like a normal LINQ query.
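For instance, a quick sketch showing that nothing is executed until we enumerate the query:

// Nothing runs yet: the parallel query is only defined here
var query = computers.AsParallel().Select(comp => comp.Test(20));

// The tests actually start here, when we materialize the results
var results = query.ToList();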
We can even set the maximum number of concurrently executing threads used by the query. If we wanted to process the same query with only 2 threads:
computers.AsParallel()
    .WithDegreeOfParallelism(2)
    .Sum(comp => comp.Test(20));
Adjusting the degree of parallelism is useful in 2 situations. We may want to reduce the number of spawned threads to avoid keeping all the cores busy on a single query. But we can also decide to prioritize the execution of our query by increasing the degree of parallelism.
Nonetheless, using twice the number of threads doesn’t usually mean halving the execution time. The overhead caused by data partitioning and thread synchronization grows as the degree of parallelism increases.
Before diving deeper into the subject, we need to understand when PLINQ is better than LINQ.
How Do We Choose Between PLINQ and LINQ?
Choosing between PLINQ and LINQ is not always easy.
The first thing we should ask ourselves is: does that query need to be sped up? Premature or unnecessary optimization could cause our application to require more resources than necessary. We should keep in mind that using PLINQ means stealing some resources that can be devoted to other processes.
When to Avoid Using PLINQ
Let’s see an example where PLINQ is not suitable.
We have 50 new computers, but they are very similar to some models that we are already selling. In this case, the test phase is almost useless, which is why the testCount parameter is 0:
computers.AsParallel().Sum(comp => comp.Test(0))
The overhead generated by parallelization is greater than the processing time of each element. By removing the AsParallel() call, the execution time decreases. In general, if we can quickly execute the delegate and we don’t have too many elements, then we don’t need PLINQ.
The next step is to analyze the query to find the parts that are delightfully parallel. A query, or a part of it, is delightfully parallel when it lends itself easily to scheduling on multiple threads. A delegate that is completely independent of the outer scope is delightfully parallel. Our job is to maximize the delightfully parallel parts and minimize those that bring some synchronization overhead. After some performance tests, we should be able to tell whether PLINQ is the right choice.
It might be obvious to say, but we should consider the system specifics too. If we know in advance the load and the number of cores in the production environment, our tests should take into account those limits as well.
There are some situations where the use of PLINQ requires special care.
Don’t Parallelize Nested Loops
When we use nested loops, parallelizing both the outer and inner loops is typically inappropriate. Let’s see it with an example. We have 100 computer categories, and each computer should be tested according to its category rules:
Parallel.For(1, 100, i =>
{
    (from comp in computers.AsParallel().Where(c => c.GetPrice() > i)
     select comp.Test(0))
    .ToList();
});
This is a simplified example, but the problem is in the AsParallel() call. The outer loop will probably use all of the available cores in the system. For this reason, the query, which represents the inner loop, should run sequentially. When possible, rewriting those loops as one, and then parallelizing it, is the best option we have, as shown below. Otherwise, the loop to parallelize is the outer one.
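As an illustration, here is a sketch of how we could flatten these two loops into a single parallel query:

// One possible rewrite: build the flattened sequence sequentially,
// then parallelize the expensive part only once
Enumerable.Range(1, 99)
    .SelectMany(i => computers.Where(c => c.GetPrice() > i))
    .AsParallel()
    .Select(comp => comp.Test(0))
    .ToList();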
Locks Decrease the Speed-up Gain
Another case where parallelization is harmful is when each element accesses an object protected by an exclusive lock. In our example, this can happen if we want to register each computer in the company network. The registration server processes one request at a time, so we are going to emulate it with:
public class Computer
{
    private static readonly object networkObj = new object();

    public IPAddress IPAddress { get; set; } = null!;

    // ...

    public string RegisterToNetwork()
    {
        lock (networkObj)
        {
            IPAddress = new IPAddress(new Random().Next());

            return IPAddress.ToString();
        }
    }
}
If we wanted to find all the received IP addresses, this query would be inefficient:
from comp in computers.AsParallel()
select comp.RegisterToNetwork()
Again, this parallelized version is inefficient because the involved threads cannot execute the RegisterToNetwork() method independently. A sequential query would perform better because we don’t have the overhead caused by the synchronization on the exclusive lock. In a more realistic scenario, the lock would group only the strictly necessary statements, not the entire method body. For this reason, we should run some tests to determine the possible speed-up gain of a parallel query.
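For instance, a sketch of how the RegisterToNetwork() method could keep only the shared registration step inside the lock:

public string RegisterToNetwork()
{
    IPAddress address;

    // Only the shared registration step is protected by the lock
    lock (networkObj)
    {
        address = new IPAddress(new Random().Next());
    }

    // The per-computer work happens outside the critical section
    IPAddress = address;

    return address.ToString();
}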
Parallelize Only CPU-Bound Operations
It is very important to distinguish between CPU-bound and I/O-bound operations. For the next query, we want to ping the registered computers to find out if they are reachable. Let’s create a dummy Ping() method:
public async Task<int> Ping()
{
    await Task.Delay(30);

    return 0;
}
In the actual implementation, the created task would wait for the ping reply instead of a simple Task.Delay() call. We can write a PLINQ version of the query:
computers.AsParallel().Select(comp => comp.Ping().Result)
The problem is that we are synchronously executing many asynchronous tasks in parallel. We should use PLINQ when we deal with CPU-bound operations. On the other hand, asynchronous programming is the correct way to handle I/O-bound operations, like this one. Let’s rewrite the query using the async/await pattern:
await Task.WhenAll(computers.Select(comp => comp.Ping()))
Take a look at this article where we talk about Executing Multiple Tasks Asynchronously to find out how the Task.WhenAll() method works!
Even if we choose to enable PLINQ with the AsParallel() method, the query might still be executed sequentially. This happens because PLINQ analyzes the query shape and tries to match it with some predefined ones. If there is a match, PLINQ assumes that the parallel version of the query would probably be slower than the sequential one. At that point, PLINQ executes the query sequentially, unless we force the parallel execution with:
enumerable.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)
We are now ready to explore the depths of PLINQ!
How Do MergeOptions Change the Perceived Execution Time?
PLINQ allows us to adjust the perceived execution time of our queries. When we need our query execution to finish as soon as possible, then the execution time is the priority. On the other hand, if we want to reduce the waiting time before the caller receives the first result, then we have to prioritize the latency. Let’s see some possible scenarios.
In our company, there is a dashboard from where we can run the tests on the newly produced computers. This testing phase could last for hours, but we would like to see the results of each computer once they are ready. In this case, we need to minimize the latency:
from comp in computers
    .AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
select comp.Test(10)
With the ParallelMergeOptions.NotBuffered option, PLINQ doesn’t buffer the results when enumerating the query. This way, we get lower latency but a slightly higher overall execution time. Piping results one by one involves some overhead.
Buffering the Query Results
If we wanted to create a report from the tests’ results, the most logical choice would be to buffer them:
var query = from comp in computers
                .AsParallel()
                .WithMergeOptions(ParallelMergeOptions.FullyBuffered)
            select comp.Test(1);

// build report
With the ParallelMergeOptions.FullyBuffered option, we get the highest latency, but the lowest execution time. If the report has to show the median of the tests’ results, we need to wait for all of them anyway. The aggregation operations, like Sum() or Average(), override the specified ParallelMergeOptions with FullyBuffered. After all, it is the only option that makes sense when aggregating.
The last option, which is also the default one, is:
from comp in computers
    .AsParallel()
    .WithMergeOptions(ParallelMergeOptions.AutoBuffered)
select comp.Test(1);
ParallelMergeOptions.AutoBuffered is a tradeoff between FullyBuffered and NotBuffered because results are returned in chunks. Unfortunately, the chunk size is computed internally, and we cannot change it in our queries.
If we need to measure the latency of a query, first we get its enumerator, then we request the first element:
query.GetEnumerator().MoveNext()
By measuring the execution time of the above line, we get the actual latency of the query.
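Here is a minimal sketch with the Stopwatch class from the System.Diagnostics namespace:

var stopwatch = Stopwatch.StartNew();

using var enumerator = query.GetEnumerator();
enumerator.MoveNext(); // blocks until the first result is available

stopwatch.Stop();
Console.WriteLine($"Latency: {stopwatch.ElapsedMilliseconds} ms");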
We have seen that enabling PLINQ is easy, but can we go back to LINQ with the same ease?
Mixing PLINQ and LINQ to Get the Best Results
We can use the AsSequential() method to revert a parallel query to a sequential one. Given a ParallelQuery object, the AsSequential() method casts it to an IEnumerable object. Let’s see an example of where the AsSequential() method could come in handy.
We calculate the selling prices of our computers using this formula:
(GbRam * 10) + (GbSecondaryMemory * 0.1) + (MIPS * 0.01)
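The GetPrice() method we call throughout the examples could then be a minimal implementation of this formula inside the Computer class:

public double GetPrice() =>
    (GbRam * 10) + (GbSecondaryMemory * 0.1) + (MIPS * 0.01);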
According to the internal policies, we have to calculate the prices twice, to make sure there is no mistake. We have just noticed that many computer models share the same components, and therefore the same price. Thus, we decide to create a cache that has keys in the form (GbRam, GbSecondaryMemory, MIPS), and the price as value:
var cache = new ConcurrentDictionary<Tuple<int, int, int>, double>();
This cache is a ConcurrentDictionary because we are going to access it in a parallel query:
var query = computers.AsParallel().Select(comp =>
{
    var key = Tuple.Create(comp.GbRam, comp.GbSecondaryMemory, comp.MIPS);

    return cache.GetOrAdd(key, k => comp.GetPrice());
});
The second time we run this query, we don’t need to execute it in parallel because all the elements have already been cached. Parallel execution is no longer worthwhile because there won’t be any cache misses, making the delegate very fast. We can re-execute the query with:
query.AsSequential().ToList();
The problem is that this parallel query doesn’t preserve the order of the original elements. This means that we cannot compare the results of the 2 queries because they are equivalent, but not equal. In other words, they contain the same elements, but in a possibly different order.
Preserving the Elements’ Order With AsOrdered
With parallel execution, each thread operates on a different data partition. Unfortunately, when we merge the results, we completely lose the partitions’ original order. Regarding the previous example, the order of the prices is non-deterministic. We can change this behavior by calling the AsOrdered() method:
from comp in computers.AsParallel().AsOrdered()
select comp.GetPrice();
We are processing the elements in the source set in parallel, but the final list preserves the original order. Before returning the prices, the merging component has an additional task. It has to make sure that they are sorted as if the query had been executed sequentially.
Preserving the elements’ order incurs an overhead, although it is not that high. We should always try to minimize the parts of our query where the order is essential. We can remove this overhead, when it is not necessary, by calling the AsUnordered() method on a ParallelQuery. Be careful not to confuse AsOrdered() and AsSequential(). Both produce results in the original order, but the former executes the query in parallel, whereas the latter executes it sequentially.
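For instance, here is a sketch where only the first part of the query needs the original order:

// Order matters only for picking the first 10 computers;
// the rest of the query can run unordered
var firstPrices = computers
    .AsParallel().AsOrdered()
    .Take(10)
    .AsUnordered()
    .Select(comp => comp.GetPrice())
    .ToList();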
Let’s see how exception handling works!
How to Cancel Queries or Handle the Exceptions
Until now, we have assumed that our queries always terminate successfully. This is an ideal scenario: in practice, we might have to cancel them or handle the thrown exceptions.
As for canceling queries, PLINQ perfectly integrates with the cancellation pattern. We have written an article about Cancellation Tokens, so you might want to read it if you need to learn how to use them in detail. With PLINQ, we can specify the cancellation token with the WithCancellation() method.
In our company, the testing phase cannot last for more than 3 hours. After 3 hours we can stop the query using a cancellation token:
var TIME_LIMIT = 1000 * 60 * 60 * 3; // 3 hours

CancellationTokenSource cts = new CancellationTokenSource();

var query = from comp in computers.AsParallel().WithCancellation(cts.Token)
            select comp.Test(100);

Assert.ThrowsException<OperationCanceledException>(() =>
{
    var task = Task.Delay(TIME_LIMIT).ContinueWith(t => cts.Cancel());

    query.ToList();
});
We have initially created a CancellationTokenSource object that we are going to use to trigger the canceling action. Before running the query, we start a new task asynchronously, which represents the timeout for the query. After it completes, we cancel the parallel query by calling the Cancel() method on the token source. We have written an article about Tasks vs Threads where we explain the ContinueWith() method of the Task class.
When a query is canceled, the thrown exception type is OperationCanceledException. Let’s see what happens when we throw an exception in a parallel query.
How to Handle Exceptions in Parallel Queries
In a sequential query, only one exception can be thrown because we process one element at a time. In a parallel query, several threads could throw an exception at the same time. We don’t want to lose any of those exceptions, so how do we get them?
Let’s find this out with an example! In the computer company, we have a specific test phase for low-end computers. If we find some computers more expensive than $200 in the source set, we must throw an ArgumentOutOfRangeException:
var query = computers.AsParallel().Select(comp =>
{
    var price = comp.GetPrice();

    if (price > 200)
        throw new ArgumentOutOfRangeException($"Price ({price}) should be lower than 200");

    return comp.Test(1);
});
Here is how we can handle them:
try
{
    query.ToList();
}
catch (AggregateException exc)
{
    exc.Handle(ex => ex is ArgumentOutOfRangeException);
}
With the Handle() method, we return a boolean that specifies whether we have handled the exception. If the boolean value is false, then the exception is re-thrown. When we throw an exception during the query execution, all the other threads stop working as soon as possible. Then PLINQ gathers all the exceptions the threads have thrown in the meantime, if any. The returned AggregateException contains all the thrown exceptions.
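If we prefer to inspect the exceptions directly instead of filtering them with Handle(), we can enumerate the InnerExceptions property:

try
{
    query.ToList();
}
catch (AggregateException exc)
{
    foreach (var inner in exc.InnerExceptions)
        Console.WriteLine(inner.Message);
}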
So far, we have seen 2 ways to run a parallel query: using the foreach construct or the ToList() method. They are not the only ones.
Processing Query Output With foreach, ToList(), and ForAll()
We have 3 ways to process a PLINQ query.
The first one is pipelined processing. PLINQ usually chooses this model when we enumerate the query results with a foreach loop. In this case, all the threads but one work on the query execution. The remaining thread is the one that has called the GetEnumerator() method on the query object. This thread synchronously waits for any available output element and returns it on a MoveNext() call. We have many producers and a single consumer, and this could lead to uneven work distribution. The advantage is that we reduce the memory needed to hold the results.
The second approach is stop-and-go. In this model, PLINQ exploits all the threads’ processing power to create the output as soon as possible. This is slightly more efficient than pipelining because all the threads work until the entire source set is processed. Moreover, we don’t have the synchronization overhead caused by the producer/consumer message exchange. However, the latency and memory usage are higher because we have to store the entire result set. When all the elements are ready, we can access them.
PLINQ uses this approach in very specific situations:
- With ParallelMergeOptions.FullyBuffered
- When we call the ToList()/ToArray() methods on a query object
- When we sort the results before returning them
The reason behind the first 2 cases is simple. It doesn’t make sense to have a consumer thread if we are specifically asking PLINQ to fully buffer the results. The last condition is reasonable as well. Sorting a sequence is an expensive and high-latency operation. Moreover, PLINQ should not return an element until the sequence is fully sorted.
Efficiently Enumerating Queries With ForAll
The last approach is the inverted enumeration. This is particularly efficient because it skips the merging phase. The results are returned as soon as they are ready, making this model the best one in terms of performance. Let’s see how to use it with an example.
The computer company can put computers on the market only after the testing phase:
var query = from comp in computers.AsParallel()
            select (comp, comp.Test(0));

query.ForAll(elem => elem.Item1.PutOnTheMarket());
With the ForAll() method, we neither merge the results nor preserve their order. The PutOnTheMarket() method is called on each result element as soon as it is available. The only overhead we have in this case comes from the threads’ synchronization. This is why we should use this method whenever possible. We just have to be careful that the delegate in the ForAll() method doesn’t rely on a shared state.
We can make our queries even more performant. Let’s quickly see how!
Partitioning and Custom Aggregations to Speed Up PLINQ Queries
One of the factors that affect parallelization performance is the partitioning method. The partitioning strategy chosen by PLINQ is normally very good, but we can create our own.
Why should we do that?
Mainly, by choosing the partitions to assign to each thread, we can leverage data similarity. For example, if each thread has a cache object and processes similar elements, then the execution is likely to be faster.
Another reason for implementing our custom partitioning method is to guarantee even work distribution. This is not easy unless we know our data source very well. In general, implementing a new partitioning strategy is reasonable only when the default one proves inefficient for our workload.
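Writing a full custom partitioner is beyond the scope of this article, but here is a sketch of a middle ground, assuming computers is stored in a list: we wrap the source in one of the built-in Partitioner objects with load balancing enabled before handing it to PLINQ:

// Partitioner.Create lives in System.Collections.Concurrent.
// With loadBalance: true, chunks are handed to threads on demand,
// which helps when the processing time varies a lot per element
var partitioner = Partitioner.Create(computers, loadBalance: true);

var results = partitioner
    .AsParallel()
    .Select(comp => comp.Test(10))
    .ToList();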
Another useful method is Aggregate(). We can use it in LINQ too, but PLINQ overloads it with some more complex but powerful definitions. We may want to create a custom aggregating function when the available aggregating functions are not suitable. With the Aggregate() method, we can specify 2 types of delegates. The first one is executed on each partition, while the other one combines the partial results.
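For example, here is a sketch of this overload that sums the prices without any locking:

var totalPrice = computers
    .AsParallel()
    .Aggregate(
        () => 0.0,                                      // seed for each partition
        (subtotal, comp) => subtotal + comp.GetPrice(), // runs within a partition
        (total, subtotal) => total + subtotal,          // combines partial results
        total => total);                                // final projection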
Conclusions
PLINQ is a powerful library that speeds up CPU-intensive queries. However, we should always measure the query performance before blindly enabling PLINQ. Understanding the APIs that this library provides is fundamental for the optimal execution of a parallel query.