Hooked on LINQ

Hooked on LINQ - Developers' Wiki
for .NET Language Integrated Query

Companion book for this site
LINQ to Objects Using C# 4.0:
Using and Extending LINQ to Objects and Parallel LINQ (PLINQ)
Quick Search

Advanced Search »
Edit

Chapter 9 - Parallel LINQ to Objects





Edit

Parallel LINQ to Objects Features

Edit

Listing 9-1 : GeoNames Parsing example - Sequential.

This sample shows how a sequential LINQ query performs when parsing text from a large file (780 MB).

/// <summary>
/// Listing 9-1: sequential baseline. Streams the GeoNames "AllCountries" file,
/// keeps every location whose elevation exceeds 8000 m, prints them highest
/// first, then prints the total elapsed time.
/// </summary>
/// <remarks>
/// Each line is tab-delimited and an empty elevation column is treated as 0.
/// Elevations are parsed with the invariant culture because the file is
/// machine-generated data, so the result does not depend on the culture of the
/// current thread.
/// </remarks>
public void Listing_9_1_GeoNamesSequential_Simple()
{
    const int nameColumn = 1;
    const int countryColumn = 8;
    const int elevationColumn = 15;

    Stopwatch watch = new Stopwatch();
    watch.Start();

    // to minimize the download, a single region file is supplied
    //var lines = File.ReadLines(Path.Combine(Environment.CurrentDirectory, "Data/AU.txt"));

    // File.ReadLines enumerates the file lazily, line by line
    var lines = File.ReadLines(Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt"));

    var q = from line in lines
            let fields = line.Split(new char[] { '\t' })
            let elevation = string.IsNullOrEmpty(fields[elevationColumn]) ?
                     0 : int.Parse(fields[elevationColumn],
                             System.Globalization.CultureInfo.InvariantCulture)
            where elevation > 8000 // elevation in m's
            orderby elevation descending
            select new
            {
                // string.Split never yields null elements, so no null fallback is needed
                name = fields[nameColumn],
                elevation = elevation,
                country = fields[countryColumn]
            };

    foreach (var x in q)
    {
        Console.WriteLine("{0} ({1}m) – located in {2}",
            x.name, x.elevation, x.country);
    }

    Console.WriteLine("Elapsed time: {0}ms",
        watch.ElapsedMilliseconds);
}
 

Console output (Execution time: 45225ms): [Hide/Show]


Top



Edit

Listing 9-2 : GeoNames Parsing example - Parallel.

This sample shows how to use the AsParallel() operator when parsing text from a large file (780 MB).

/// <summary>
/// Listing 9-2: the Listing 9-1 query made parallel with AsParallel(). Streams
/// the GeoNames "AllCountries" file, keeps every location whose elevation
/// exceeds 8000 m, prints them highest first, then prints the elapsed time.
/// </summary>
/// <remarks>
/// Each line is tab-delimited and an empty elevation column is treated as 0.
/// Elevations are parsed with the invariant culture because the file is
/// machine-generated data. The orderby clause gives a deterministic output
/// order even though the lines are processed in parallel.
/// </remarks>
public void Listing_9_2_GeoNamesParallel_Simple()
{
    const int nameColumn = 1;
    const int countryColumn = 8;
    const int elevationColumn = 15;

    Stopwatch watch = new Stopwatch();
    watch.Start();

    // to minimize the download, a single region file is supplied
    //var lines = File.ReadLines(Path.Combine(Environment.CurrentDirectory, "Data/AU.txt"));

    // File.ReadLines enumerates the file lazily, line by line
    var lines = File.ReadLines(Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt"));

    var q = from line in lines.AsParallel()
            let fields = line.Split(new char[] { '\t' })
            let elevation = string.IsNullOrEmpty(fields[elevationColumn]) ?
                     0 : int.Parse(fields[elevationColumn],
                             System.Globalization.CultureInfo.InvariantCulture)
            where elevation > 8000 // elevation in m's
            orderby elevation descending
            select new
            {
                // string.Split never yields null elements, so no null fallback is needed
                name = fields[nameColumn],
                elevation = elevation,
                country = fields[countryColumn]
            };

    foreach (var x in q)
    {
        Console.WriteLine("{0} ({1}m) – located in {2}",
            x.name, x.elevation, x.country);
    }

    Console.WriteLine("Elapsed time: {0}ms",
        watch.ElapsedMilliseconds);
}
 

Console output (Execution time: 28583ms): [Hide/Show]


Top



Edit

Listing 9 : Simplest Parallel LINQ Query.

This sample shows how to make a LINQ query parallel.

/// <summary>
/// Shows the simplest way to make a LINQ query parallel and how ordering
/// behaves. It compares four forms of the same query:
/// - sequential;
/// - un-ordered parallel;
/// - parallel sorted with orderby;
/// - parallel with AsOrdered().
/// It prints the time taken by 100000 runs of each, then the items each query
/// produces.
/// </summary>
public void Listing_9_SimplestParallel()
{
    const int runs = 100000;

    // the integers 0..10 (an equivalent int[] literal would work just as well)
    var numbers = Enumerable.Range(0, 11);

    // sequential
    var sequential = numbers.Select(n => n);

    // un-ordered parallel: results arrive in whatever order the workers finish
    var unordered = numbers.AsParallel().Select(n => n);

    // ordered parallel using an explicit sort
    var sorted = numbers.AsParallel().OrderBy(n => n);

    // ordered parallel preserving source order via AsOrdered()
    var asOrdered = numbers.AsParallel().AsOrdered().Select(n => n);

    Console.WriteLine("q={0}ms, q1={1}ms, q2={2}ms, q3={3}ms",
        MeasureTime(delegate { sequential.ToArray(); }, runs),
        MeasureTime(delegate { unordered.ToArray(); }, runs),
        MeasureTime(delegate { sorted.ToArray(); }, runs),
        MeasureTime(delegate { asOrdered.ToArray(); }, runs)
    );

    Console.WriteLine("sequential");
    foreach (var n in sequential)
        Console.Write(n + " ");

    Console.WriteLine("");
    Console.WriteLine("Un–ordered Parallel");
    foreach (var n in unordered)
        Console.Write(n + " ");

    Console.WriteLine();
    Console.WriteLine("Ordered Parallel");
    foreach (var n in sorted)
        Console.Write(n + " ");

    Console.WriteLine();
    Console.WriteLine(".AsOrdered() Parallel");
    foreach (var n in asOrdered)
        Console.Write(n + " ");
}
 

Console output (Execution time: 17819ms): [Hide/Show]


Top



Edit

Listing 9-3 : Simple Parallel LINQ Query with real Work.

This sample shows how to make a LINQ query parallel and undertake lengthy work (a 10 ms thread wait in this case) on each item.

/// <summary>
/// Listing 9-3: parallel queries doing real work per element. MySlowFunction(i)
/// adds a 10ms sleep on the current thread and returns i. The same projection is
/// run in four forms:
/// - sequential;
/// - un-ordered parallel;
/// - parallel with orderby;
/// - parallel with AsOrdered().
/// The method prints the time taken by 100 runs of each, then each result
/// sequence.
/// </summary>
public void Listing_9_3_SimpleParallel()
{
    const int runs = 100;

    int[] values = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

    // sequential
    var sequential = values.Select(v => MySlowFunction(v));

    // un-ordered parallel
    var unordered = values.AsParallel().Select(v => MySlowFunction(v));

    // ordered parallel: sort first, then project
    var sorted = values.AsParallel().OrderBy(v => v).Select(v => MySlowFunction(v));

    // ordered parallel using AsOrdered()
    var asOrdered = values.AsParallel().AsOrdered().Select(v => MySlowFunction(v));

    Console.WriteLine("q={0}ms, q1={1}ms, q2={2}ms, q3={3}ms",
        MeasureTime(delegate { sequential.ToArray(); }, runs),
        MeasureTime(delegate { unordered.ToArray(); }, runs),
        MeasureTime(delegate { sorted.ToArray(); }, runs),
        MeasureTime(delegate { asOrdered.ToArray(); }, runs)
    );

    Console.WriteLine("sequential");
    foreach (var v in sequential)
        Console.Write(v + " ");

    Console.WriteLine("");
    Console.WriteLine("Un–ordered Parallel");
    foreach (var v in unordered)
        Console.Write(v + " ");

    Console.WriteLine();
    Console.WriteLine("Ordered Parallel");
    foreach (var v in sorted)
        Console.Write(v + " ");

    Console.WriteLine();
    Console.WriteLine(".AsOrdered() Parallel");
    foreach (var v in asOrdered)
        Console.Write(v + " ");
}
 

Console output (Execution time: 29660ms): [Hide/Show]


Top



Edit

Listing 9-4, 9-5 : AsParallel() and AsSequential().

This sample shows how to use AsParallel() and AsSequential() to control when a query executes in parallel and when it executes sequentially.

/// <summary>
/// Listings 9-4 and 9-5 compare three ways of running the same query. The query
/// finds the five highest locations in countries whose code starts with "A".
/// The three variants are:
/// 1. fully sequential;
/// 2. AsParallel() followed directly by Take();
/// 3. AsParallel() with AsSequential() inserted before Take().
/// The method prints the elapsed time of each.
/// </summary>
/// <remarks>
/// Elevation is parsed and ordered numerically, using the invariant culture. An
/// empty elevation column counts as 0. Ordering the raw text column instead
/// sorts lexicographically ("999" above "8848") and selects the wrong five rows.
/// Country codes are compared ordinally because they are machine data, not
/// linguistic text.
/// </remarks>
public void Listing_9_4_9_5_AsParallelAndAsSequential()
{
    const int nameColumn = 1;
    const int countryColumn = 8;
    const int elevationColumn = 15;

    // to minimize the download, a single region file is supplied
    //var lines = File.ReadLines(Path.Combine(Environment.CurrentDirectory, "Data/AU.txt"));

    var lines = File.ReadLines(Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt"));

    // this query runs sequentially
    var q = (from line in lines
             let fields = line.Split(new char[] { '\t' })
             where fields[countryColumn].StartsWith("A", StringComparison.Ordinal)
             let elevation = string.IsNullOrEmpty(fields[elevationColumn]) ?
                      0 : int.Parse(fields[elevationColumn],
                              System.Globalization.CultureInfo.InvariantCulture)
             orderby elevation descending
             select new
             {
                 name = fields[nameColumn],
                 elevation = elevation,
                 country = fields[countryColumn]
             })
            .Take(5);

    var lines1 = File.ReadLines(Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt"));

    // this query runs sequentially because of the .Take() operator
    var q1 = (from line in lines1.AsParallel()
              let fields = line.Split(new char[] { '\t' })
              where fields[countryColumn].StartsWith("A", StringComparison.Ordinal)
              let elevation = string.IsNullOrEmpty(fields[elevationColumn]) ?
                       0 : int.Parse(fields[elevationColumn],
                               System.Globalization.CultureInfo.InvariantCulture)
              orderby elevation descending
              select new
              {
                  name = fields[nameColumn],
                  elevation = elevation,
                  country = fields[countryColumn]
              })
              .Take(5);

    var lines2 = File.ReadLines(Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt"));

    // this query isolates the Take() operator and executes mostly parallel
    var q2 = (from line in lines2.AsParallel()
              let fields = line.Split(new char[] { '\t' })
              where fields[countryColumn].StartsWith("A", StringComparison.Ordinal)
              let elevation = string.IsNullOrEmpty(fields[elevationColumn]) ?
                       0 : int.Parse(fields[elevationColumn],
                               System.Globalization.CultureInfo.InvariantCulture)
              orderby elevation descending
              select new
              {
                  name = fields[nameColumn],
                  elevation = elevation,
                  country = fields[countryColumn]
              })
              .AsSequential()
              .Take(5);

    Console.WriteLine("Sequential: {0}ms, Parallel: {1}ms, with AsSequential: {2}ms",
        MeasureTime(delegate { q.ToArray(); }, 1),
        MeasureTime(delegate { q1.ToArray(); }, 1),
        MeasureTime(delegate { q2.ToArray(); }, 1));
}
 

Console output (Execution time: 83342ms): [Hide/Show]


Top



Edit

Listing 9-7 : Binary Operator AsParallel Rules

This sample shows the rules for binary operators when using the AsParallel operator.

/// <summary>
/// Listing 9-7: both operands of a PLINQ binary operator (here Concat) should be
/// ParallelQuery sources. The method prints three valid concatenations that
/// differ in which operands are AsOrdered(). It then materializes a query
/// configured with ParallelExecutionMode.ForceParallelism.
/// </summary>
public void Listing_9_7_AsParallelBinaryOperatorRules()
{
    int[] left = { 1, 2, 3, 4, 5 };
    int[] right = { 6, 7, 8, 9, 10 };

    // ERROR – Obsolete warning
    // "The second data source of a binary operator must be of
    // type System.Linq.ParallelQuery<T> rather than
    // System.Collections.Generic.IEnumerable<T>.
    // To fix this problem, use the AsParallel() extension method
    // to convert the right data source to System.Linq.ParallelQuery<T>."
    //var q = left.AsParallel().Concat(right);

    // Each of these works; force ordering where the output order matters.
    ParallelQuery<int>[] queries =
    {
        left.AsParallel().Concat(right.AsParallel()),
        left.AsParallel().AsOrdered().Concat(right.AsParallel()),
        left.AsParallel().AsOrdered().Concat(right.AsParallel().AsOrdered())
    };
    string[] formats = { "q1={0}", "q2={0}", "q3={0}" };

    //Console.WriteLine("q ={0}", String.Join(" ", q.Select(i => i.ToString()).ToArray()));
    for (int k = 0; k < queries.Length; k++)
    {
        string[] items = queries[k].Select(i => i.ToString()).ToArray();
        Console.WriteLine(formats[k], String.Join(" ", items));
    }

    var forced = left.AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism);
    //var x = forced.WithExecutionMode(ParallelExecutionMode.ForceParallelism);

    // ToList() only forces the query to execute; the list itself is unused
    forced.ToList();
}
 

Console output (Execution time: 93ms): [Hide/Show]


Top



Edit

Listing 9 : GeoNames join sample.

This sample shows how to use AsParallel() when parsing text from a file with a join. It is not covered in the book; it is an extension showing joins combined with parallel execution.

/// <summary>
/// Parallel GeoNames query that "joins" each location with its feature
/// description. It prints every location higher than 6000 m, highest first,
/// followed by the elapsed time.
/// </summary>
/// <remarks>
/// Feature descriptions are loaded once into a dictionary. Each matching
/// location then costs one hash lookup instead of a linear scan over every
/// feature code. Blank or malformed lines in the feature-code file are skipped
/// rather than failing the whole run. A location with an empty feature class, or
/// an unknown class/code pair, gets an empty description.
/// </remarks>
public void Listing_9_x_GeoNames_Join()
{
    const int nameColumn = 1;
    const int featureClassColumn = 6;
    const int featureCodeColumn = 7;
    const int countryColumn = 8;
    const int elevationIndexColumn = 15;

    Stopwatch watch = new Stopwatch();
    watch.Start();

    // Load the feature class and code descriptions. In each line, the first
    // tab-separated field holds the one-character feature class, one separator
    // character (presumably '.'), and then the feature code. The second field
    // holds the description.
    var codeFile = File.ReadLines(
        Path.Combine(Environment.CurrentDirectory, "Data/FeatureCodes.txt"));

    var descriptions = new Dictionary<string, string>();
    foreach (var code in codeFile)
    {
        var c = code.Split(new char[] { '\t' });

        // skip blank or malformed lines instead of throwing
        if (c.Length < 2 || c[0].Length < 2)
            continue;

        string key = FeatureKey(c[0][0], c[0].Remove(0, 2));

        // for duplicate keys the first description wins
        if (!descriptions.ContainsKey(key))
            descriptions.Add(key, c[1]);
    }

    // to minimize the download, a single region file is supplied
    //var lines = File.ReadLines(Path.Combine(Environment.CurrentDirectory, "Data/AU.txt"));

    var lines = File.ReadLines(Path.Combine(Environment.CurrentDirectory, "Data/AllCountries.txt"));

    // The dictionary is only read (never modified) inside the parallel query.
    var q = from line in lines.AsParallel()
            let fields = line.Split(new char[] { '\t' })
            let elevation = string.IsNullOrEmpty(fields[elevationIndexColumn]) ?
                     0 : int.Parse(fields[elevationIndexColumn],
                             System.Globalization.CultureInfo.InvariantCulture)
            where elevation > 6000 // elevation in m's
            let description = DescribeFeature(descriptions,
                                  fields[featureClassColumn],
                                  fields[featureCodeColumn])
            orderby elevation descending
            select new
            {
                name = fields[nameColumn],
                elevation = elevation,
                country = fields[countryColumn],
                description = description
            };

    foreach (var x in q)
    {
        Console.WriteLine("{0} ({1}m) – A {2} in {3}",
            x.name,
            x.elevation,
            x.description,
            x.country);
    }

    Console.WriteLine();
    Console.WriteLine("Elapsed time: {0}ms", watch.ElapsedMilliseconds);
}

// Builds the lookup key for a feature class character and feature code.
private static string FeatureKey(char featureClass, string featureCode)
{
    return featureClass + "|" + featureCode;
}

// Returns the description for a feature class/code pair, or "" when the class is
// empty or the pair is not in the lookup.
private static string DescribeFeature(
    Dictionary<string, string> descriptions, string featureClass, string featureCode)
{
    string description;
    if (featureClass.Length == 0 ||
        !descriptions.TryGetValue(FeatureKey(featureClass[0], featureCode), out description))
    {
        return "";
    }
    return description;
}
 

Console output (Execution time: 22869ms): [Hide/Show]


Top



Edit

Table 9-2 : Sequential Variance Test

This sample shows the various optimizations for a Variance statistical operator.

/// <summary>
/// Table 9-2: times 1000 runs of each sequential variance implementation
/// (Variance1, Variance2 and Variance) over the integers 1..100000. It runs them
/// first over an array, then over a lazily generated sequence.
/// </summary>
public void Listing_9_Table_9_2_SequentialVariance()
{
    const int runs = 1000;

    var arrayValues = Enumerable.Range(1, 100000).ToArray();
    var lazyValues = Enumerable.Range(1, 100000);

    Console.WriteLine("Variance over array of int's");
    Console.WriteLine("Variance1: {0}ms, Variance2: {1}ms, Variance: {2}ms",
        MeasureTime(delegate { arrayValues.Variance1(); }, runs),
        MeasureTime(delegate { arrayValues.Variance2(); }, runs),
        MeasureTime(delegate { arrayValues.Variance(); }, runs));

    Console.WriteLine("Variance over IEnumerable of int's");
    Console.WriteLine("Variance1: {0}ms, Variance2: {1}ms, Variance: {2}ms",
        MeasureTime(delegate { lazyValues.Variance1(); }, runs),
        MeasureTime(delegate { lazyValues.Variance2(); }, runs),
        MeasureTime(delegate { lazyValues.Variance(); }, runs));
}
 
/// <summary>
/// Variance and standard deviation aggregate operators over <c>int</c> values,
/// in sequential (<see cref="IEnumerable{T}"/>) and parallel
/// (<see cref="ParallelQuery{T}"/>) forms. All operators compute the sample
/// variance (divisor n - 1).
/// </summary>
public static class StatisticExtensions
{
    /* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    n = 0, sum = 0, sum_sqr = 0

    for x in data:
        n = n + 1
        sum = sum + x
        sum_sqr = sum_sqr + x*x

    mean = sum/n
    variance = (sum_sqr - sum*mean)/(n - 1)
    */

    // sequential variance calculation
    // Listing 9-8
    /// <summary>
    /// Sample variance using a traditional aggregate and <see cref="Math.Pow"/>;
    /// this is the unoptimized baseline.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="source"/> is empty.</exception>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate and
    /// Count). A single-element source yields NaN (0 / 0).
    /// </remarks>
    public static double Variance1(
         this IEnumerable<int> source)
    {
        // traditional aggregate
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) =>
                subtotal + Math.Pow((item - mean), 2),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-9
    /// <summary>
    /// Sample variance like <see cref="Variance1"/>, but squaring by
    /// multiplication instead of calling <see cref="Math.Pow"/>.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="source"/> is empty.</exception>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times. A single-element source
    /// yields NaN (0 / 0).
    /// </remarks>
    public static double Variance2(
        this IEnumerable<int> source)
    {
        // optimization 1 - removing the Math.Pow call
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) => subtotal +
                ((double)item - mean) * ((double)item - mean),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-10
    /// <summary>
    /// Single-pass sample variance that accumulates count, sum and sum of squares.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when <paramref name="source"/> has fewer than
    /// two elements.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double Variance(
         this IEnumerable<int> source)
    {
        // optimization 2 - removing the separate count and mean passes

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(

            // seed - {count, sum, sum of squares}
            new double[3] { 0.0, 0.0, 0.0 },

            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++; // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },

            // result selector: turns the totals into the variance
            result => VarianceFromTotals(result)
        );
    }

    // parallel Variance aggregate extension method
    // based on the optimized sequential algorithm
    // Listing 9-12
    /// <summary>
    /// Parallel single-pass sample variance. Each partition accumulates its own
    /// count, sum and sum of squares; the partials are then combined.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when <paramref name="source"/> has fewer than
    /// two elements.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double Variance(
         this ParallelQuery<int> source)
    {
        /* based upon the blog posting by Igor Ostrovsky at -
         * http://blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx
         * which demonstrates how to use the factory functions in an
         * Aggregate function for efficiency
         */

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(

            // seed factory: every partition gets its own {count, sum, sum of
            // squares} array, so partitions never share mutable state
            () => new double[3] { 0.0, 0.0, 0.0 },

            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++; // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },

            // combine function, merges one partition's totals into another's
            (total, thisThread) =>
            {
                total[0] += thisThread[0];
                total[1] += thisThread[1];
                total[2] += thisThread[2];
                return total;
            },

            // result selector: turns the combined totals into the variance
            result => VarianceFromTotals(result)
        );
    }

    // Converts {count, sum, sum of squares} totals into the sample variance,
    // (sum_sqr - sum * mean) / (n - 1); sources with zero or one element yield 0.
    private static double VarianceFromTotals(double[] totals)
    {
        double count = totals[0];
        if (count <= 1)
            return 0.0;

        double sum = totals[1];
        double sumOfSquares = totals[2];
        double mean = sum / count;

        // Rounding in the subtraction can push a true zero slightly below zero.
        // That would make Math.Sqrt return NaN in StandardDeviation, so clamp.
        return Math.Max(0.0, (sumOfSquares - sum * mean) / (count - 1));
    }

    // Listing 9-11
    /// <summary>Sample standard deviation of a sequence (square root of <see cref="Variance(IEnumerable{int})"/>).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double StandardDeviation(
        this IEnumerable<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>Sample standard deviation of the values projected by <paramref name="selector"/>.</summary>
    public static double StandardDeviation<T>(
        this IEnumerable<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            Enumerable.Select(source, selector));
    }

    // Listing 9-13
    /// <summary>Parallel sample standard deviation (square root of <see cref="Variance(ParallelQuery{int})"/>).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double StandardDeviation(
        this ParallelQuery<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>Parallel sample standard deviation of the values projected by <paramref name="selector"/>.</summary>
    public static double StandardDeviation<T>(
        this ParallelQuery<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            ParallelEnumerable.Select(source, selector));
    }
}
 

