Partition: How to add a wait after every partition
I have an API that accepts 20 requests per minute; after that, I need to wait for 1 minute before querying it again. I have a list of items (usually 1000+) whose details I need to query from the API. My first thought was to use Partitioner to partition my list into groups of 20 items/requests, but I soon realized that Partitioner does not work like that. My second thought was adding a delay in the partition, but that is a bad idea too: from my understanding it adds a delay after every request, which is not needed. Instead, I need a delay after every partition. Below is my code:
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}
Does anyone know how I can accomplish this?
Here is a RateLimiter class that you could use in order to limit the frequency of the asynchronous operations. It is a simpler implementation of the RateLimiter class that is found in this answer.
/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    private readonly CancellationToken _disposeToken;
    private bool _disposed;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
        lock (_semaphore) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_semaphore)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}
Usage example:
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));
string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));
Note: I added a Dispose method so that the asynchronous operations initiated internally by the RateLimiter class can be canceled. This method should be called when you are finished using the RateLimiter, otherwise the pending asynchronous operations will prevent the RateLimiter from being garbage collected in a timely manner, on top of consuming resources associated with active Task.Delay tasks.
The original very simple but leaky implementation can be found in the 2nd revision of this answer.
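For comparison, the literal "pause after every batch of 20" idea from the question can be sketched without a rate limiter at all. This is only a minimal sketch, not part of the RateLimiter above: the ForEachBatchedAsync name and its parameters are hypothetical, and it assumes .NET 6 or later for Enumerable.Chunk.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchExtensions
{
    // Hypothetical helper (not from the answer): processes the source in
    // fixed-size batches and pauses between consecutive batches.
    public static async Task<List<V>> ForEachBatchedAsync<T, V>(
        this IEnumerable<T> source, int batchSize, TimeSpan pause,
        Func<T, Task<V>> body, CancellationToken token = default)
    {
        var results = new List<V>();
        bool first = true;
        foreach (T[] batch in source.Chunk(batchSize)) // Enumerable.Chunk: .NET 6+
        {
            // Pause before every batch except the first, so there is
            // no pointless wait after the last batch.
            if (!first) await Task.Delay(pause, token);
            first = false;
            token.ThrowIfCancellationRequested();

            // Start the whole batch concurrently; Task.WhenAll returns the
            // results in the same order as the input tasks.
            V[] batchResults = await Task.WhenAll(batch.Select(body));
            results.AddRange(batchResults);
        }
        return results;
    }
}
```

With the numbers from the question, a call could look like items.ForEachBatchedAsync(20, TimeSpan.FromMinutes(1), QueryApiAsync), where QueryApiAsync stands in for whatever method queries the API. Because the pause starts only after the slowest request of a batch has completed, this approach is more conservative (and slower overall) than the RateLimiter, which lets each request proceed as soon as its own slot frees up.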
I am adding an alternative implementation of the RateLimiter class, more complex, which is based on a Stopwatch instead of a SemaphoreSlim. It has the advantage that it doesn't need to be disposable, since it's not launching hidden asynchronous operations in the background. The disadvantages are that the WaitAsync method does not support a CancellationToken argument, and that the probability of bugs is higher because of the complexity.
public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}
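To make the queue arithmetic easier to follow, here is a small trace helper that replays the same bookkeeping with injected timestamps instead of a running Stopwatch. It is only an illustration: the RateLimiterTrace class and the SimulateDelays method are hypothetical names, and the helper assumes the arrival times are given in non-decreasing order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RateLimiterTrace
{
    // Hypothetical helper: replays the queue logic of WaitAsync for a list of
    // arrival times and returns the delay that each caller would be given.
    public static TimeSpan[] SimulateDelays(
        int maxActionsPerTimeUnit, TimeSpan timeUnit, IEnumerable<TimeSpan> arrivals)
    {
        var queue = new Queue<TimeSpan>(); // expiration timestamps of granted slots
        var delays = new List<TimeSpan>();
        foreach (TimeSpan now in arrivals)
        {
            // Drop the slots that have already expired.
            while (queue.Count > 0 && queue.Peek() < now) queue.Dequeue();

            TimeSpan delay = TimeSpan.Zero;
            if (queue.Count >= maxActionsPerTimeUnit)
            {
                // Wait for the slot that sits maxActionsPerTimeUnit places
                // from the end of the queue to expire.
                TimeSpan reference = queue
                    .Skip(queue.Count - maxActionsPerTimeUnit).First();
                delay = reference - now;
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero;
            }
            // The new slot expires one time unit after the action really starts.
            queue.Enqueue(now + delay + timeUnit);
            delays.Add(delay);
        }
        return delays.ToArray();
    }
}
```

For example, with a limit of 2 actions per second and five callers all arriving at time zero, SimulateDelays(2, TimeSpan.FromSeconds(1), Enumerable.Repeat(TimeSpan.Zero, 5)) yields delays of 0, 0, 1, 1 and 2 seconds: the first two callers proceed immediately, the next two wait for the first window to expire, and the fifth waits for the second window.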