Concurrent Collections With C#: ForEachConcurrent
Concurrent programming seems to be the theme of the day lately, so I'll throw my hat in the ring.
First a disclaimer: I am not a fan of the async/await construct in C#.
In my view, async/await is a step away from the OO message-passing trend that came with LINQ, closures and collection method-chaining, which I have grown to like and depend on. Next to that, async/await feels like a throwback to a procedural yesteryear that I feel is best underemphasized and avoided. The construct is not without merit and has its place, but I have yet to produce a clean implementation with it. It reminds me a lot of using VB goto labels for error handling: an untidy, if acceptable, way of expressing something for which there should (at least) be a more elegant way.
There are maybe one or two circumstances I have encountered where an async task gave a performance boost while the imperative inline statements directly beneath it executed concurrently. Far more often, I faced collections whose items needed identical statements executed concurrently. Zealots will rightly note that AsParallel exists and is implemented well enough by the good people at Microsoft. The problem I have faced when using AsParallel on collections is that its performance benefits are far outstripped by the complications that arise with threading in general. In ASP.NET specifically, it is mostly harmful, since it confuses the IIS/Katana threading model under which it runs. On top of that, AsParallel is by default limited to one worker per logical processor (8 threads, in my case) and, as if all that wasn't enough, there is no way to track completion of tasks, which leaves the code base sprayed with "global" variables for checking completion, locking, etc.
For all these code-smell reasons that violate SOLID principles, async/await is actually a better alternative to AsParallel. The problem with async/await is that it, too, leads to SOLID violations (particularly the S), only in a more roundabout way. Every async/await implementation forces a method's callers to be async as well. Making existing statements async in any lower-level module means that every caller of that method in the higher layers must also be modified. That in turn forces their callers to become async, which forces their callers to become async, and so on, until eventually the entire architecture has been adapted to async. Bottom-up. Inadvertently.
When tackling identical concurrent statements over a collection, it would be far easier if we could simply step into async-like processing over that collection mid-statement, then step out and continue executing the remaining statements synchronously. Like AsParallel, but without the limitations and with the simplicity of await for detecting completion. Or, if you prefer, like async/await, but without the forced async bubble-up and with the inline chaining expression of AsParallel. For this requirement, delegates are, in my view, a far better suited approach than the async/await construct, which I personally find cumbersome (for this purpose).
So without further delay I present ForEachConcurrent:
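// Requires: using System; using System.Collections.Concurrent;
//           using System.Collections.Generic; using System.Linq; using System.Threading;
// Note: Delegate.BeginInvoke/EndInvoke work only on the .NET Framework
// (they throw PlatformNotSupportedException on .NET Core / .NET 5+).
public static List<TD> ForEachConcurrent<T, TD>(this IEnumerable<T> src,
    Func<T, TD> concurrentAsync, bool isRunAsync = false, short magnitude = 2,
    params object[] args)
{
    // Results are pushed as each call completes, so they come back in
    // completion order, not in the order of src.
    var arDataTasks = new ConcurrentStack<TD>();
    List<T> items = src.ToList();

    items
        .SplitInBatchesOf(magnitude.BitMultiplesOf8())
        .ForEach(batch => batch
            .ForEach(item => concurrentAsync.BeginInvoke(item, result =>
            {
                using (result.AsyncWaitHandle)   // disposing also closes the handle
                {
                    // EndInvoke returns the delegate's result (or rethrows its exception).
                    arDataTasks.Push(concurrentAsync.EndInvoke(result));
                }
            }, args)));
    // Batches are queued back to back here: nothing waits for one batch to
    // finish before the next starts, so the thread pool does the actual throttling.

    if (!isRunAsync)
        // Block until every callback has pushed its result. SpinUntil backs off
        // instead of burning a core in a tight loop.
        SpinWait.SpinUntil(() => arDataTasks.Count >= items.Count);

    // With isRunAsync == true this is only a snapshot of the results finished so far.
    return arDataTasks.ToList();
}

// Not shown in the original post; reconstructed from the console output
// "2^2 * 8 => 32", i.e. batch size = 2^magnitude * 8.
public static int BitMultiplesOf8(this short magnitude)
{
    return (1 << magnitude) * 8;
}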
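Delegate.BeginInvoke is only available on the .NET Framework, so the method above will not run on .NET Core or .NET 5+. For readers on newer runtimes, here is a minimal sketch of the same idea (fan out over the thread pool, then block the caller until everything is done, with no async/await) built on ThreadPool.QueueUserWorkItem and a CountdownEvent. ConcurrentSketch and ForEachConcurrentOrdered are hypothetical names; the sketch does no batching of its own, and unlike the original it returns results in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ConcurrentSketch
{
    // Hypothetical helper: queues one thread-pool work item per element and
    // blocks (without spinning) until all of them have finished.
    public static List<TD> ForEachConcurrentOrdered<T, TD>(this IEnumerable<T> source, Func<T, TD> work)
    {
        T[] items = source.ToArray();
        var results = new TD[items.Length];   // slot i holds the result for items[i]
        if (items.Length == 0)
            return new List<TD>();

        Exception firstError = null;
        using (var remaining = new CountdownEvent(items.Length))
        {
            for (int i = 0; i < items.Length; i++)
            {
                int index = i;   // copy: a for-loop variable is shared across iterations
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        results[index] = work(items[index]);
                    }
                    catch (Exception ex)
                    {
                        // Keep the first failure; it is rethrown on the calling thread below.
                        Interlocked.CompareExchange(ref firstError, ex, null);
                    }
                    finally
                    {
                        remaining.Signal();
                    }
                });
            }
            remaining.Wait();   // returns once every work item has signalled
        }

        if (firstError != null)
            throw new AggregateException(firstError);
        return results.ToList();
    }
}
```

Usage mirrors the original: `var doubled = numbers.ForEachConcurrentOrdered(n => n * 2);`. Writing into a pre-sized array by index avoids the completion-order shuffle of the ConcurrentStack.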
The reason behind SplitInBatchesOf perhaps deserves a more detailed explanation, but for this post's purposes it suffices to say that the number of concurrent operations my x86/x64 machine handles comfortably is between 32 and 128. Beyond that, somewhere between the CLR and the hardware, the processor locks up trying to manage the sheer volume of resources required to run that many operations concurrently. Running 1000 concurrent operations is completely out of the question. The best we can do is run up to 32-128 concurrent operations (for safety, 32-64) and spawn a new batch of operations when the previous batch has finished executing.
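The batch-then-wait scheme described above (run one batch concurrently, wait for all of it, then start the next) can be sketched as follows. Note that ForEachConcurrent as written queues every batch immediately and leaves the throttling to the thread pool; this sketch waits explicitly between batches. It swaps BeginInvoke for Task.Run and Task.WaitAll, the names BatchSketch, BatchSizeFor and RunInBatches are hypothetical, and the batch-size formula 2^magnitude * 8 is inferred from the console output "2^2 * 8 => 32".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchSketch
{
    // Assumed from the console output "2^2 * 8 => 32": batch size = 2^magnitude * 8.
    public static int BatchSizeFor(short magnitude)
    {
        return (1 << magnitude) * 8;
    }

    // Hypothetical helper: every item in a batch runs concurrently, and the next
    // batch starts only after the whole current batch has finished.
    public static List<TD> RunInBatches<T, TD>(IEnumerable<T> source, Func<T, TD> work, short magnitude)
    {
        int batchSize = BatchSizeFor(magnitude);
        T[] items = source.ToArray();
        var results = new List<TD>(items.Length);

        for (int start = 0; start < items.Length; start += batchSize)
        {
            Task<TD>[] batch = items
                .Skip(start)
                .Take(batchSize)
                .Select(item => Task.Run(() => work(item)))
                .ToArray();

            // Wait for this batch before starting the next
            // (throws AggregateException if any item failed).
            Task.WaitAll(batch);
            results.AddRange(batch.Select(t => t.Result));   // keeps input order
        }
        return results;
    }
}
```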
public static List<List<T>> SplitInBatchesOf<T>(this IEnumerable<T> source, int batchSize)
{
    return source
        .SplitInBatchesOf(batchSize, itm => new List<T>(itm ?? new List<T>()))
        .ToList();
}

static IEnumerable<TR> SplitInBatchesOf<T, TR>(this IEnumerable<T> source, int size,
    Func<IEnumerable<T>, TR> resultSelector = null, Func<T, TR> selector = null)
{
    // Without a resultSelector, each batch is reduced to its first element
    // (mapped through selector, or default(TR) if no selector is given).
    return batchElements(source, size,
        resultSelector ?? (itm => itm.Select(selector ?? (x => default(TR))).FirstOrDefault()));
}

private static IEnumerable<TR> batchElements<T, TR>(this IEnumerable<T> source, int size,
    Func<IEnumerable<T>, TR> resultSelector)
{
    T[] batch = null;
    var count = 0;
    foreach (var item in source)
    {
        if (batch == null)
            batch = new T[size];
        batch[count++] = item;
        if (count != size)
            continue;
        // A full batch: hand it to the selector, then start a fresh array.
        yield return resultSelector(batch.Select(x => x));
        batch = null;
        count = 0;
    }
    // Trailing partial batch (fewer than `size` items).
    if (batch != null && count > 0)
        yield return resultSelector(batch.Take(count));
}
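For comparison, the same list-of-lists batching can be expressed more compactly by grouping on each item's index divided by the batch size. This is a hedged alternative, not the post's implementation; BatchingSketch and InBatchesOf are hypothetical names. (On .NET 6 and later, the built-in Enumerable.Chunk does this job directly.)

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchingSketch
{
    // Hypothetical alternative to SplitInBatchesOf: tags each item with its index
    // and groups by index / batchSize, so items 0..batchSize-1 form the first batch, etc.
    public static List<List<T>> InBatchesOf<T>(this IEnumerable<T> source, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException("batchSize");
        return source
            .Select((item, index) => new { item, index })
            .GroupBy(x => x.index / batchSize)        // groups come out in first-seen order
            .Select(g => g.Select(x => x.item).ToList())
            .ToList();
    }
}
```

For example, ten items in batches of four give batch sizes 4, 4 and 2, the last batch holding the two leftover items.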
So far we have created quite a bit of overhead to squeeze out 32-64 concurrent operations. Does all this boilerplate even have a significant performance impact? The answer, as always, is "it depends". To see how it might, we first need to look at some latency numbers from Peter Norvig.
All of Norvig's original latency numbers date from 2002, so if we project them to 2015 (using the interactive latency scale kindly put together by Berkeley's research staff), we can see that certain absolutes are not going to get much better, not only in the foreseeable future but perhaps ever. Specifically, the roundtrip for one packet within a data center is stuck at approximately 500μs. That's 500 microseconds, or 0.5 milliseconds. Two roundtrips for one packet in the same data center take us to 1ms of hang time. The process must idle for 1ms, and it must do so from here till the end of eternity. That's a joke, probably, but I would hazard a guess that it won't improve within the next 20 years. It has been stuck at the 0.5ms floor since before 1995, so it's a good bet it will remain there till 2035. And if it does improve, it won't improve drastically, say by an order of magnitude. It may very well halve to 250μs, but then let's say that in the year 2050 we are sending one packet on a roundtrip within the same data center four times, which brings us back to 1ms = 4 × 250μs.
Keep this 1ms in mind. For our purposes we will use this number as an anchor to something concrete and rooted in real-life circumstance.
Then there are other numbers that will keep improving, though not exponentially and at a decelerating rate. Reading 1,000,000 bytes from an SSD takes approximately 200μs today and is projected to take around 100μs in two years' time, by 2017. What is significant is that this metric is on a completely different scale from network packet roundtrip times. In 2002, when these numbers were published, reading approximately 1MB of data from an SSD took over 3000μs (3ms). Five years later, in 2007, that number was just under 1ms. Five years after that, in 2012, it was 0.3ms. Five years on from then, i.e. two years from now in 2017, it is projected at 0.1ms. So it is improving roughly threefold every five years. If we naively project that over the 20 years after 2017 (four five-year periods), it will be in the region of 100μs / 3^4 = 100μs / 81 ≈ 1.2μs. And while we are being naive, let's assume that time simply scales with data, so that if it takes 0.1ms to read 1MB of data, it will take 1ms to read 10MB, 10ms to read 100MB, and so on. By that (flaky) line of reasoning, in 2037, when it takes about 0.0012ms to read 1MB from SSD, it will still take roughly 1ms to read 1GB sequentially.
Armed with this knowledge, let's take 1ms as our common latency, one that occurs natively "in the wild". We can always (naively) adjust the amount of data we read sequentially from SSD, or the number of roundtrips within the data center, to project the same 1ms delay. If we take 1ms to be latency we cannot avoid, what we can do to improve performance is process data concurrently, so that the time penalty of the code managing the concurrent operations is absorbed by the 1ms latency across a large number of operations, e.g. a thousand.
Let's set up an operation that does something we can first test for sequential performance without any latency. It performs two conditional branch checks, a modulo operation, a multiplication (only for the item that is a multiple of the collection length, i.e. once per run) and one return.
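The naive SSD projection above is simple enough to write down as a function. This is only a restatement of the paragraph's assumptions (1MB takes about 100μs in 2017, improving threefold every five years, and read time scaling linearly with data size, with 1GB taken as 1000MB); SsdProjection and ReadMicroseconds are hypothetical names.

```csharp
using System;

public static class SsdProjection
{
    // Naive model from the text: reading 1MB takes ~100μs in 2017 and that
    // figure improves roughly threefold every five years.
    public static double ReadMicroseconds(double megabytes, int yearsAfter2017)
    {
        double perMegabyte = 100.0 / Math.Pow(3, yearsAfter2017 / 5.0);
        return perMegabyte * megabytes;   // naive: time scales linearly with data size
    }
}
```

ReadMicroseconds(1, 20) gives 100 / 81 ≈ 1.23μs, and ReadMicroseconds(1000, 20) gives about 1235μs, i.e. the "roughly 1ms for 1GB" figure.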
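To make that argument concrete, here is a back-of-the-envelope model. It is my simplification, not something measured: if every operation waits latencyMs and all operations inside a batch overlap perfectly, linear time is N × latency while batched time is ceil(N / batchSize) × latency. It deliberately ignores the cost of managing the concurrent operations. LatencyModel and its methods are hypothetical names.

```csharp
using System;

public static class LatencyModel
{
    // Operations run back to back, so their latencies add up.
    public static double LinearMs(int operations, double latencyMs)
    {
        return operations * latencyMs;
    }

    // Operations inside one batch overlap perfectly, so each batch costs one latency.
    public static double BatchedMs(int operations, int batchSize, double latencyMs)
    {
        int batches = (operations + batchSize - 1) / batchSize;   // ceiling division
        return batches * latencyMs;
    }
}
```

For 1000 operations at 1ms in batches of 32, the model gives 1000ms linear versus 32ms batched. The measured concurrent average for that case further down (265ms) sits well above 32ms, and that gap is exactly the overhead the model leaves out.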
private static long operate(int i, int batchSize, int sleepInMs)
{
    // Latency references:
    // http://imgur.com/usWXmhM
    // http://www.eecs.berkeley.edu/~rcs/research/interactive_latency.html
    if (sleepInMs > 0)
        Thread.Sleep(sleepInMs);   // simulated latency (network, disk, ...)
    return i % batchSize == 0 ? i * 2 : 1;
}
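To see what operate actually returns for the inputs used in this post (items N down to 1, with batchSize set to the collection length, as parseInput and process do), here is a small standalone copy. OperateDemo, Operate and SumFor are hypothetical names for illustration: only the item equal to N takes the multiplication branch, and every other item returns 1.

```csharp
using System.Linq;
using System.Threading;

public static class OperateDemo
{
    // Same body as operate(...) above, made public for the demo.
    public static long Operate(int i, int batchSize, int sleepInMs)
    {
        if (sleepInMs > 0)
            Thread.Sleep(sleepInMs);
        return i % batchSize == 0 ? i * 2 : 1;
    }

    // Mirrors the benchmark inputs: items N..1 with batchSize = N, no delay.
    public static long SumFor(int totalItems)
    {
        return Enumerable.Range(1, totalItems)
            .Reverse()
            .Sum(i => Operate(i, totalItems, 0));
    }
}
```

For 1000 items the sum is 2000 (for i = 1000) plus 999 × 1, i.e. 2999: the work per item is trivial, which is the point of the baseline.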
Now that we have that set up, let's run it over the whole collection ten times, collect the timings, and then repeat that cycle ten times over. To keep things clear, we can wrap the input and output in classes. We also need a time measure, which can go into a separate class as well.
class ToProcess
{
    readonly short _magnitude;
    public ToProcess(short magnitude) { _magnitude = magnitude; }

    public int SleepInMs;
    public List<int> Items = new List<int>();
    public int TotalItems { get { return Items.Count; } }
    public short Magnitude { get { return _magnitude; } }

    // Not shown in the original; reconstructed from the console output
    // "args:1000 | 2^2 * 8 => 32 | 0" (uses the BitMultiplesOf8 extension).
    public override string ToString()
    {
        return string.Format("{0} | 2^{1} * 8 => {2} | {3}",
            TotalItems, Magnitude, Magnitude.BitMultiplesOf8(), SleepInMs);
    }
}

class Processed
{
    public List<List<long>> ConcTimes = new List<List<long>>();
    public List<List<long>> LinearTimes = new List<List<long>>();
    public bool HasLinear { get; set; }
}

// ...
static TimeMeasure _ts;
// ...

static Processed process(ToProcess dataToProcess)
{
    var logConcTimes = new List<List<long>>();
    var logLinearTimes = new List<List<long>>();

    for (int k = 0; k < 10; k++)           // 10 cycles...
    {
        for (int idx = 0; idx < 10; idx++)  // ...of 10 runs each
        {
            // linear
            var range = new List<long>();
            _ts = new TimeMeasure();
            dataToProcess.Items
                .ForEach(i => range.Add(operate(i, dataToProcess.TotalItems, dataToProcess.SleepInMs)));
            excTimesLin[idx] = _ts.TimePassedInMs;

            // concurrent
            _ts = new TimeMeasure();
            var concRange = dataToProcess.Items
                .ForEachConcurrent(i => operate(i, dataToProcess.TotalItems, dataToProcess.SleepInMs),
                                   false, dataToProcess.Magnitude);
            excTimesConc[idx] = _ts.TimePassedInMs;
        }
        // excTimesLin/excTimesConc are the static arrays declared next to Main.
        logConcTimes.Add(excTimesConc.ToList());
        logLinearTimes.Add(excTimesLin.ToList());
    }
    return new Processed { ConcTimes = logConcTimes, LinearTimes = logLinearTimes, HasLinear = true };
}

class TimeMeasure
{
    readonly DateTime _start = DateTime.Now;

    public long TimePassedInMs
    {
        get { return Convert.ToInt64(Math.Ceiling(DateTime.Now.Subtract(_start).TotalMilliseconds)); }
    }
}
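TimeMeasure relies on DateTime.Now, whose resolution depends on the system timer and tends to be somewhere between 0.5 and 15 milliseconds, which matters when many of the linear timings round to 0 or 1ms. A sketch of a Stopwatch-based alternative, assuming the same TimePassedInMs contract; StopwatchTimeMeasure and TimePassedInMsPrecise are hypothetical names.

```csharp
using System;
using System.Diagnostics;

// Hypothetical drop-in for TimeMeasure: Stopwatch uses the high-resolution
// performance counter when one is available (see Stopwatch.IsHighResolution).
class StopwatchTimeMeasure
{
    readonly Stopwatch _watch = Stopwatch.StartNew();

    // Same rounding as the original: whole milliseconds, rounded up.
    public long TimePassedInMs
    {
        get { return Convert.ToInt64(Math.Ceiling(_watch.Elapsed.TotalMilliseconds)); }
    }

    // Fractional milliseconds, for sub-millisecond runs.
    public double TimePassedInMsPrecise
    {
        get { return _watch.Elapsed.TotalMilliseconds; }
    }
}
```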
Another disclaimer: I am not a fan of "benchmarks", so I've devised an (overly) complex system to log execution times for a single input set run many times and to report average execution times. Moving along, we then calculate the average, median and mode times of running the operations 100 times in succession. Let's put it all together in a console app and see what results we can observe.
// Per-run timings of the current 10-run cycle; overwritten on every cycle.
static readonly long[] excTimesLin = new long[10];
static readonly long[] excTimesConc = new long[10];

static void Main(string[] args)
{
    ToProcess input = parseInput(args);
    Processed output = process(input);
    prompt(output);

    var newInput = gatherInput();
    if (!string.IsNullOrEmpty(newInput))
        Main(newInput.Split(new[] { ' ' }));   // re-run with the new arguments
}

static ToProcess parseInput(string[] args)
{
    int batchSize = args.Length < 1 ? 100 : int.Parse(args[0]);
    short magnitude = args.Length < 2 ? (short)2 : short.Parse(args[1]);
    var integers = new List<int>(Enumerable.Range(1, batchSize));
    integers.Reverse();
    var ret = new ToProcess(magnitude)
    {
        Items = integers,
        SleepInMs = args.Length > 2 ? int.Parse(args[2]) : 0
    };
    Console.WriteLine("args:{0}", ret);
    return ret;
}

static void prompt(Processed output)
{
    if (output.HasLinear)
    {
        print(output.LinearTimes, "Linear");
        Console.WriteLine("");
    }
    print(output.ConcTimes, "Concurrent");
}

static void print(List<List<long>> outputTimes, string title)
{
    // Average each 10-run cycle, then summarise those 10 averages.
    var concTime = new List<decimal>();
    foreach (var concTimes in outputTimes)
        concTime.Add(concTimes.Mean(itm => itm));
    Console.WriteLine("{0}", title);
    print(concTime.ToArray(), "");
    Console.WriteLine("");
    Console.WriteLine("{1} Ttl: >{0} ms", outputTimes.Sum(itm => itm.Sum()), title);
}

static void print(decimal[] excTimes, string label)
{
    Console.Write("{0} TTE (in ms):", label.Trim());
    Console.WriteLine("");
    Console.Write("avg {0}, ", excTimes.Mean(itm => itm).ToString("#0.0", CultureInfo.InvariantCulture));
    Console.Write("median {0}, ", excTimes.Median(itm => itm).ToString("#0.0", CultureInfo.InvariantCulture));
    Console.Write("modes {");
    // ToXsv (joins items with a separator) is a helper from the full project; not shown here.
    Console.WriteLine(excTimes.Modes(itm => itm)
        .ToXsv(", ", itm => itm.ToString("#0.0", CultureInfo.InvariantCulture)) + "}");
    Console.WriteLine("");
}

static string gatherInput()
{
    Console.WriteLine("");
    Console.WriteLine("");
    Console.WriteLine("Run again? (cls + 'Return' clears screen; leave blank + 'Return' to quit)");
    Console.Write("(space delimited) total items[, magnitude[,delay (in ms)]]>");
    var inp = Console.ReadLine();
    if (!string.IsNullOrEmpty(inp) && inp.ToLowerInvariant() == "cls")
    {
        Console.Clear();
        Console.Beep();
        ConsoleX.ClearConsoleHistory();   // helper from the full project; not shown here
        return gatherInput();
    }
    return inp;
}
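Main re-runs itself recursively for every new input line, so a long interactive session adds a stack frame with each run. A loop does the same job without that. Here is a minimal sketch, assuming the console reading and the parse/process/prompt step are passed in as delegates so it can be exercised without a console; RunLoopSketch and its parameters are hypothetical names.

```csharp
using System;

public static class RunLoopSketch
{
    // Runs runOnce for the initial arguments, then again for every non-blank
    // line returned by readInput. Returns how many runs were executed.
    public static int Run(string[] firstArgs, Func<string> readInput, Action<string[]> runOnce)
    {
        int runs = 0;
        string[] args = firstArgs;
        while (true)
        {
            runOnce(args);                   // e.g. parseInput + process + prompt
            runs++;
            string next = readInput();       // e.g. gatherInput()
            if (string.IsNullOrEmpty(next))
                return runs;                 // blank line quits, as in the original
            // RemoveEmptyEntries tolerates accidental double spaces.
            args = next.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
        }
    }
}
```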
Additionally, we'll need extensions to calculate the mean, median and mode(s) over numeric collections.
public static decimal Median<T>(this IEnumerable<T> xs, Func<T, decimal> f)
{
    // Project first, then sort the projected values.
    var ys = xs.Select(f).OrderBy(y => y).ToList();
    double mid = (ys.Count - 1) / 2.0;
    // Odd count: both indices hit the middle element; even count: the two middle elements.
    return (ys[(int)mid] + ys[(int)(mid + 0.5)]) / 2;
}

public static decimal Mean<T>(this IEnumerable<T> list, Func<T, decimal> selector)
{
    return list.Average(selector);
}

public static IEnumerable<decimal> Modes<T>(this IEnumerable<T> list, Func<T, decimal> f)
{
    var modesList = list
        .GroupBy(f)
        .Select(grp => new { Value = grp.Key, Occurrences = grp.Count() })
        .ToList();
    int maxOccurrence = modesList.Max(g => g.Occurrences);
    // No modes when every value occurs only once.
    return modesList
        .Where(x => x.Occurrences == maxOccurrence && maxOccurrence > 1) // Thanks Rui!
        .Select(x => x.Value);
}
Side-note: contrary to my preference, I am supplying the console application project (in Visual Studio 2013, whose Professional-equivalent edition is now available free-as-in-beer) so that readers can follow the rest of this post. The full code is available for download via Dropbox.
Run the entire Main procedure a few times to get the memory and caches "warmed up" and to eliminate as much ambiguity due to loading as possible. On the "first" run over 1000 operations the output is predictable. Given the minuscule, nanosecond-scale latencies of clock cycles and of L1 and L2 cache references, the sequential operations run nearly two orders of magnitude faster than their concurrent, batched counterparts. We can see this in the time to execute (TTE): linear averages 0.2ms, with 0.4, 0.2 and 0.0 being the most recurring values (modes). This means that a good portion of the runs don't even register on the millisecond scale; at 0.2ms per 1000 operations, each operation takes roughly 0.2μs, well under a microsecond. The total time to execute the same 1000 operations in the 10 * 10 loop (i.e. 100,000 operations in total) is 24ms. Concurrent times are roughly 50 to 60 times worse, both on average and in total: it takes the concurrently batched version 1.2 seconds to execute the same 100,000 operations. So the hands-down winner is sequential, linear execution, no doubt. In fact, if we keep increasing the number of operations to 10,000, 100,000 and beyond, the numbers look predictably the same. Linear wins hands down every time.
(space delimited) total items[, magnitude[,delay (in ms)]]>1000
args:1000 | 2^2 * 8 => 32 | 0
Linear TTE (in ms):
avg 0.2, median 0.2, modes {0.4, 0.2, 0.0}
Linear Ttl: >24 ms

Concurrent TTE (in ms):
avg 11.9, median 11.5, modes {11.3, 11.4, 11.5}
Concurrent Ttl: >1190 ms
-------------------------------------------------------------------------
Run again? (cls + 'Return' clears screen; leave blank + 'Return' to quit)
(space delimited) total items[, magnitude[,delay (in ms)]]>10000
args:10000 | 2^2 * 8 => 32 | 0
Linear TTE (in ms):
avg 2.0, median 1.8, modes {1.7}
Linear Ttl: >197 ms

Concurrent TTE (in ms):
avg 140.5, median 141.8, modes {}
Concurrent Ttl: >14053 ms
-------------------------------------------------------------------------
Run again? (cls + 'Return' clears screen; leave blank + 'Return' to quit)
(space delimited) total items[, magnitude[,delay (in ms)]]>100000
args:100000 | 2^2 * 8 => 32 | 0
Linear TTE (in ms):
avg 15.0, median 14.3, modes {12.7}
Linear Ttl: >1504 ms

Concurrent TTE (in ms):
avg 1718.1, median 1701.6, modes {}
Concurrent Ttl: >171814 ms
So far we have seen that this entire concurrent thing has no place when computing with local resources. To see where it does have an impact let's change up the input a bit.
Let's run 1000 operations again, but this time add an artificial delay of 1ms to each operation (input "1000 2 1"), so that the thread is suspended for 1ms on top of the time it takes to execute everything else.
(space delimited) total items[, magnitude[,delay (in ms)]]>1000 2 1
args:1000 | 2^2 * 8 => 32 | 1
Linear TTE (in ms):
avg 2018.3, median 2006.2, modes {2002.0}
Linear Ttl: >201831 ms

Concurrent TTE (in ms):
avg 265.5, median 249.0, modes {}
Concurrent Ttl: >26549 ms
Quite a different picture. With just a one-millisecond delay per operation over 1000 operations, the average time for linear is 2 seconds, whereas concurrent is 265ms. That's nearly an order of magnitude (about 7.6 times) better for concurrent, if you're keeping score. (Incidentally, 1000 sleeps of 1ms taking about 2 seconds suggests that Thread.Sleep(1) actually suspends the thread for closer to 2ms here.) The total time to execute 10 * 10 loops of 1000 operations (i.e. 100,000 operations in total) linearly is over 3 minutes, and concurrently it is under half a minute. If we increase the latency to 2 or even 20ms, we have to drop the number of operations to 100 and 10 respectively, because linear execution takes too long for my patience.
(space delimited) total items[, magnitude[,delay (in ms)]]>100 2 2
args:100 | 2^2 * 8 => 32 | 2
Linear TTE (in ms):
avg 301.1, median 301.0, modes {301.0}
Linear Ttl: >30113 ms

Concurrent TTE (in ms):
avg 13.6, median 11.8, modes {11.6}
Concurrent Ttl: >1355 ms
------------------------------------------------------------------------
Run again? (cls + 'Return' clears screen; leave blank + 'Return' to quit)
(space delimited) total items[, magnitude[,delay (in ms)]]>10 2 20
args:10 | 2^2 * 8 => 32 | 20
Linear TTE (in ms):
avg 211.7, median 211.2, modes {211.0}
Linear Ttl: >21169 ms

Concurrent TTE (in ms):
avg 49.9, median 50.9, modes {}
Concurrent Ttl: >4992 ms
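The 1ms run earlier implies about 2ms per Thread.Sleep(1) call (2018ms for 1000 calls), and the 2ms and 20ms runs show a similar excess. A quick way to check what a short sleep really costs on a given machine is to time a series of them with Stopwatch. This is a sketch (SleepCheck and AverageSleepMs are hypothetical names), and the result depends on the OS timer resolution, so no particular figure should be expected.

```csharp
using System.Diagnostics;
using System.Threading;

public static class SleepCheck
{
    // Average wall-clock milliseconds actually spent per Thread.Sleep(requestedMs) call.
    public static double AverageSleepMs(int requestedMs, int repetitions)
    {
        var watch = Stopwatch.StartNew();
        for (int i = 0; i < repetitions; i++)
            Thread.Sleep(requestedMs);
        watch.Stop();
        return watch.Elapsed.TotalMilliseconds / repetitions;
    }
}
```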
So, to answer the original question: are there any performance benefits? You would first have to identify where and why there are latencies to begin with.
100 operations with a 2ms delay over 10 * 10 loops (i.e. 10,000 operations in total) take 301ms on average and 30 seconds in total linearly, versus 13.6ms on average and about 1.4 seconds in total concurrently. Depending on the data set size, that can be the difference between serving data to the user in real time and re-architecting the entire process into a scheduled job that emails the results. Even for small sets of 10 operations over 10 * 10 loops (i.e. 1000 in total), a 20ms delay makes the difference between about 21s and 5s in total, which frees up server resources sooner and allows roughly four times as many requests per second to be handled on the same infrastructure.
I would not recommend implementing ForEachConcurrent upfront, or in anticipation of (imagined) latencies where there probably aren't any. But after the initial versions are released, when knowledge and log data confirm that there are indeed parts of the process where connecting to remote services in the data center or on the web takes up time that affects end users, then and there you might want to consider this approach before rewriting the entire flow or upgrading infrastructure.
Applying this technique wisely, strategically and discretely in your existing code base could save you a lot of time and perhaps money. As long as you first have a good grasp on where to apply it and a firm grip on why and how to apply it.















