To PLINQ or not to PLINQ, that is the question

by Kurt Evenepoel 13 June 2010 13:04

I’m sure you’ve heard of LINQ before.  And I’m sure you’re running on at least a dual-core laptop, or maybe you have 8 logical cores with your i7.  PLINQ, or Parallel LINQ, is part of the Task Parallel Library (http://msdn.microsoft.com/en-us/library/dd460717.aspx).  This library is the “preferred” way of dealing with multithreading as of .NET 4, and PLINQ is LINQ, but parallelized.

So what does parallelized mean here?  Normally your code runs on a thread, and a thread only runs on one core at a time.  So to benefit from your fancy multi-core system, you need to run several processes or programs at the same time, and enough of them to use the system fully.  Alternatively you could write a lot of plumbing code to divide the work between cores yourself.  With PLINQ you don’t need to do much of that: if you can write it with LINQ, you can divide the work by changing very little.  Depending on how you wrote it, it can be as easy as adding AsParallel():

LINQ


(from num in Enumerable.Range(0, 100)
select num % 9).Aggregate((i1, i2) => i1 + i2);


PLINQ

(from num in Enumerable.Range(0, 100).AsParallel()
select num % 9).Aggregate((i1, i2) => i1 + i2);

You can then use .ForAll() instead of a foreach loop to keep the processing of results on the worker threads as well.
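For instance, here is a minimal sketch of that difference; the squaring is just stand-in work for whatever your query does:

```csharp
using System;
using System.Linq;

class ForAllDemo
{
    static void Main()
    {
        var nums = Enumerable.Range(0, 10);

        // Sequential: results are merged back to the calling thread one by one.
        foreach (var n in nums.Select(x => x * x))
            Console.WriteLine(n);

        // Parallel: ForAll runs the action on the worker threads themselves,
        // skipping the merge step. Output order is not guaranteed.
        nums.AsParallel()
            .Select(x => x * x)
            .ForAll(n => Console.WriteLine(n));
    }
}
```

Because ForAll skips the final merge back into a single sequence, it is usually faster than iterating a parallel query with foreach, at the cost of ordering.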

Trivial work

But should you try to parallelize everything with PLINQ? You can guess that the answer is “no”.  Creating and coordinating threads takes time, so if you’re only doing very basic processing, parallelizing will hurt your performance.  Let’s take a look at the following example:


public class TrivialWork
{
    // putting this to 1000000 will show that
    // aggregates can overflow without exception
    // returning an unexpected (possibly negative) value
    private const int ARRAYSIZE = 10000;

    public static void NotParallel()
    {
        var nums = Enumerable.Range(0, ARRAYSIZE);

        Stopwatch watch = Stopwatch.StartNew();
        long result = 0;
        foreach (var item in nums)
        {
            result += item * 2;
        }
        watch.Stop();
        Console.WriteLine("Not parallel            : {0} {1}", watch.Elapsed, result);
    }

    public static void ParallelLoadBalancingPartitioner()
    {
        // Static partitioning requires indexable source.
        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();

        Stopwatch watch = Stopwatch.StartNew();
        // Create a load-balancing partitioner. Or specify false for static partitioning.
        Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

        // The partitioner is the query's data source.
        var q = from x in customPartitioner.AsParallel()
                select x * 2;

        long result = 0;

        q.ForAll((x) =>
        {
            Interlocked.Add(ref result, x);
        });

        watch.Stop();
        Console.WriteLine("Parallel, load balancing: {0} {1}", watch.Elapsed, result);
    }

    public static void Parallel()
    {

        var nums = Enumerable.Range(0, ARRAYSIZE);

        Stopwatch watch = Stopwatch.StartNew();
              
        var q = from num in nums.AsParallel()
                select num * 2;

        long result = 0;

        q.ForAll((num) =>
        {
            Interlocked.Add(ref result, num);
        });

        watch.Stop();
        Console.WriteLine("Parallel                : {0} {1}", watch.Elapsed, result);
    }

    public static void ParallelAggregate()
    {
        var nums = ParallelEnumerable.Range(0, ARRAYSIZE);

        Stopwatch watch = Stopwatch.StartNew();

        // nums is already a ParallelQuery, so no extra AsParallel() is needed
        var q = (from num in nums select num * 2).Aggregate((i, o) => i + o);

        watch.Stop();
        Console.WriteLine("Parallel, aggregate     : {0} {1}", watch.Elapsed, q);
    }

    public static void ParallelNotLoadBalancing()
    {

        // static partitioning requires indexable source.
        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();

        Stopwatch watch = Stopwatch.StartNew();

        Partitioner<int> customPartitioner = Partitioner.Create(nums, false);

        var q = from num in customPartitioner.AsParallel()
                select num * 2;

        long result = 0;

        q.ForAll((num) =>
        {
            Interlocked.Add(ref result, num);
        });

        watch.Stop();
        Console.WriteLine("Parallel, chunked       : {0} {1}", watch.Elapsed, result);
    }
}


In your static void Main:

// it's not always better
// to use parallel calls
Console.WriteLine("-- Trivial work --");
TrivialWork.NotParallel();
TrivialWork.Parallel();
TrivialWork.ParallelLoadBalancingPartitioner();
TrivialWork.ParallelNotLoadBalancing();
TrivialWork.ParallelAggregate();


When I run this on my i5, I get the following results (I wrote the output to Debug instead of the console):

-- Trivial work --
Not parallel            : 00:00:00.0001491 99990000
Parallel                : 00:00:00.0240524 99990000
Parallel, load balancing: 00:00:00.0128164 99990000
Parallel, chunked       : 00:00:00.0025036 99990000
Parallel, aggregate     : 00:00:00.0158607 99990000


Notice that the non-parallel version is much faster when all you do is sum things up.  Trivial processing is best kept sequential.
Also notice that aggregates are vulnerable to overflow: try adding a few zeroes to the array size.  This is not specific to PLINQ, but keep it in mind.
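If the sum can exceed int.MaxValue, one way to sidestep the overflow (assuming the total still fits in 64 bits) is to widen each element to long before aggregating:

```csharp
using System;
using System.Linq;

class SafeSum
{
    static void Main()
    {
        const int ARRAYSIZE = 1000000;

        // Casting each element to long before aggregating keeps the running
        // sum in 64 bits, so it no longer silently wraps around.
        long sum = ParallelEnumerable.Range(0, ARRAYSIZE)
            .Select(num => (long)num * 2)
            .Aggregate((a, b) => a + b);

        Console.WriteLine(sum); // 999999000000
    }
}
```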
Now let’s look at a better example of when to parallelize:

Intensive work


public class IntensiveWork
{
    // putting this to 1000000 will show that
    // aggregates can overflow without exception
    // returning an unexpected (possibly negative) value
    private const int ARRAYSIZE = 10000;

    public static void NotParallel()
    {

        // Static partitioning requires indexable source. Load balancing
        // can use any IEnumerable.
        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();

        Stopwatch watch = Stopwatch.StartNew();
        long result = 0;
        foreach (var item in nums)
        {
            int value = DoIntensiveWork(item);
            result += value;
        }
        watch.Stop();
        Console.WriteLine("Not parallel            : {0}", watch.Elapsed);
    }

    public static void ParallelLoadBalancingPartitioner()
    {

        // Static partitioning requires indexable source. Load balancing
        // can use any IEnumerable.
        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();

        Stopwatch watch = Stopwatch.StartNew();
        Partitioner<int> customPartitioner = Partitioner.Create(nums, true);
        var q = from num in customPartitioner.AsParallel()
                select DoIntensiveWork(num);

        long result = 0;
        q.ForAll((x) =>
        {
            Interlocked.Add(ref result, x);
        });
        watch.Stop();
        Console.WriteLine("Parallel, load balancing: {0}", watch.Elapsed);
    }

    public static void Parallel()
    {
        var nums = Enumerable.Range(0, ARRAYSIZE);

        Stopwatch watch = Stopwatch.StartNew();
        var q = from num in nums.AsParallel()
                select DoIntensiveWork(num);

        long result = 0;
        q.ForAll((num) =>
        {
            Interlocked.Add(ref result, num);
        });
        watch.Stop();
        Console.WriteLine("Parallel                : {0}", watch.Elapsed);
    }

    public static void ParallelAggregate()
    {
        var nums = ParallelEnumerable.Range(0, ARRAYSIZE);
        Stopwatch watch = Stopwatch.StartNew();
        // nums is already a ParallelQuery, so no extra AsParallel() is needed
        var q = (from num in nums select DoIntensiveWork(num)).Aggregate((i, o) => i + o);
        watch.Stop();
        Console.WriteLine("Parallel, aggregate     : {0}", watch.Elapsed);
    }

    public static void ParallelNotLoadBalancing()
    {

        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();
        Stopwatch watch = Stopwatch.StartNew();
        Partitioner<int> customPartitioner = Partitioner.Create(nums, false);

        // The partitioner is the query's data source.
        var q = from num in customPartitioner.AsParallel()
                select DoIntensiveWork(num);

        long result = 0;

        q.ForAll((num) =>
        {
            Interlocked.Add(ref result, num);
        });

        watch.Stop();
        Console.WriteLine("Parallel, chunked       : {0}", watch.Elapsed);
    }

    public static int DoIntensiveWork(int item)
    {
        // Burn some CPU. Note that new Random() per call is seeded from the
        // clock, so rapid calls may reuse seeds; that's fine for busy work.
        Random rnd = new Random();
        int value = item;
        for (int i = 0; i < 200; ++i)
        {
            value += rnd.Next(10) / 2;
        }
        return value;
    }
}


In your static void Main:

Console.WriteLine("-- Intensive work --");
IntensiveWork.NotParallel();
IntensiveWork.Parallel();
IntensiveWork.ParallelLoadBalancingPartitioner();
IntensiveWork.ParallelNotLoadBalancing();
IntensiveWork.ParallelAggregate();


These were my results:
-- Intensive work --

Not parallel            : 00:00:00.1050591
Parallel                : 00:00:00.0300402
Parallel, load balancing: 00:00:00.0286821
Parallel, chunked       : 00:00:00.0269859
Parallel, aggregate     : 00:00:00.0566970


You can see all of the parallel versions are roughly two to four times faster than the sequential one.  But this is all still relatively fast, and most users won’t complain very hard about a few dozen milliseconds.
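One knob worth knowing when tuning this kind of workload is WithDegreeOfParallelism, which caps how many worker threads PLINQ will use; by default it uses up to the number of logical cores. A small sketch, reusing the doubling-and-summing query from above:

```csharp
using System;
using System.Linq;

class DegreeDemo
{
    static void Main()
    {
        // Cap the query at two worker threads; by default PLINQ may use
        // up to Environment.ProcessorCount of them.
        long sum = Enumerable.Range(0, 10000)
            .AsParallel()
            .WithDegreeOfParallelism(2)
            .Select(n => n * 2)
            .Sum(n => (long)n);

        Console.WriteLine(sum); // 99990000
    }
}
```

Lowering the degree of parallelism can help when the query shares the machine with other work, or when more threads just add coordination overhead.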

Blocking

Now let’s take a look at what happens when you have computations that occasionally need to wait an undetermined amount of time, say because some files live on a slow PC on the network.

First let's add a utility class:


public static class RandomExtensions
{
    public static IEnumerable<int> Sequence(this Random random, int min, int max, int count)
    {
        for (int i = 0; i < count; ++i)
        {
            yield return random.Next(min, max);
        }
    }
}

Then start our work:


public class Blocking
{
    private const int ARRAYSIZE = 100;
    private static int[] _timerValues;

    static Blocking()
    {
        Random rnd = new Random();
        _timerValues = rnd.Sequence(100, 350, ARRAYSIZE).ToArray();
    }

    public static void NotParallel()
    {

        var nums = Enumerable.Range(0, ARRAYSIZE);

        Stopwatch watch = Stopwatch.StartNew();
        long result = 0;
        foreach (var item in nums)
        {
            int value = DoIntensiveWork(item);
            result += value;
        }
        watch.Stop();
        Console.WriteLine("Not parallel            : {0}", watch.Elapsed);
    }

    public static void ParallelLoadBalancingPartitioner()
    {

        // static partitioning requires an indexable source.
        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();

        Stopwatch watch = Stopwatch.StartNew();
        Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

        // use the partitioner as source
        var q = from num in customPartitioner.AsParallel()
                select DoIntensiveWork(num);

        long result = 0;
        q.ForAll((num) =>
        {
            // thread-safe add
            Interlocked.Add(ref result, num);
        });
        watch.Stop();
        Console.WriteLine("Parallel, load balancing: {0}", watch.Elapsed);
    }

    public static void Parallel()
    {
        var nums = Enumerable.Range(0, ARRAYSIZE);

        Stopwatch watch = Stopwatch.StartNew();
        var q = from num in nums.AsParallel()
                select DoIntensiveWork(num);

        long result = 0;
        q.ForAll((num) =>
        {
            // thread-safe add
            Interlocked.Add(ref result, num);
        });
        watch.Stop();
        Console.WriteLine("Parallel                : {0}", watch.Elapsed);
    }

    public static void ParallelAggregate()
    {
        var nums = ParallelEnumerable.Range(0, ARRAYSIZE);
        Stopwatch watch = Stopwatch.StartNew();
        var q = (from num in nums select DoIntensiveWork(num)).AsParallel().Aggregate((i, o) => i + o);
        watch.Stop();
        Console.WriteLine("Parallel, aggregate     : {0}", watch.Elapsed);
    }

    public static void ParallelNotLoadBalancing()
    {

        var nums = Enumerable.Range(0, ARRAYSIZE).ToArray();
        Stopwatch watch = Stopwatch.StartNew();
        Partitioner<int> customPartitioner = Partitioner.Create(nums, false);

        // The partitioner is the query's data source.
        var q = from num in customPartitioner.AsParallel()
                select DoIntensiveWork(num);

        long result = 0;

        q.ForAll((num) =>
        {
            Interlocked.Add(ref result, num);
        });

        watch.Stop();
        Console.WriteLine("Parallel, chunked       : {0}", watch.Elapsed);
    }

    public static int DoIntensiveWork(int item)
    {
        // Simulate blocking work (e.g. slow I/O) with a sleep.
        // The sleep durations are fixed in the static constructor, so the
        // total waiting time is the same for every method within a run,
        // even though the individual per-item times differ.
        Thread.Sleep(_timerValues[item]);
        return item;
    }
}


And in your static void Main:

Console.WriteLine("-- Blocking work --");
Blocking.NotParallel();
Blocking.Parallel();
Blocking.ParallelLoadBalancingPartitioner();
Blocking.ParallelNotLoadBalancing();
Blocking.ParallelAggregate();


These were my results:

-- Blocking work --
Not parallel            : 00:00:22.6460236
Parallel                : 00:00:05.9371790
Parallel, load balancing: 00:00:06.5457596
Parallel, chunked       : 00:00:05.9701180
Parallel, aggregate     : 00:00:06.7638934


Look at the difference in speed now.  Your users may not have noticed the earlier differences, but they certainly will notice this one.

So when should you use PLINQ to speed up your LINQ queries?  Some queries are very simple and barely do any processing; in fact, the looping itself is half the work.  I wouldn’t necessarily parallelize those.  Other queries take a lot of time, show large differences in per-item processing time, or have to wait for a lock to be released.  In those scenarios you could consider changing your LINQ queries to PLINQ.  If you choose to parallelize with PLINQ, chunking and load balancing are strategies you can use to fine-tune performance.  In many scenarios, going from LINQ to PLINQ takes minimal effort.
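One last knob: PLINQ itself may decide a query is too cheap to parallelize and run it sequentially. If you have measured and know parallelism pays off, you can override that decision with WithExecutionMode, shown here on the query from the start of this post:

```csharp
using System;
using System.Linq;

class ForceParallel
{
    static void Main()
    {
        // By default PLINQ may fall back to sequential execution when it
        // estimates parallelism won't pay off; ForceParallelism overrides that.
        var q = Enumerable.Range(0, 100)
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select(n => n % 9);

        Console.WriteLine(q.Aggregate((a, b) => a + b)); // 396
    }
}
```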

