Nodejs create simple queue on Async
In the sample code below, I can run multiple functions at the same time:
// Kick off both operations immediately, then wait until both have settled successfully.
const pendingOperations = [
  sendMoneyToRequestedUser(_data),
  saveTransferMoneyTransaction(_data),
];

Promise.all(pendingOperations)
  .then(() => {
    // Reached only when every operation above has fulfilled.
    log.info("OOOOOOOOOOOOOo");
  })
  .catch((error) => {
    // Reached when either operation rejects, or when the success handler throws.
    log.info(error);
  });
But I want to create a simple queue for them. For example, can I implement that with `Promise`, building on this solution? I have three functions, `check`, `send`, and `post`, that I want to run in series, passing the result of each step to the next.
Step 1 is the `check()` function, step 2 is the `send()` function, and once they have finished, another function, for example `post()`, is executed.
For this implementation I need the result of each step: for example, if step 1 returns true, then step 2 needs to use the result of step 1.
Is `async.parallel` the right solution for this implementation, or can `Promise` do that?
Update:
There's a module for that: https://www.npmjs.com/package/promise-queue
Old answer:
I was having issues while trying to use all kinds of modules, and finally I wrote the simplest implementation I could think of for this kind of work.
Check out this simple class I wrote (Plain JS):
/**
 * Promise-based task queue that caps how many tasks run at the same time.
 *
 * Tasks are started in the order they were enqueued. A task that cannot start
 * immediately waits until a running task settles and hands over its slot.
 */
class Queue {
  /**
   * @param {number} [maxSimultaneously=1] Maximum number of tasks that may run at once.
   * @throws {RangeError} If `maxSimultaneously` is not a value >= 1.
   */
  constructor(maxSimultaneously = 1) {
    // A limit below 1 would make every task wait for a slot that is never
    // released, and NaN would silently disable the limit, so reject both early.
    if (!(maxSimultaneously >= 1)) {
      throw new RangeError(
        `maxSimultaneously must be a number >= 1, got ${String(maxSimultaneously)}`
      );
    }
    this.maxSimultaneously = maxSimultaneously;
    // Number of enqueued tasks that have not settled yet (running + waiting).
    this.__active = 0;
    // Resolver callbacks of the waiting tasks, oldest first.
    this.__queue = [];
  }

  /**
   * Runs `func` as soon as a slot is free and settles with its outcome.
   *
   * @template T
   * @param {() => Promise<T>} func Task to run; called with no arguments.
   * @returns {Promise<T>} Fulfils with the value `func` resolves to, or
   *   rejects with whatever `func` throws or rejects with.
   */
  async enqueue(func) {
    if (++this.__active > this.maxSimultaneously) {
      // All slots are taken: park until a finishing task calls our resolver.
      await new Promise((resolve) => this.__queue.push(resolve));
    }
    try {
      // `await` is required here so that `finally` runs only after the task settles.
      return await func();
    } finally {
      this.__active--;
      // Hand the freed slot to the longest-waiting task, if there is one.
      if (this.__queue.length) {
        this.__queue.shift()();
      }
    }
  }
}
Use it like this:
Lets say you have this async function:
/**
 * Logs `n` to the console after a two-second pause.
 *
 * @param {*} n Value to print.
 * @returns {Promise<void>} Fulfils once the value has been logged.
 */
const printNumber = async (n) => {
  const delayMs = 2000; // wait 2 sec
  await new Promise((done) => {
    setTimeout(done, delayMs);
  });
  console.log(n);
};
so, instead of:
// Print 1 through 4 strictly one after another: each call is awaited before the next begins.
for (const n of [1, 2, 3, 4]) {
  await printNumber(n);
}
use:
// Queue created without an argument, so it uses its default concurrency limit.
const q = new Queue();
// Hand all four jobs to the queue up front; the queue decides when each one starts.
for (const n of [1, 2, 3, 4]) {
  q.enqueue(() => printNumber(n));
}
Each function is executed only after the previously enqueued ones have finished.
The output:
1 // after 2 sec
2 // after 4 sec
3 // after 6 sec
4 // after 8 sec
Or you can limit the queue to run up to some number of functions at the same time:
// Queue that lets up to three jobs run at the same time.
const q = new Queue(3);
// Hand all four jobs to the queue up front; the fourth waits for a free slot.
[1, 2, 3, 4].forEach((n) => {
  q.enqueue(() => printNumber(n));
});
The output:
1 // after 2 sec
2 // after 2 sec
3 // after 2 sec
4 // after 4 sec
Also, the enqueue method will return/throw the original data from your promise!
Lets suppose you write an API to upload files, and you want to limit the uploads to up to 5 at the same time. You want everything to stay the same as it was, without changing your flow. Here's how you can do that:
/**
 * Illustrative upload routine.
 *
 * `something` is a placeholder condition that this snippet does not define.
 *
 * @param {*} data Payload to upload (unused by this placeholder body).
 * @returns {Promise<number>} Fulfils with 200 when `something` is truthy.
 * @throws Rejects with the bare value 400 when `something` is falsy.
 */