Console output (Execution time: 49278ms): [Hide/Show]


Top



Edit

Table 9-3 : Parallel Standard Deviation Test

This sample shows the parallel StandardDeviation statistical operator

/// <summary>
/// Table 9-3: sequential versus parallel standard deviation over 1..100000.
/// The method first prints both results side by side. It then times 1000 runs of
/// each over an array, and over a lazily generated sequence whose parallel form
/// is forced to run in parallel.
/// </summary>
public void Listing_9_Table_9_3_Parallel_StandardDeviation()
{
    const int runs = 1000;

    // materialized inputs
    var numbers = Enumerable.Range(1, 100000).ToArray();
    var numbersInParallel = Enumerable.Range(1, 100000).ToArray().AsParallel();

    // lazily generated inputs
    var lazyNumbers = Enumerable.Range(1, 100000);
    var lazyNumbersInParallel = Enumerable.Range(1, 100000)
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism);

    // sanity check: both implementations should print the same value
    double sequentialResult = numbers.StandardDeviation();
    double parallelResult = numbersInParallel.StandardDeviation();
    Console.WriteLine("{0}, {1}", sequentialResult, parallelResult);

    Console.WriteLine("Standard Deviation over array of int's");
    Console.WriteLine("sequential: {0}ms, Parallel: {1}ms",
        MeasureTime(delegate { numbers.StandardDeviation(); }, runs),
        MeasureTime(delegate { numbersInParallel.StandardDeviation(); }, runs));

    Console.WriteLine("Standard Deviation over IEnumerable of int's");
    Console.WriteLine("sequential: {0}ms, Parallel: {1}ms",
        MeasureTime(delegate { lazyNumbers.StandardDeviation(); }, runs),
        MeasureTime(delegate { lazyNumbersInParallel.StandardDeviation(); }, runs));
}
 
