C# · 12月 20, 2021

看看Parallel中高度封装的三个方法,Invoke,For和ForEach

  说到.net中的并行编程,也许你的第一反应就是Task,确实Task是一个非常灵活的用于并行编程的一个专用类,不可否认越灵活的东西用起来就越

复杂,高度封装的东西用起来很简单,但是缺失了灵活性,这篇我们就看看这些好用但灵活性不高的几个并行方法。

一:Invoke

  现在电子商务的网站都少不了订单的流程,没有订单的话网站也就没有存活的价值了,往往在订单提交成功后,通常会有这两个操作,第一个:发起

信用卡扣款,第二个:发送emial确认单,这两个操作我们就可以在下单接口调用成功后,因为两个方法是互不干扰的,所以就可以用invoke来玩玩了。

Main( Console.WriteLine( Thread.Sleep( Console.WriteLine( Console.WriteLine( Thread.Sleep( Console.WriteLine( }

  怎么样,实现起来是不是很简单,只要把你需要的方法塞给invoke就行了,不过在这个方法里面有一个重载参数需要注意下,

Invoke(, Action[] actions);

有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了

MaxDegreeOfParallelism属性。

好了,下面我们大概翻翻invoke里面的代码实现,发现有几个好玩的地方:

: 当invoke中的方法超过10个话,我们发现它走了一个internal可见的ParallelForReplicatingTask的FCL内部专用类,而这个类是继承自

   Task的,当方法少于10个的话,才会走常规的Task.

居然发现了一个装exception 的ConcurrentQueue队列集合,多个异常入队后,再包装成AggregateException抛出来。

       比如:throw new AggregateException(exceptionQ);

我们发现,不管是超过10个还是小于10个,都是通过WaitAll来等待所有的执行,所以缺点就在这个地方,

如果我们用task中就可以在waitall中设置一个过期时间,但invoke却没法做到,所以在使用invoke的时候要慎重考虑。

( > || (parallelOptions.MaxDegreeOfParallelism != – && parallelOptions.MaxDegreeOfParallelism < exceptionQ = null; actionIndex = ParallelForReplicatingTask parallelForReplicatingTask = ParallelForReplicatingTask(parallelOptions, ( l = Interlocked.Increment( actionIndex); l <= actionsCopy.Length; l = Interlocked.Increment( actionsCopy[l – LazyInitializer.EnsureInitialized( exceptionQ,() => ConcurrentQueue LazyInitializer.EnsureInitialized( exceptionQ,() => ConcurrentQueue AggregateException ex = ex2 (ex != (IEnumerator enumerator = Exception current = (exceptionQ != && exceptionQ.Count > ( j = ; j < array.Length; j++ array[j] = (array.Length <= ( k = ; k < array.Length; k++ TplEtwProvider.Log.ParallelInvokeEnd((task != ) ? task.m_taskScheduler.Id : TaskScheduler.Current.Id,(task != ) ? task.Id : }

二:For

   下面再看看Parallel.For,我们知道普通的For是一个串行操作,如果说你的for中每条流程都需要执行一个方法,并且这些方法可以并行操作且

比较耗时,那么为何不尝试用Parallel.For呢,就比如下面的代码。

Main( List actions = List result = Parallel.For(,actions.Count,(i) => Console.WriteLine( + Console.WriteLine( Thread.Sleep( Console.WriteLine( Console.WriteLine( Thread.Sleep( Console.WriteLine( }

下面我们再看看Parallel.For中的最简单的重载和最复杂的重载:

ParallelLoopResult For( fromInclusive, toExclusive,Action ParallelLoopResult For( fromInclusive,ParallelOptions parallelOptions,Func localInit,Func body,Action

简单的重载不必多说,很简单,我上面的例子也演示了。

最复杂的这种重载提供了一个AOP的功能,在每一个body的action执行之前会先执行localInit这个action,在body之后还会执行localFinally

       这个action,有没有感觉到已经把body切成了三块?好了,下面看一个例子。

<div class=”cnblogs_code” onclick=”cnblogs_code_show(‘9489ca82-accb-417b-a3e6-77cd372aa653’)”>
<img id=”code_img_closed_9489ca82-accb-417b-a3e6-77cd372aa653″ class=”code_img_closed” src=”https://www.jb51.cc/res/2019/02-17/00/1c53668bcee393edac0d7b3b3daff1ae.gif” alt=””><img id=”code_img_opened_9489ca82-accb-417b-a3e6-77cd372aa653″ class=”code_img_opened” style=”display: none;” onclick=”cnblogs_code_hide(‘9489ca82-accb-417b-a3e6-77cd372aa653’,event)” src=”https://www.jb51.cc/res/2019/02-17/00/405b18b4b6584ae338e0f6ecaf736533.gif” alt=””><div id=”cnblogs_code_open_9489ca82-accb-417b-a3e6-77cd372aa653″ class=”cnblogs_code_hide”>

Main( list = List() { ,,, options = total = result = Parallel.For(,list.Count,() => Console.WriteLine( (i,loop,j) => Console.WriteLine( Console.WriteLine( + list[i] + + (i) => Console.WriteLine( Interlocked.Add( Console.WriteLine( + Console.WriteLine( + }

接下来我们再翻翻它的源代码,由于源码太多,里面神乎其神,我就找几个好玩的地方。

  我在里面找到了一个rangeManager分区函数,代码复杂看不懂,貌似很强大。

RangeManager( nFromInclusive, nToExclusive, nStep, .m_nCurrentIndexrangeToAssign = .m_nStep = (nNumExpectedWorkers == nNumExpectedWorkers = num = ()(nToExclusive – num2 = num / ()(( num2 -= num2 % ( (num2 == num2 = ( num3 = ()(num / (num % num2 != num3++ num4 = ( .m_indexranges = num5 = ( i = ; i < num3; i++ .m_indexranges[i].m_nFromInclusive = .m_indexranges[i].m_nSharedCurrentIndexOffset = .m_indexranges[i].m_bRangeFinished = num5 += (num5 num5 = .m_indexranges[i].m_nToExclusive = }

我又找到了这个神奇的ParallelForReplicatingTask类。

那么下面问题来了,在单线程的for中,我可以continue,可以break,那么在Parallel.For中有吗?因为是并行,所以continue基本上就没有

存在价值,break的话确实有价值,这个就是委托中的ParallelLoopState做到的,并且还新增了一个Stop。

 

三:ForEach

其实ForEach和for在本质上是一样的,你在源代码中会发现在底层都是调用一个方法的,而ForEach会在底层中调用for共同的函数之前还会执行

其他的一些逻辑,所以这就告诉我们,,其他的都一样了,这里就不赘述了。