Is there anything like asynchronous BlockingCollection<T>?
I would like to await
on the result of BlockingCollection<T>.Take()
asynchronously, so I do not block the thread. Looking for anything like this:
var item = await blockingCollection.TakeAsync();
I know I could do this:
var item = await Task.Run(() => blockingCollection.Take());
but that kinda kills the whole idea, because another thread (of ThreadPool
) gets blocked instead.
Is there any alternative?
There are four alternatives that I know of.
The first is Channels, which provides a threadsafe queue that supports asynchronous Read
and Write
operations. Channels are highly optimized and optionally support dropping some items if a threshold is reached.
The next is BufferBlock<T>
from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync
or ReceiveAsync
, or just link it to an ActionBlock<T>
. For more information, see my blog.
The last two are types that I created, available in my AsyncEx library.
AsyncCollection<T>
is the async
near-equivalent of BlockingCollection<T>
, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T>
or ConcurrentBag<T>
. You can use TakeAsync
to asynchronously consume items from the collection. For more information, see my blog.
AsyncProducerConsumerQueue<T>
is a more portable async
-compatible producer/consumer queue. You can use DequeueAsync
to asynchronously consume items from the queue. For more information, see my blog.
The last three of these alternatives allow synchronous and asynchronous puts and takes.
...or you can do this:
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncQueue<T>
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentQueue<T> _que;
public AsyncQueue()
{
_sem = new SemaphoreSlim(0);
_que = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
_que.Enqueue(item);
_sem.Release();
}
public void EnqueueRange(IEnumerable<T> source)
{
var n = 0;
foreach (var item in source)
{
_que.Enqueue(item);
n++;
}
_sem.Release(n);
}
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await _sem.WaitAsync(cancellationToken);
T item;
if (_que.TryDequeue(out item))
{
return item;
}
}
}
}
Simple, fully functional asynchronous FIFO queue.
Note:
SemaphoreSlim.WaitAsync
was added in .NET 4.5 before that, this was not all that straightforward.