/// <summary>
/// Variance and standard deviation aggregate operators over <c>int</c> values,
/// in sequential (<see cref="IEnumerable{T}"/>) and parallel
/// (<see cref="ParallelQuery{T}"/>) forms. All operators compute the sample
/// variance (divisor n - 1).
/// </summary>
public static class StatisticExtensions
{
    /* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    n = 0, sum = 0, sum_sqr = 0

    for x in data:
        n = n + 1
        sum = sum + x
        sum_sqr = sum_sqr + x*x

    mean = sum/n
    variance = (sum_sqr - sum*mean)/(n - 1)
    */

    // sequential variance calculation
    // Listing 9-8
    /// <summary>
    /// Sample variance using a traditional aggregate and <see cref="Math.Pow"/>;
    /// this is the unoptimized baseline.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="source"/> is empty.</exception>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate and
    /// Count). A single-element source yields NaN (0 / 0).
    /// </remarks>
    public static double Variance1(
         this IEnumerable<int> source)
    {
        // traditional aggregate
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) =>
                subtotal + Math.Pow((item - mean), 2),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-9
    /// <summary>
    /// Sample variance like <see cref="Variance1"/>, but squaring by
    /// multiplication instead of calling <see cref="Math.Pow"/>.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="source"/> is empty.</exception>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times. A single-element source
    /// yields NaN (0 / 0).
    /// </remarks>
    public static double Variance2(
        this IEnumerable<int> source)
    {
        // optimization 1 - removing the Math.Pow call
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) => subtotal +
                ((double)item - mean) * ((double)item - mean),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-10
    /// <summary>
    /// Single-pass sample variance that accumulates count, sum and sum of squares.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when <paramref name="source"/> has fewer than
    /// two elements.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double Variance(
         this IEnumerable<int> source)
    {
        // optimization 2 - removing the separate count and mean passes

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(

            // seed - {count, sum, sum of squares}
            new double[3] { 0.0, 0.0, 0.0 },

            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++; // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },

            // result selector: turns the totals into the variance
            result => VarianceFromTotals(result)
        );
    }

    // parallel Variance aggregate extension method
    // based on the optimized sequential algorithm
    // Listing 9-12
    /// <summary>
    /// Parallel single-pass sample variance. Each partition accumulates its own
    /// count, sum and sum of squares; the partials are then combined.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when <paramref name="source"/> has fewer than
    /// two elements.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double Variance(
         this ParallelQuery<int> source)
    {
        /* based upon the blog posting by Igor Ostrovsky at -
         * http://blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx
         * which demonstrates how to use the factory functions in an
         * Aggregate function for efficiency
         */

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(

            // seed factory: every partition gets its own {count, sum, sum of
            // squares} array, so partitions never share mutable state
            () => new double[3] { 0.0, 0.0, 0.0 },

            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++; // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },

            // combine function, merges one partition's totals into another's
            (total, thisThread) =>
            {
                total[0] += thisThread[0];
                total[1] += thisThread[1];
                total[2] += thisThread[2];
                return total;
            },

            // result selector: turns the combined totals into the variance
            result => VarianceFromTotals(result)
        );
    }

    // Converts {count, sum, sum of squares} totals into the sample variance,
    // (sum_sqr - sum * mean) / (n - 1); sources with zero or one element yield 0.
    private static double VarianceFromTotals(double[] totals)
    {
        double count = totals[0];
        if (count <= 1)
            return 0.0;

        double sum = totals[1];
        double sumOfSquares = totals[2];
        double mean = sum / count;

        // Rounding in the subtraction can push a true zero slightly below zero.
        // That would make Math.Sqrt return NaN in StandardDeviation, so clamp.
        return Math.Max(0.0, (sumOfSquares - sum * mean) / (count - 1));
    }

    // Listing 9-11
    /// <summary>Sample standard deviation of a sequence (square root of <see cref="Variance(IEnumerable{int})"/>).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double StandardDeviation(
        this IEnumerable<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>Sample standard deviation of the values projected by <paramref name="selector"/>.</summary>
    public static double StandardDeviation<T>(
        this IEnumerable<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            Enumerable.Select(source, selector));
    }

    // Listing 9-13
    /// <summary>Parallel sample standard deviation (square root of <see cref="Variance(ParallelQuery{int})"/>).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double StandardDeviation(
        this ParallelQuery<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>Parallel sample standard deviation of the values projected by <paramref name="selector"/>.</summary>
    public static double StandardDeviation<T>(
        this ParallelQuery<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            ParallelEnumerable.Select(source, selector));
    }
}
 

