Choose proper async method for batch processing for max requests/sec

I need to call an external API repeatedly, with a delay between calls, to avoid a 'User Rate Limit Exceeded' error.

The Google Maps Geocoding API is sensitive to request rate and allows 10 req/sec. I need to geocode hundreds of my contacts, so a delay is required. That means I need 10 async geocoding functions, each followed by a 1 sec delay. I collect all contacts in an array and then loop through the array asynchronously.

More generally, I need N simultaneous threads, with a delay of D msecs at the end of each thread. The whole loop iterates over an array of User entities, and each thread processes a single entity.

I expect to end up with code like this:

const N = 10;   // threads count
const D = 1000; // delay after each execution, in msecs

var processUser = function(user, callback) {
  someBusinessLogicProc(user, function(err) {
    // wait D msecs after the work finishes before reporting back
    setTimeout(function() {
      return callback(err);
    }, D);
  });
};

var async = require('async');
var people = new Array(900);

async.batchMethod(people, processUser, N, finalCallback);

In this pseudocode, batchMethod is the method I am asking for.


Solution 1:

Putting a delay on the results is not really what you want. Instead, you want to keep track of what you've sent and when you sent it so as soon as you fall under the requests per second boundary, you can send another request.


Here's a general concept for a function that will control rate limiting for you to a fixed number of requests per second. This uses promises and requires that you supply a request function that returns a promise (if you aren't using promises now, you just need to wrap your request function in a promise).

// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolved values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;            
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length >= requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta >= 0) {
                    setTimeout(runMore, ++delta);
                }

            }
        }
        runMore();
    });
}

Example Usage:

rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});
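
If your request function is callback-based, as someBusinessLogicProc in the question appears to be, a thin wrapper is enough to make it return a promise. The following is a minimal sketch, not the asker's real code: the body of someBusinessLogicProc is a hypothetical stand-in, the second callback argument (result) is an assumption since the question only shows err, and processUserAsync is a name made up for illustration.

```javascript
// Hypothetical stand-in for the question's callback-style function.
// A real version would call the geocoding API here.
function someBusinessLogicProc(user, callback) {
    setTimeout(function() {
        if (!user) {
            callback(new Error('missing user'));
        } else {
            // assumed result shape, for illustration only
            callback(null, { user: user, geocoded: true });
        }
    }, 10);
}

// Wrap the callback API so that it returns a promise,
// which is what rateLimitMap() expects from its fn argument.
function processUserAsync(user) {
    return new Promise(function(resolve, reject) {
        someBusinessLogicProc(user, function(err, result) {
            if (err) {
                reject(err);
            } else {
                resolve(result);
            }
        });
    });
}
```

You would then pass processUserAsync as the fn argument, for example rateLimitMap(people, 9, 20, processUserAsync). In Node.js 8 or later, util.promisify(someBusinessLogicProc) should give an equivalent wrapper for functions that follow the (err, value) callback convention.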

A more advanced version of this function, called rateMap(), is available on GitHub.


The general idea behind this code is this:

  1. You pass in an array to iterate through
  2. It returns a promise whose resolved value is an array of results (in order)
  3. You pass a max number of requestsPerSec to ever hit
  4. You pass a max number of requests in flight at the same time
  5. You pass a function that will be passed an element from the array that is being iterated and must return a promise
  6. It keeps an array of timestamps recording when each request was sent.
  7. To see if another request can be sent, it looks backwards in the array and counts how many requests were sent in the last second.
  8. If that number is lower than the threshold, then it sends another one.
  9. If that number meets the threshold, then it calculates how long you have to wait to send another one and sets a timer for that amount of time.
  10. Upon completion of each request, it checks to see if it can send more
  11. If any request rejects its promise, then the returned promise rejects immediately. If you don't want it to stop upon first error, then modify your passed in function to not reject, but to resolve with some value that you can identify as a failed request later when processing the results.
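
The last point (not stopping on the first error) can be sketched as a small wrapper around your request function. This is one possible approach under stated assumptions: neverReject is a hypothetical helper name, and the { ok, value, error } result shape is chosen only for this example.

```javascript
// Wrap a promise-returning function so that it never rejects.
// Failures resolve to a marker object instead of rejecting.
// The { ok, value, error } shape is an assumption made for this sketch.
function neverReject(fn) {
    return function(item) {
        // Promise.resolve().then(...) also captures synchronous throws from fn
        return Promise.resolve().then(function() {
            return fn(item);
        }).then(function(value) {
            return { ok: true, value: value };
        }, function(error) {
            return { ok: false, error: error };
        });
    };
}
```

Passing neverReject(myRequestFunc) instead of myRequestFunc to rateLimitMap() means the returned promise always resolves, and you can pick out the failed items afterwards by checking the ok flag on each result.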

Here's a working simulation: https://jsfiddle.net/jfriend00/3gr0tq7k/

Note: If the maxInFlight value you pass in is higher than the requestsPerSec value, then this function will just send requestsPerSec requests and then, one second later, send another requestsPerSec requests, since that's the quickest way to stay under the requestsPerSec boundary. If the maxInFlight value is the same as or lower than requestsPerSec, then it will send up to maxInFlight requests and then, as each request finishes, it will see if it can send another one.
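
As a rough check on the first case in the note above, you can estimate when the last burst of requests goes out. This is a back-of-the-envelope sketch, not a measurement: estimateLastLaunchMs is a hypothetical helper, and it assumes that requests finish quickly enough that maxInFlight never becomes the limiting factor and that bursts are spaced exactly one second apart (the real timers add a little slack).

```javascript
// Estimate, in msecs from the start, when the final burst is launched,
// assuming one burst of requestsPerSec requests per second.
function estimateLastLaunchMs(itemCount, requestsPerSec) {
    if (itemCount <= 0) {
        return 0;
    }
    var bursts = Math.ceil(itemCount / requestsPerSec);
    // the first burst goes out immediately, each later one a second after the previous
    return (bursts - 1) * 1000;
}
```

For the 900 contacts in the question at 9 requests per second, that is 100 bursts, so the last burst would start at roughly 99 seconds, and the total run time is that plus however long the final requests take to complete.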