async function upload(data) {
  // upload...
  if (!something) {
    throw 400;
  }
  return 200;
}
So, instead of doing this:
/**
 * Performs the upload for `data` directly, with no concurrency limit.
 *
 * @param {*} data Payload forwarded to `upload`.
 * @returns {Promise<*>} Settles with whatever `upload(data)` settles with.
 */
async function work(data) {
  // do something...
  const outcome = await upload(data);
  return outcome;
}
do this:
const q = new Queue(5); // up to 5 at the same time

/**
 * Performs the upload for `data` through the shared queue `q`.
 *
 * @param {*} data Payload forwarded to `upload`.
 * @returns {Promise<*>} Settles with whatever `q.enqueue` settles with for this upload.
 */
async function work(data) {
  // do something...
  const uploadTask = () => upload(data);
  const outcome = await q.enqueue(uploadTask);
  return outcome;
}
/**
 * Promise-based task queue that caps how many tasks run at the same time.
 *
 * Tasks are started in the order they were enqueued. A task that cannot start
 * immediately waits until a running task settles and hands over its slot.
 */
class Queue {
  /**
   * @param {number} [maxSimultaneously=1] Maximum number of tasks that may run at once.
   * @throws {RangeError} If `maxSimultaneously` is not a value >= 1.
   */
  constructor(maxSimultaneously = 1) {
    // A limit below 1 would make every task wait for a slot that is never
    // released, and NaN would silently disable the limit, so reject both early.
    if (!(maxSimultaneously >= 1)) {
      throw new RangeError(
        `maxSimultaneously must be a number >= 1, got ${String(maxSimultaneously)}`
      );
    }
    this.maxSimultaneously = maxSimultaneously;
    // Number of enqueued tasks that have not settled yet (running + waiting).
    this.__active = 0;
    // Resolver callbacks of the waiting tasks, oldest first.
    this.__queue = [];
  }

  /**
   * Runs `func` as soon as a slot is free and settles with its outcome.
   *
   * @template T
   * @param {() => Promise<T>} func Task to run; called with no arguments.
   * @returns {Promise<T>} Fulfils with the value `func` resolves to, or
   *   rejects with whatever `func` throws or rejects with.
   */
  async enqueue(func) {
    if (++this.__active > this.maxSimultaneously) {
      // All slots are taken: park until a finishing task calls our resolver.
      await new Promise((resolve) => this.__queue.push(resolve));
    }
    try {
      // `await` is required here so that `finally` runs only after the task settles.
      return await func();
    } finally {
      this.__active--;
      // Hand the freed slot to the longest-waiting task, if there is one.
      if (this.__queue.length) {
        this.__queue.shift()();
      }
    }
  }
}
/**
 * Logs `n` to the console after a two-second pause.
 *
 * @param {*} n Value to print.
 * @returns {Promise<void>} Fulfils once the value has been logged.
 */
const printNumber = async (n) => {
  const delayMs = 2000; // wait 2 sec
  await new Promise((done) => {
    setTimeout(done, delayMs);
  });
  console.log(n);
};
/**
 * Logs a start message, then hands four `printNumber` jobs (1 through 4) to a
 * new Queue created without an argument.
 *
 * The jobs are not awaited here, so the returned promise fulfils as soon as
 * they have been enqueued.
 *
 * @returns {Promise<void>}
 */
async function start() {
  console.log('starting...');
  const q = new Queue();
  for (const n of [1, 2, 3, 4]) {
    q.enqueue(() => printNumber(n));
  }
}
Click this to run 1 log per 2 sec: <button onclick="start();">Start</button>
/**
 * Promise-based task queue that caps how many tasks run at the same time.
 *
 * Tasks are started in the order they were enqueued. A task that cannot start
 * immediately waits until a running task settles and hands over its slot.
 */
