Iterating over a mongodb cursor serially (waiting for callbacks before moving to next document)
Using mongoskin, I can do a query like this, which will return a cursor:
myCollection.find({}, function(err, resultCursor) {
  resultCursor.each(function(err, result) {
    // handle each document here
  });
});
However, I'd like to call some async functions for each document, and only move on to the next item on the cursor after this has called back (similar to the eachSeries structure in the async.js module). E.g:
myCollection.find({}, function(err, resultCursor) {
  resultCursor.each(function(err, result) {
    externalAsyncFunction(result, function(err) {
      // externalAsyncFunction completed - now want to move to next doc
    });
  });
});
How could I do this?
Thanks
UPDATE:
I don't want to use toArray(), as this is a large batch operation and the results might not fit in memory in one go.
Solution 1:
A more modern approach that uses async/await:
const cursor = db.collection("foo").find({});
while (await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}
Notes:
- This may be even simpler to do when async iterators arrive.
- You'll probably want to add try/catch for error checking.
- The containing function should be async, or the code should be wrapped in (async function() { ... })(), since it uses await.
- If you want, add await new Promise(resolve => setTimeout(resolve, 1000)); (pause for 1 second) at the end of the while loop to show that it does process docs one after the other.
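The notes on error handling and the async wrapper can be combined into a minimal sketch. The names processSerially, makeFakeCursor and demo are hypothetical, and the fake cursor only mimics the hasNext()/next() methods used in this answer so that the snippet runs without a database.

```javascript
// Hypothetical stand-in for a driver cursor: exposes only hasNext() and next().
function makeFakeCursor(docs) {
  let i = 0;
  return {
    hasNext: async () => i < docs.length,
    next: async () => docs[i++],
  };
}

// Walks the cursor one document at a time and awaits the handler
// before asking the cursor for the next document.
async function processSerially(cursor, handler) {
  let count = 0;
  while (await cursor.hasNext()) {
    const doc = await cursor.next();
    await handler(doc);
    count++;
  }
  return count;
}

// Wrapping the loop in an async function lets try/catch cover both
// cursor errors and errors thrown by the handler.
async function demo() {
  const log = [];
  const cursor = makeFakeCursor([{ _id: 1 }, { _id: 2 }, { _id: 3 }]);
  try {
    const count = await processSerially(cursor, async (doc) => {
      log.push(`start ${doc._id}`);
      await new Promise((resolve) => setTimeout(resolve, 10)); // simulated async work
      log.push(`end ${doc._id}`);
    });
    return { count, log };
  } catch (err) {
    console.error("processing failed:", err);
    throw err;
  }
}
```

With a real cursor, the same processSerially function should work as long as the cursor offers promise-returning hasNext() and next(), as this answer assumes.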
Solution 2:
If you don't want to load all of the results into memory using toArray, you can iterate using the cursor with something like the following.
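myCollection.find({}, function(err, resultCursor) {
  function processItem(err, item) {
    if (err) {
      return console.error(err); // Stop on a cursor error
    }
    if (item === null) {
      return; // All done!
    }
    externalAsyncFunction(item, function(err) {
      // Fetch the next document only after this one has been processed
      resultCursor.nextObject(processItem);
    });
  }
  resultCursor.nextObject(processItem);
});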
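The same pattern can be wrapped in a reusable helper that also reports completion and errors. This is only a sketch: eachSeries and makeFakeCursor are hypothetical names, and the fake cursor imitates just the callback-style nextObject(callback) used above, passing null once the documents run out.

```javascript
// Hypothetical stand-in for a cursor with a callback-style nextObject().
function makeFakeCursor(docs) {
  let i = 0;
  return {
    nextObject(callback) {
      const item = i < docs.length ? docs[i++] : null;
      setImmediate(callback, null, item); // always call back asynchronously
    },
  };
}

// Calls worker(item, cb) for one document at a time; done(err) is called
// once, either on the first error or after the cursor is exhausted.
function eachSeries(cursor, worker, done) {
  function processItem(err, item) {
    if (err) return done(err);
    if (item === null) return done(null); // all done
    worker(item, function (err) {
      if (err) return done(err);
      cursor.nextObject(processItem); // move on only after the worker calls back
    });
  }
  cursor.nextObject(processItem);
}
```

A call would look like eachSeries(resultCursor, externalAsyncFunction, function(err) { ... }), assuming externalAsyncFunction takes (item, callback) as in the question.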
Solution 3:
Since Node.js v10.3 you can use an async iterator:
const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}
Jake Archibald wrote a great blog post about async iterators, that I came to know after reading @user993683's answer.
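To see that for await really waits for the loop body before requesting the next document, here is a small sketch that runs without a database. The async generator fakeCursor is a hypothetical stand-in for the cursor (any async iterable behaves the same way), and sleep and run are illustrative names.

```javascript
// Hypothetical stand-in: an async generator is an async iterable,
// which is all that `for await` needs.
async function* fakeCursor(docs) {
  for (const doc of docs) {
    yield doc;
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function run() {
  const order = [];
  for await (const doc of fakeCursor([{ _id: "a" }, { _id: "b" }])) {
    order.push(`begin ${doc._id}`);
    await sleep(10); // the next document is not requested until this resolves
    order.push(`finish ${doc._id}`);
  }
  return order;
}
```

The recorded order shows each document finishing before the next one begins, which is the serial behaviour the question asks for.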
Solution 4:
This works with large datasets by using setImmediate:
var cursor = collection.find({filter...}).cursor();
cursor.nextObject(function fn(err, item) {
  if (err || !item) return;
  setImmediate(fnAction, item, arg1, arg2, function() {
    cursor.nextObject(fn);
  });
});
function fnAction(item, arg1, arg2, callback) {
  // Here you can do whatever you want to do with your item.
  return callback();
}
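One likely reason setImmediate helps with large datasets: if the cursor ever invokes its callback synchronously, plain recursion adds stack frames for every document, while setImmediate lets the stack unwind between documents. The sketch below assumes such a worst-case cursor; makeSyncCursor and walk are hypothetical names, not driver APIs.

```javascript
// Hypothetical cursor that calls back synchronously, as a worst case
// for recursion depth. It passes null once all items are consumed.
function makeSyncCursor(total) {
  let i = 0;
  return {
    nextObject(callback) {
      callback(null, i < total ? { n: i++ } : null);
    },
  };
}

// Visits every item in order; done(err, count) is called at the end.
function walk(cursor, action, done) {
  let count = 0;
  cursor.nextObject(function fn(err, item) {
    if (err) return done(err);
    if (!item) return done(null, count);
    // Defer the work to a later event-loop turn so the current
    // call stack unwinds before the next document is handled.
    setImmediate(action, item, function () {
      count++;
      cursor.nextObject(fn);
    });
  });
}
```

For example, walk(makeSyncCursor(100000), function(item, cb) { cb(); }, done) completes with a flat stack, whereas calling action directly instead of through setImmediate would nest deeper with every document in this synchronous setup.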
Solution 5:
If someone is looking for a Promise way of doing this (as opposed to using callbacks of nextObject), here it is. I am using Node v4.2.2 and mongo driver v2.1.7. This is kind of an asyncSeries version of Cursor.forEach():
function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        // Wait for the iterator's promise before fetching the next document
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    // Reject the outer promise if the cursor or the iterator fails
    cursor.next().then(processDoc).catch(reject);
  });
}
To use this, pass the cursor and an iterator that operates on each document asynchronously (like you would for Cursor.forEach). The iterator needs to return a promise, like most mongodb native driver functions do.
Say you want to update all documents in the collection test. This is how you would do it:
var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db; // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();
  return forEachSeries(cur, function(doc) { // this is the iterator
    // updateOne returns a promise, if not supplied a callback. Just return it.
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}} // or whatever else you need to change
    );
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
})
.catch(function(err) {
  console.error(err);
  if (theDb) theDb.close(); // close the connection on failure as well
});