Console output (Execution time: 12354ms): [Hide/Show]


Top



Edit

Listing 9 : Standard Deviation Test for Error Handling

This sample shows the Error Handling code for the StandardDeviation statistical operator

/// <summary>
/// Exercises the error handling and edge cases of the StandardDeviation
/// operators and prints each outcome. The cases are:
/// - a null source, sequentially and in parallel (expected to throw
///   ArgumentNullException);
/// - an empty source;
/// - a single-element source (both expected to yield 0).
/// </summary>
public void Listing_9_Hardening_StandardDeviation()
{
    // source is null
    int[] nullSource = null;

    try
    {
        nullSource.StandardDeviation();
    }
    catch (ArgumentNullException e)
    {
        Console.WriteLine("Sequential SD Null Source Error: {0}",
            e.Message);
    }

    try
    {
        // AsParallel() itself rejects a null source before StandardDeviation runs
        nullSource.AsParallel().StandardDeviation();
    }
    catch (ArgumentNullException e)
    {
        Console.WriteLine("Parallel SD Null Source Error: {0}",
            e.Message);
    }

    // source is empty. Should return 0
    int[] empty = new int[] { };

    Console.WriteLine("Sequential empty source SD = {0}, Parallel empty source SD = {1}",
        empty.StandardDeviation(),
        empty.AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .StandardDeviation()
    );

    // source is single element. Should return 0
    int[] one = new int[] { 5 };

    Console.WriteLine("Sequential one element SD = {0}, Parallel one element SD = {1}",
        one.StandardDeviation(),
        one.AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .StandardDeviation()
    );
}
 
