Nesting await in Parallel.ForEach
In a metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.
How would you refactor this to work as expected?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
The whole idea behind Parallel.ForEach()
is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async
-await
, where you want to release the thread for the duration of the async call.
You could “fix” that by blocking the ForEach()
threads, but that defeats the whole point of async
-await
.
What you could do is to use TPL Dataflow instead of Parallel.ForEach()
, which supports asynchronous Task
s well.
Specifically, your code could be written using a TransformBlock
that transforms each id into a Customer
using the async
lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock
that writes each Customer
to the console.
After you set up the block network, you can Post()
each id to the TransformBlock
.
In code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Although you probably want to limit the parallelism of the TransformBlock
to some small constant. Also, you could limit the capacity of the TransformBlock
and add the items to it asynchronously using SendAsync()
, for example if the collection is too big.
As an added benefit when compared to your code (if it worked) is that the writing will start as soon as a single item is finished, and not wait until all of the processing is finished.
svick's answer is (as usual) excellent.
However, I find Dataflow to be more useful when you actually have large amounts of data to transfer. Or when you need an async
-compatible queue.
In your case, a simpler solution is to just use the async
-style parallelism:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Using DataFlow as svick suggested may be overkill, and Stephen's answer does not provide the means to control the concurrency of the operation. However, that can be achieved rather simply:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
The ToArray()
calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage per the OP's question:
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
EDIT Fellow SO user and TPL wiz Eli Arbel pointed me to a related article from Stephen Toub. As usual, his implementation is both elegant and efficient:
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
You can save effort with the new AsyncEnumerator NuGet Package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism:
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.