How to limit the concurrency of flatMap?

I'm trying to use RxJS to write a script to process several hundreds of log files, each of which is about 1GB. The skeleton of the script looks like

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

The code works, but notice that the filtering step of all log files will start concurrently. However, from file system IO performance perspective, it is preferable to process one file after another (or at least to limit the concurrency to a few files rather than opening all hundreds of files in the same time). In this regard, how can I implement it in a "functional reactive way"?

I had thought of scheduler but could not figure out how it can help here.


You can use .merge(maxConcurrent) to limit the concurrency. Because .merge(maxConcurrent) flattens a metaobservable (observable of observables) into an observable, you need to replace the .flatMap with .map so that the output is a metaobservable ("unflat"), then you call .merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

This code hasn't been tested (since I don't have access to the development environment you have), but this is how to proceed. RxJS doesn't have many operators with concurrency parameters, but you can almost always do what you need with .merge(maxConcurrent).


I have just solved a similar problem with RxJs 5, so I hope the solution can help others with a similar problem.

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more),
// retry two times, push error on stream if retry fails.

//const Rx = require('rxjs-es6/Rx');

// -- Global variabel just to show that it works. --
let parallelRequests = 0;
// --------------------------------------------------

function simulateRequest(req) {
    console.log("Request " + req);
    // --- To log retries ---
    var retry = 0;
    // ----------------------

    // Can't retry a promise, need to restart before the promise is made.
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => {

        var random = Math.floor(Math.random() * 2000);
        // -- To show that it works --
        if (retry) {
            console.log("Retrying request " + req + " ,retry " + retry);
        } else {

            parallelRequests++;
        }
        // ---------------------------
        setTimeout(() => {
            if (random < 900) {
                retry++;
                return reject(req + " !!!FAILED!!!");
            }

            return resolve(req);
        }, random);
    })).retry(2).catch(e => Rx.Observable.of(e));
}

Rx.Observable.range(1, 10)
    .flatMap(e => simulateRequest(e), null, 2)
    // -- To show that it works --
    .do(() => {
        console.log("ParallelRequests " + parallelRequests);
        parallelRequests--;
    })
    // ---------------------------
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>