Partition: How to add a wait after every partition

I have an API that accepts 20 requests per minute; after that, I need to wait 1 minute before querying it again. I have a list of items (usually 1000+) whose details I need to query from the API. My first thought was to use Partitioner to split the list into batches of 20 items/requests, but I soon realized that Partitioner does not work like that. My second thought was to add a delay inside the partition, but that is also a bad idea: as I understand it, it adds a delay after every request, which is not needed. Instead, I need a delay after every partition. Below is my code:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

Does anyone know how I can accomplish this?


Here is a RateLimiter class that you could use to limit the frequency of the asynchronous operations. It is a simpler implementation of the RateLimiter class that is found in this answer.

/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    private readonly CancellationToken _disposeToken;
    private bool _disposed;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
        lock (_semaphore) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_semaphore)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}

Usage example:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));

Note: I added a Dispose method so that the asynchronous operations initiated internally by the RateLimiter class can be canceled. This method should be called when you are finished using the RateLimiter, otherwise the pending asynchronous operations will prevent the RateLimiter from being garbage collected in a timely manner, on top of consuming resources associated with active Task.Delay tasks. The original very simple but leaky implementation can be found in the 2nd revision of this answer.
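
For comparison, the literal "wait after every batch" idea from the question can be sketched without a rate limiter. This is only a minimal sketch under stated assumptions, not the approach of the RateLimiter class above: the ForEachBatchAsync name and its parameters are hypothetical, and Enumerable.Chunk requires .NET 6 or later. Each batch runs concurrently, and the pause is counted from the moment the previous batch finished, so the actual rate ends up somewhat lower than the allowed limit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchExtensions
{
    // Hypothetical helper: processes the source in batches of batchSize,
    // pausing between consecutive batches (but not after the last one).
    public static async Task<List<V>> ForEachBatchAsync<T, V>(
        this IEnumerable<T> source, int batchSize, TimeSpan pauseBetweenBatches,
        Func<T, Task<V>> body, CancellationToken token = default)
    {
        var results = new List<V>();
        bool isFirstBatch = true;
        foreach (T[] batch in source.Chunk(batchSize)) // Chunk: .NET 6+
        {
            // Pause before every batch except the first one.
            if (!isFirstBatch) await Task.Delay(pauseBetweenBatches, token);
            isFirstBatch = false;
            token.ThrowIfCancellationRequested();

            // Start all requests of this batch, then wait for all of them.
            // Task.WhenAll preserves the order of the batch in its result.
            V[] batchResults = await Task.WhenAll(batch.Select(body));
            results.AddRange(batchResults);
        }
        return results;
    }
}
```

With the numbers from the question, a call could look like `await items.ForEachBatchAsync(20, TimeSpan.FromMinutes(1), QueryApiAsync)`, where QueryApiAsync stands for whatever method performs a single request.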


I am adding an alternative, more complex implementation of the RateLimiter class, which is based on a Stopwatch instead of a SemaphoreSlim. It has the advantage that it doesn't need to be disposable, since it doesn't launch hidden asynchronous operations in the background. The disadvantages are that the WaitAsync method does not support a CancellationToken argument, and that bugs are more likely because of the added complexity.

public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            // Each queued value is the time at which a past action leaves the
            // window; discard the ones that have already expired.
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                // The window is full: wait until the entry that is
                // _maxActionsPerTimeUnit positions from the end expires.
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            // Record when this action (starting after the delay) will expire.
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}
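
If the caller still needs to give up on a wait, one possible workaround for the missing CancellationToken parameter is the Task.WaitAsync(CancellationToken) method, which is available on .NET 6 and later. The sketch below is an assumption-laden illustration, not part of the class above: it uses a long Task.Delay as a stand-in for the task returned by the rate limiter's WaitAsync. Note that abandoning the wait this way would not give the slot back; the timestamp already enqueued by the rate limiter would stay in its queue.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        // Cancel automatically after 100 milliseconds.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));

        // Stand-in (assumption) for the task returned by rateLimiter.WaitAsync().
        Task limiterWait = Task.Delay(TimeSpan.FromSeconds(30));

        try
        {
            // Task.WaitAsync stops waiting when the token is canceled,
            // but it does not cancel the underlying task itself.
            await limiterWait.WaitAsync(cts.Token);
            Console.WriteLine("Slot acquired");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Wait abandoned");
        }
    }
}
```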