/// <summary>
/// Variance and standard deviation aggregate operators over <c>int</c> values,
/// in sequential (<see cref="IEnumerable{T}"/>) and parallel
/// (<see cref="ParallelQuery{T}"/>) forms. All operators compute the sample
/// variance (divisor n - 1).
/// </summary>
public static class StatisticExtensions
{
    /* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    n = 0, sum = 0, sum_sqr = 0

    for x in data:
        n = n + 1
        sum = sum + x
        sum_sqr = sum_sqr + x*x

    mean = sum/n
    variance = (sum_sqr - sum*mean)/(n - 1)
    */

    // sequential variance calculation
    // Listing 9-8
    /// <summary>
    /// Sample variance using a traditional aggregate and <see cref="Math.Pow"/>;
    /// this is the unoptimized baseline.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="source"/> is empty.</exception>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate and
    /// Count). A single-element source yields NaN (0 / 0).
    /// </remarks>
    public static double Variance1(
         this IEnumerable<int> source)
    {
        // traditional aggregate
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) =>
                subtotal + Math.Pow((item - mean), 2),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-9
    /// <summary>
    /// Sample variance like <see cref="Variance1"/>, but squaring by
    /// multiplication instead of calling <see cref="Math.Pow"/>.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="source"/> is empty.</exception>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times. A single-element source
    /// yields NaN (0 / 0).
    /// </remarks>
    public static double Variance2(
        this IEnumerable<int> source)
    {
        // optimization 1 - removing the Math.Pow call
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) => subtotal +
                ((double)item - mean) * ((double)item - mean),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-10
    /// <summary>
    /// Single-pass sample variance that accumulates count, sum and sum of squares.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when <paramref name="source"/> has fewer than
    /// two elements.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double Variance(
         this IEnumerable<int> source)
    {
        // optimization 2 - removing the separate count and mean passes

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(

            // seed - {count, sum, sum of squares}
            new double[3] { 0.0, 0.0, 0.0 },

            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++; // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },

            // result selector: turns the totals into the variance
            result => VarianceFromTotals(result)
        );
    }

    // parallel Variance aggregate extension method
    // based on the optimized sequential algorithm
    // Listing 9-12
    /// <summary>
    /// Parallel single-pass sample variance. Each partition accumulates its own
    /// count, sum and sum of squares; the partials are then combined.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when <paramref name="source"/> has fewer than
    /// two elements.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double Variance(
         this ParallelQuery<int> source)
    {
        /* based upon the blog posting by Igor Ostrovsky at -
         * http://blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx
         * which demonstrates how to use the factory functions in an
         * Aggregate function for efficiency
         */

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(

            // seed factory: every partition gets its own {count, sum, sum of
            // squares} array, so partitions never share mutable state
            () => new double[3] { 0.0, 0.0, 0.0 },

            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++; // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },

            // combine function, merges one partition's totals into another's
            (total, thisThread) =>
            {
                total[0] += thisThread[0];
                total[1] += thisThread[1];
                total[2] += thisThread[2];
                return total;
            },

            // result selector: turns the combined totals into the variance
            result => VarianceFromTotals(result)
        );
    }

    // Converts {count, sum, sum of squares} totals into the sample variance,
    // (sum_sqr - sum * mean) / (n - 1); sources with zero or one element yield 0.
    private static double VarianceFromTotals(double[] totals)
    {
        double count = totals[0];
        if (count <= 1)
            return 0.0;

        double sum = totals[1];
        double sumOfSquares = totals[2];
        double mean = sum / count;

        // Rounding in the subtraction can push a true zero slightly below zero.
        // That would make Math.Sqrt return NaN in StandardDeviation, so clamp.
        return Math.Max(0.0, (sumOfSquares - sum * mean) / (count - 1));
    }

    // Listing 9-11
    /// <summary>Sample standard deviation of a sequence (square root of <see cref="Variance(IEnumerable{int})"/>).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double StandardDeviation(
        this IEnumerable<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>Sample standard deviation of the values projected by <paramref name="selector"/>.</summary>
    public static double StandardDeviation<T>(
        this IEnumerable<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            Enumerable.Select(source, selector));
    }

    // Listing 9-13
    /// <summary>Parallel sample standard deviation (square root of <see cref="Variance(ParallelQuery{int})"/>).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static double StandardDeviation(
        this ParallelQuery<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>Parallel sample standard deviation of the values projected by <paramref name="selector"/>.</summary>
    public static double StandardDeviation<T>(
        this ParallelQuery<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            ParallelEnumerable.Select(source, selector));
    }
}
 

Console output (Execution time: 21ms): [Hide/Show]


Top



Edit

Listing 9 : AsParallel examples.

This sample shows how to use the AsParallel() operator.

/// <summary>
/// Squares each element of a small array in parallel and writes the results
/// using ForAll. ForAll does not preserve source order, so the printed order
/// may vary between runs.
/// </summary>
public void Listing_9_AsParallel()
{
    int[] a = { 1, 2, 3, 4, 5 };

    (from i in a.AsParallel()
     select i * i)
    .ForAll(i => Console.Write(i + " "));
}
 

Console output (Execution time: 0ms): [Hide/Show]


Top



If you would like to comment on this page, click on the Discuss button located on the top-right of each page. Feel free to edit any mistakes or omissions you find. If you have an objection or find inappropriate content, then contact the administrator. This website is not affiliated with Microsoft®; all content and opinions are those of the specific author, and some advice, solutions and articles may contain unintentional errors - please use care. Powered by ScrewTurn Wiki version 2.0.33. Some of the icons created by FamFamFam.