class Queue {
  /**
   * @param {number} [maxSimultaneously=1] Maximum number of tasks that may run at once.
   * @throws {RangeError} If `maxSimultaneously` is not a value >= 1.
   */
  constructor(maxSimultaneously = 1) {
    // A limit below 1 would make every task wait for a slot that is never
    // released, and NaN would silently disable the limit, so reject both early.
    if (!(maxSimultaneously >= 1)) {
      throw new RangeError(
        `maxSimultaneously must be a number >= 1, got ${String(maxSimultaneously)}`
      );
    }
    this.maxSimultaneously = maxSimultaneously;
    // Number of enqueued tasks that have not settled yet (running + waiting).
    this.__active = 0;
    // Resolver callbacks of the waiting tasks, oldest first.
    this.__queue = [];
  }

  /**
   * Runs `func` as soon as a slot is free and settles with its outcome.
   *
   * @template T
   * @param {() => Promise<T>} func Task to run; called with no arguments.
   * @returns {Promise<T>} Fulfils with the value `func` resolves to, or
   *   rejects with whatever `func` throws or rejects with.
   */
  async enqueue(func) {
    if (++this.__active > this.maxSimultaneously) {
      // All slots are taken: park until a finishing task calls our resolver.
      await new Promise((resolve) => this.__queue.push(resolve));
    }
    try {
      // `await` is required here so that `finally` runs only after the task settles.
      return await func();
    } finally {
      this.__active--;
      // Hand the freed slot to the longest-waiting task, if there is one.
      if (this.__queue.length) {
        this.__queue.shift()();
      }
    }
  }
}
/**
 * Logs `n` to the console after a two-second pause.
 *
 * @param {*} n Value to print.
 * @returns {Promise<void>} Fulfils once the value has been logged.
 */
const printNumber = async (n) => {
  const delayMs = 2000; // wait 2 sec
  await new Promise((done) => {
    setTimeout(done, delayMs);
  });
  console.log(n);
};
/**
 * Logs a start message, then hands four `printNumber` jobs (1 through 4) to a
 * new Queue constructed with a limit of 3.
 *
 * The jobs are not awaited here, so the returned promise fulfils as soon as
 * they have been enqueued.
 *
 * @returns {Promise<void>}
 */
async function start() {
  console.log('starting...');
  const q = new Queue(3);
  for (const n of [1, 2, 3, 4]) {
    q.enqueue(() => printNumber(n));
  }
}
Click this to run up to 3 logs every 2 sec: <button onclick="start();">Start</button>
I think you're looking for `async.series`. It runs an array of functions one after another, in sequential order, and passes an array with the results of all of them to the final callback.
Example
const async = require('async');

// Each task reports its outcome through the Node-style callback(err, result).
const firstTask = (callback) => {
  //Do a bunch of relevant stuff
  callback(null, 'First function');
};

const secondTask = (callback) => {
  //Do some more relevant stuff
  callback(null, 'Second function');
};

// Final callback: receives the collected results of the tasks above.
const onSeriesDone = (err, results) => {
  console.log(results); //Logs ['First function', 'Second function']
};

async.series([firstTask, secondTask], onSeriesDone);
UPDATE: The question is not really clear about what is given and what is desired. From the comments, I've gathered the following:
three functions, `check`, `send`, and `post`, are given, and they should be run in series.
Let's say that the mentioned check
, send
, and post
all return Promises, e.g.:
/**
 * Example first step of the chain: wraps its work in a Promise.
 *
 * @returns {Promise<*>} Promise resolved with the local `results` value.
 */
function check() {
return new Promise(function (resolve, reject) {
// Do some stuff and save them in a var 'results'
// NOTE: `...` is a placeholder for the real computation, not valid JavaScript.
var results = ...;
// `reject` is never called in this sketch; only the success path is shown.
resolve(results);
});
}
Then you can simply build a queue as follows:
// Run the steps in order; each handler is called with the value the previous step resolved with.
check()
  .then(send)
  .then(post)
  .catch((err) => {
    // A rejection from any step above ends up here.
    console.error(err);
  });
At each level the function is called with the value resolved from the previous level.