
C# Parallel Programming: PLINQ Reduction Operations and Aggregate Functions

Posted: 2015-06-04 08:37:11

Tags: c#, plinq

Overview

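PLINQ makes it easy to apply the same function to every member of a sequence or group and fold the results into a single value. This is called a reduction. Sum() is one example of a reduction. PLINQ also provides several overloads of Aggregate(), through which you can define your own reduction function.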

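A reduction operates on every member. Once that work is done, the individual results may still need to be combined into one final result; this combining step is what aggregation refers to.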
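A minimal sketch of these two ideas, assuming a .NET SDK that supports top-level statements (C# 9 or later); the array is arbitrary illustration data. Sum() is a built-in reduction, and the simplest Aggregate() overload expresses the same reduction with a user-supplied function.

```csharp
using System;
using System.Linq;

int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

// Built-in reduction: PLINQ sums each partition, then adds the partial sums together.
int builtIn = values.AsParallel().Sum();

// The same reduction via Aggregate(): the function should be associative,
// because PLINQ applies it within partitions and again when merging partition results.
int custom = values.AsParallel().Aggregate((acc, x) => acc + x);

Console.WriteLine($"Sum() = {builtIn}, Aggregate() = {custom}");
```

Both calls fold the same ten values into one number; Aggregate() only becomes worth the extra code when the folding logic is something Sum(), Average(), Min() or Max() cannot express.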

Reduction operations

The example computes the average of n / π over all integers n from 1 to 50,000,000 that are divisible by 5. This can be done with either LINQ or PLINQ.

Code example:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

namespace Sample6_2_plinq_calculate
{
    class Program
    {
        static int NUM_INTS = 50000000;

        // Sequential source for LINQ to Objects
        static IEnumerable<int> GenerateInputData()
        {
            return Enumerable.Range(1, NUM_INTS);
        }

        // Parallel source for PLINQ; ParallelEnumerable.Range already returns a ParallelQuery<int>
        static ParallelQuery<int> GenerateInputData4Parallel()
        {
            return ParallelEnumerable.Range(1, NUM_INTS);
        }

        static void Main(string[] args)
        {
            var seqTarget = GenerateInputData();

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST NORMAL LINQ");
            Console.WriteLine("============================================================");
            var swatchpn = Stopwatch.StartNew();

            // Average() is the reduction: it folds all selected values into a single number
            var seqQuery = (from intNum in seqTarget
                            where ((intNum % 5) == 0)
                            select (intNum / Math.PI)).Average();
            swatchpn.Stop();

            Console.WriteLine("LINQ Result: {0}    LINQ Use Time: {1}", seqQuery, swatchpn.Elapsed);

            var palTarget = GenerateInputData4Parallel();
            Console.WriteLine("\n\n");
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST PARALLEL LINQ");
            Console.WriteLine("============================================================");
            var swatchp = Stopwatch.StartNew();

            // palTarget is already a ParallelQuery<int>, so no extra AsParallel() call is needed
            var palQuery = (from intNum in palTarget
                            where ((intNum % 5) == 0)
                            select (intNum / Math.PI)).Average();
            swatchp.Stop();

            Console.WriteLine("PLINQ Result: {0}    PLINQ Use Time: {1}", palQuery, swatchp.Elapsed);

            Console.ReadLine();
        }
    }
}

Test results: [screenshot]
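The result can also be checked by hand. The multiples of 5 in [1, N] form the arithmetic sequence 5, 10, ..., N (N = 50,000,000 is itself a multiple of 5), so their mean is (5 + N) / 2, and dividing every term by π divides the mean by π. For N = 50,000,000 that gives 25,000,002.5 / π ≈ 7,957,747.95. The sketch below uses a smaller, hypothetical N so that it runs quickly and compares LINQ, PLINQ and this closed form. The two queries may differ in the last few digits, because PLINQ adds its partial sums in a different order.

```csharp
using System;
using System.Linq;

const int N = 1_000_000; // hypothetical smaller size; the article uses 50,000,000

double linqAvg = Enumerable.Range(1, N)
    .Where(x => x % 5 == 0)
    .Select(x => x / Math.PI)
    .Average();

double plinqAvg = ParallelEnumerable.Range(1, N)
    .Where(x => x % 5 == 0)
    .Select(x => x / Math.PI)
    .Average();

// Multiples of 5 in [1, N] are 5, 10, ..., 5k with k = N / 5, so their mean is (5 + 5k) / 2.
int k = N / 5;
double closedForm = (5.0 + 5.0 * k) / 2.0 / Math.PI;

Console.WriteLine($"LINQ:        {linqAvg}");
Console.WriteLine($"PLINQ:       {plinqAvg}");
Console.WriteLine($"Closed form: {closedForm}");
```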

Aggregation operations

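The code example below computes the standard deviation, skewness, and kurtosis of an array to illustrate how aggregation is used.
First, a quick math refresher: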

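  • Standard deviation: a standard measure of how widely the data are spread, i.e., how far the values deviate from their arithmetic mean. The smaller the standard deviation, the less the values deviate from the mean, and vice versa. Its magnitude can be judged by comparing it with the mean (as a multiple of the mean).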

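  • Skewness: the skewness coefficient is a characteristic number that describes how far a distribution departs from symmetry. A distribution that is symmetric about its center has a skewness of 0. A skewness greater than 0, i.e., a heavier tail on the right, means the distribution is right-skewed; a skewness less than 0, i.e., a heavier tail on the left, means it is left-skewed.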


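  • Kurtosis: indicates whether a distribution is more peaked or flatter than the normal distribution. A positive value indicates a relatively peaked distribution, a negative value a relatively flat one. Put simply, kurtosis describes how steep or gentle the shape of the distribution is. Another way to see it: for the same standard deviation, a larger kurtosis means the distribution has more extreme values, so the remaining values must cluster more tightly around the mode, which makes the distribution steeper.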
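For reference, these are the estimators that the sample code below computes, read off from its result selectors: the sample standard deviation s, the bias-adjusted sample skewness G₁, and the sample excess kurtosis G₂ (excess kurtosis is close to 0 for normally distributed data, which matches the sign convention described above). Here n is the number of values and x̄ is their mean.

```latex
s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i-\bar{x}\right)^2}

G_1 = \frac{n}{(n-1)(n-2)}\sum_{i=1}^{n}\left(\frac{x_i-\bar{x}}{s}\right)^3

G_2 = \frac{n(n+1)}{(n-1)(n-2)(n-3)}\sum_{i=1}^{n}\left(\frac{x_i-\bar{x}}{s}\right)^4 - \frac{3(n-1)^2}{(n-2)(n-3)}
```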


For details on the parameters of the Aggregate function, see:
https://msdn.microsoft.com/en-us/zh-en/library/dd383667(v=vs.110).aspx

A brief summary of the parameters:

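  • seed: the initial value of the accumulator. In PLINQ each partition starts its own accumulator from this value, so it should be a neutral value such as 0 for a sum.
  • update accumulator function: applied to each element. Because PLINQ partitions the data source and processes the partitions in parallel, what this step produces is actually one partial result per partition.
  • combine accumulator function: combines the per-partition results into one accumulated result for the whole array.
  • result selector: transforms the final accumulated value into the final result, i.e., the return value.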
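To see the four parameters working together, here is a small sketch (not from the original article) that computes a mean in a single Aggregate() call. The accumulator is a hypothetical (Sum, Count) tuple: each partition starts from the seed (0, 0) and folds in its own elements, the partial tuples are merged, and the result selector divides. The input array is the one used in the sample below.

```csharp
using System;
using System.Linq;

int[] data = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };

double mean = data.AsParallel().Aggregate(
    (Sum: 0L, Count: 0L),                         // seed: every partition starts from (0, 0)
    (acc, x) => (acc.Sum + x, acc.Count + 1),     // update: fold one element into its partition's accumulator
    (a, b) => (a.Sum + b.Sum, a.Count + b.Count), // combine: merge two partition accumulators
    acc => (double)acc.Sum / acc.Count);          // result selector: turn the final (Sum, Count) into the mean

Console.WriteLine($"mean = {mean}");
```

Because (0, 0) is neutral for both components, it does not matter how many partitions PLINQ creates; a non-neutral seed would be counted once per partition.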

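The point of the example is not the arithmetic itself. It shows that Aggregate() can apply an operation to every element of the data source, combine the results, and then post-process the combined value, all in a single call instead of several separately written steps. The data is also partitioned, and the partitions are processed by tasks running in parallel.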
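To make the "single call" point concrete, the sketch below computes the sample standard deviation of the same array twice: once in separate steps (Average(), then Select() plus Sum(), then Math.Sqrt()) and once with one Aggregate() call that bundles the per-element work, the merging of partitions, and the final square root. The two results should agree up to floating-point rounding.

```csharp
using System;
using System.Linq;

int[] data = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };
double mean = data.AsParallel().Average();

// Separate steps: square the deviations, sum them, then post-process.
double sumSq = data.AsParallel().Select(x => (x - mean) * (x - mean)).Sum();
double sdSteps = Math.Sqrt(sumSq / (data.Length - 1));

// One call: Aggregate() performs the same three stages.
double sdAggregate = data.AsParallel().Aggregate(
    0d,                                                  // seed
    (subTotal, x) => subTotal + (x - mean) * (x - mean), // per element, within a partition
    (a, b) => a + b,                                     // merge partition results
    total => Math.Sqrt(total / (data.Length - 1)));      // final transformation

Console.WriteLine($"separate steps: {sdSteps}, Aggregate(): {sdAggregate}");
```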

Here is the code for the calculation:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;


namespace Sample6_2_plink_aggregate
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] inputInts = {0,3,4,8,15,22,34,57,68,32,30};

            var mean = inputInts.AsParallel().Average();


            var standarddeviation = inputInts.AsParallel().Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => Math.Sqrt((finalSum / (inputInts.Count()-1)))
                );


            var skewness = inputInts.AsParallel().Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count()-1)*(inputInts.Count()-2))
                );

            var kurtosis = inputInts.AsParallel().Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                // (sample excess kurtosis: the correction term is 3(n - 1)^2 / ((n - 2)(n - 3)))
                (finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
                    ((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
                    (3 * Math.Pow((inputInts.Count() - 1), 2)) /
                    ((inputInts.Count() - 2) * (inputInts.Count() - 3))
                );

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel LINQ Calculate Result");
            Console.WriteLine("============================================================");
            Console.WriteLine("Mean : {0}", mean);
            Console.WriteLine("Standard Deviation : {0}", standarddeviation);
            Console.WriteLine("Skewness : {0}", skewness);
            Console.WriteLine("Kurtosis : {0}", kurtosis);

            Console.ReadLine();
        }
    }
}
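A compact way to check the skewness and kurtosis result selectors is a data set whose answers are known. The sketch below wraps the same Aggregate() pattern in a hypothetical helper, Moments (not part of the original sample), that holds the count in a double so that products such as (n - 1) * (n - 2) cannot overflow int, and that uses the (n - 1)² correction term. For {1, 2, 3, 4, 5} the distribution is symmetric, so the skewness must be 0, and the sample excess kurtosis works out to -1.2.

```csharp
using System;
using System.Linq;

// Hypothetical helper: sample mean, standard deviation, skewness and excess kurtosis via PLINQ.
static (double Mean, double StdDev, double Skewness, double Kurtosis) Moments(int[] values)
{
    double n = values.Length; // double, so (n - 1) * (n - 2) * (n - 3) cannot overflow
    double mean = values.AsParallel().Average();

    double sd = values.AsParallel().Aggregate(
        0d,
        (acc, x) => acc + Math.Pow(x - mean, 2),
        (a, b) => a + b,
        total => Math.Sqrt(total / (n - 1)));

    double skew = values.AsParallel().Aggregate(
        0d,
        (acc, x) => acc + Math.Pow((x - mean) / sd, 3),
        (a, b) => a + b,
        total => total * n / ((n - 1) * (n - 2)));

    double kurt = values.AsParallel().Aggregate(
        0d,
        (acc, x) => acc + Math.Pow((x - mean) / sd, 4),
        (a, b) => a + b,
        total => total * n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))
                 - 3 * Math.Pow(n - 1, 2) / ((n - 2) * (n - 3)));

    return (mean, sd, skew, kurt);
}

var m = Moments(new[] { 1, 2, 3, 4, 5 });
Console.WriteLine($"mean={m.Mean}, sd={m.StdDev}, skewness={m.Skewness}, kurtosis={m.Kurtosis}");
```

With 3(n - 2)² in place of 3(n - 1)², the same input would yield 2.3 instead of -1.2, which makes this data set a quick regression check for the formula.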

Concurrent PLINQ tasks and task cancellation

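PLINQ can also be combined with other forms of concurrent tasks. Take the calculation of the standard deviation, skewness, and kurtosis above.
As written, the steps run strictly one after another: mean => standard deviation => skewness => kurtosis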

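Judging from the formulas, however, skewness and kurtosis can be computed in parallel, because both only need the standard deviation (together with the mean) as input:
mean => standard deviation => skewness
                           => kurtosis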
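A minimal sketch of that dependency graph with ContinueWith, using the small array from the previous sample instead of a large range and leaving out cancellation; it is a simplified stand-in for the full sample below, not a replacement for it.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

int[] data = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };
double n = data.Length;

// mean => standard deviation => { skewness, kurtosis }
Task<double> meanTask = Task.Run(() => data.AsParallel().Average());

Task<double> sdTask = meanTask.ContinueWith(t =>
    Math.Sqrt(data.AsParallel().Sum(x => Math.Pow(x - t.Result, 2)) / (n - 1)),
    TaskContinuationOptions.OnlyOnRanToCompletion);

// Both continuations hang off sdTask, so they are free to run at the same time.
Task<double> skewTask = sdTask.ContinueWith(t =>
{
    double mean = meanTask.Result, sd = t.Result;
    double sum = data.AsParallel().Sum(x => Math.Pow((x - mean) / sd, 3));
    return sum * n / ((n - 1) * (n - 2));
}, TaskContinuationOptions.OnlyOnRanToCompletion);

Task<double> kurtTask = sdTask.ContinueWith(t =>
{
    double mean = meanTask.Result, sd = t.Result;
    double sum = data.AsParallel().Sum(x => Math.Pow((x - mean) / sd, 4));
    return sum * n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))
           - 3 * Math.Pow(n - 1, 2) / ((n - 2) * (n - 3));
}, TaskContinuationOptions.OnlyOnRanToCompletion);

Task.WaitAll(skewTask, kurtTask);
Console.WriteLine($"mean={meanTask.Result}, sd={sdTask.Result}, skewness={skewTask.Result}, kurtosis={kurtTask.Result}");
```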

Each step can therefore be chained onto the one it depends on with ContinueWith. If you need timeout control or cancellation, use the WithCancellation() method.
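A small sketch of how cancellation surfaces, assuming the token is canceled before the work starts so that the outcome is deterministic. A PLINQ query that observes a canceled token through WithCancellation() throws OperationCanceledException; a task started with an already-canceled token never runs, and a continuation registered with OnlyOnRanToCompletion is canceled along with it.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var source = ParallelEnumerable.Range(1, 1_000_000);

// Without cancellation the query simply runs to completion.
long total = source.WithCancellation(cts.Token).Sum(x => (long)x);

// Cancel first, then run the same query: PLINQ throws OperationCanceledException.
cts.Cancel();
bool queryCanceled = false;
try
{
    source.WithCancellation(cts.Token).Sum(x => (long)x);
}
catch (OperationCanceledException)
{
    queryCanceled = true;
}

// A task started with a canceled token never runs; its OnlyOnRanToCompletion continuation is canceled too.
Task<long> first = Task.Run(() => source.WithCancellation(cts.Token).Sum(x => (long)x), cts.Token);
Task<long> second = first.ContinueWith(t => t.Result * 2, TaskContinuationOptions.OnlyOnRanToCompletion);
try
{
    Task.WaitAll(first, second);
}
catch (AggregateException)
{
    // Canceled tasks surface here as TaskCanceledException instances.
}

Console.WriteLine($"total={total}, queryCanceled={queryCanceled}, first={first.Status}, second={second.Status}");
```

For a timeout instead of a manual Cancel(), CancellationTokenSource.CancelAfter() can trigger the same token after a given delay.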

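Code example:
The code wraps each PLINQ operation in its own function and then invokes those functions as Tasks so that they can run concurrently. deferredCancelTask is a deliberate troublemaker: if you uncomment it, it signals cancellation after 2 seconds, which cancels the remaining tasks, and the exception handler then prints the status of each task.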

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;


namespace Sample6_4_parallel_task_with_plinq
{
    class Program
    {
        // Number of input values, also cached as a double: multiplying int results of Count()
        // for 100,000,000 elements, e.g. (n - 1) * (n - 2), overflows int, and every Count()
        // call on this range makes PLINQ count all elements again.
        private const int NUM_INTS = 100000000;
        private static readonly double count = NUM_INTS;

        private static ParallelQuery<int> inputInts =
            ParallelEnumerable.Range(1, NUM_INTS);

        private static double CalculateMean(System.Threading.CancellationToken ct)
        {
            // inputInts is already a ParallelQuery<int>, so AsParallel() is not needed
            return inputInts.WithCancellation(ct).Average();
        }

        private static double CalculateStandardDeviation(System.Threading.CancellationToken ct, double mean)
        {
            return inputInts.WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => Math.Sqrt(finalSum / (count - 1))
                );
        }

        private static double CalculateSkewness(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            return inputInts.WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => (finalSum * count) / ((count - 1) * (count - 2))
                );
        }

        private static double CalculateKurtosis(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            return inputInts.WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector (sample excess kurtosis)
                // A function to transform the final accumulator value into the result value.
                (finalSum) => ((finalSum * count * (count + 1)) /
                    ((count - 1) * (count - 2) * (count - 3))) -
                    (3 * Math.Pow((count - 1), 2)) /
                    ((count - 2) * (count - 3))
                );
        }

        static void Main(string[] args)
        {
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel TASK work with PLINQ");
            Console.WriteLine("============================================================");

            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;

            // The token is passed to every continuation as well: if a continuation is canceled while its
            // PLINQ query runs, it then ends in the Canceled state instead of Faulted.
            // LazyCancellation keeps a continuation from completing before its antecedent has finished,
            // so all task states are final once WaitAll returns.
            var options = TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.LazyCancellation;

            var TaskMean = new Task<double>(() => CalculateMean(ct), ct);
            var TaskSTDev = TaskMean.ContinueWith<double>((t) => { return CalculateStandardDeviation(ct, t.Result); },
                                                          ct, options, TaskScheduler.Default);
            var TaskSkewness = TaskSTDev.ContinueWith<double>((t) => { return CalculateSkewness(ct, TaskMean.Result, t.Result); },
                                                          ct, options, TaskScheduler.Default);
            var TaskKurtosis = TaskSTDev.ContinueWith<double>((t) => { return CalculateKurtosis(ct, TaskMean.Result, t.Result); },
                                                          ct, options, TaskScheduler.Default);

            // Uncomment to request cancellation after 2 seconds.
            //var deferredCancelTask = Task.Factory.StartNew(() => { System.Threading.Thread.Sleep(2000); cts.Cancel();});

            try
            {
                TaskMean.Start();

                Task.WaitAll(TaskSkewness, TaskKurtosis);
                Console.WriteLine("Mean : {0}", TaskMean.Result);
                Console.WriteLine("Standard Deviation : {0}", TaskSTDev.Result);
                Console.WriteLine("Skewness : {0}", TaskSkewness.Result);
                Console.WriteLine("Kurtosis : {0}", TaskKurtosis.Result);
            }
            catch (AggregateException aex)
            {
                // Canceled tasks are reported as TaskCanceledException, which derives from
                // OperationCanceledException; print the task states once, not once per inner exception.
                if (aex.InnerExceptions.Any(ex => ex is OperationCanceledException))
                {
                    Console.WriteLine("Mean Task: {0}", TaskMean.Status);
                    Console.WriteLine("Standard Deviation Task: {0}", TaskSTDev.Status);
                    Console.WriteLine("Skewness Task: {0}", TaskSkewness.Status);
                    Console.WriteLine("Kurtosis Task: {0}", TaskKurtosis.Status);
                }
                else
                {
                    Console.WriteLine(aex.ToString());
                }
            }

            Console.ReadLine();
        }
    }
}


Original article: http://blog.csdn.net/wangzhiyu1980/article/details/46299795
