async 库包含三个主要部分:Controlflow(异步流程控制)、Utils(工具类)、Collections(对 map、objects、arrays 等数据结构的异步操作)。
流程控制函数(Control Flow)
以下函数调用后立刻返回,不阻塞当前执行。
1. series(tasks, callback) — 多个函数依次执行,之间没有数据交互
执行结果保存在最终 callback(err, res) 的 res 参数中。如果某个函数调用了 callback 且第一个参数不为 null,则会立即调用最终 callback,并中断尚未执行的剩余函数。
async.series([
function(callback) {
// do some stuff ...
callback(null, 'one');
},
function(callback) {
// do some more stuff ...
callback(null, 'two');
}
],
// optional callback
function(err, results) {
// results is now equal to ['one', 'two']
});
async.series({
one: function(callback) {
setTimeout(function() {
callback(null, 1);
}, 200);
},
two: function(callback){
setTimeout(function() {
callback(null, 2);
}, 100);
}
}, function(err, results) {
// results is now equal to: {one: 1, two: 2}
});
2. parallel(tasks, callback) — 立即并行执行所有函数
最终 callback 中结果的顺序与函数在数组中的顺序一致。如果某个函数触发了错误,最终 callback 会被立即调用,结果包含已完成函数的返回值;剩余函数仍然会继续执行并调用各自的 callback,但不再触发最终 callback。传给最终 callback 的结果参数依然有效,示例如下:
async.parallel([
function (callback) {
// do some stuff ...
callback(1111, 'one');
console.log("1111111");
},
function (callback) {
// do some more stuff ...
callback(null, 'two');
console.log("22222222222222");
}
],
// optional callback
function (err, results) {
// results is now equal to ['one', 'two']
console.log(JSON.stringify(results));
sss = results;
});
setTimeout(function(){
console.log(JSON.stringify(sss)); /// 打印最终结果
},1000);
/*
输出:
["one"]
1111111
2222222222
["one","two"]
*/
3. waterfall(tasks, callback) — 多个函数依次执行,且前一个的输出为后一个的输入
如果某个函数触发了错误,后续函数不会被执行,错误会直接传递给最终 callback 并立即调用它。
/*
Runs the tasks array of functions in series, each passing their
results to the next in the array. However, if any of the tasks pass
an error to their own callback, the next function is not executed, and
the main callback is immediately called with the error.
*/
async.waterfall([
function(callback) {
callback(null, 'one', 'two');
},
function(arg1, arg2, callback) {
// arg1 now equals 'one' and arg2 now equals 'two'
callback(null, 'three');
},
function(arg1, callback) {
// arg1 now equals 'three'
callback(null, 'done');
}
], function (err, result) {
// result now equals 'done'
});
4. auto(tasks, callback) — 多个函数有依赖关系,有的并行执行,有的依次执行
/*
这里假设我要写一个程序,它要完成以下几件事:
1.从某处取得数据
2.在硬盘上建立一个新的目录
3.将数据写入到目录下某文件
4.发送邮件,将文件以附件形式发送给其它人。
分析该任务,可以知道1与2可以并行执行,3需要等1和2完成,4要等3完成。
*/
async.auto({
GetData: function (callback) {
console.log('.....GetData');
callback(null, "GetData");
}, CreateDir: function (callback) {
console.log('.....CreateDir');
callback(null, 'CreateDir');
},
WriteToFile: ["GetData", "CreateDir", function (callback, res) {
setTimeout(function () {
console.log('.....WriteToFile ' + JSON.stringify(res));
callback(null, 'WriteToFile');
}, 300);
}],
SendEmail: ["WriteToFile", function (callback, res) {
console.log('.....SendEmail ', res);
callback(null, "111");
}]
},
function (err, res) {
console.log('.....done ', res);
});
5. whilst(test, fn, callback) — 异步的 while 循环
先执行 test,若返回 true 则调用 fn;fn 触发错误或 test 返回 false 时,立即调用最终 callback。
var count = 0;
async.whilst(
function () { return count < 5; },
function (callback) {
count++;
setTimeout(function () {
console.log(count);
callback(null, count);
}, 200);
},
function (err, n) {
console.log(" res " +(count));
}
);
console.log("............");
6. queue(worker, number) — 任务队列
将任务添加到队列,通过 worker 并行处理。number 参数指定允许同时并行的 worker 数量,默认为 1(即依次执行)。队列还提供若干事件钩子:
/*
Creates a queue object with the specified concurrency.
Tasks added to the queue are processed in parallel
(up to the concurrency limit). If all workers are in progress,
the task is queued until one becomes available. Once a worker
completes a task, that task's callback is called.
*/
var q = async.queue(function (task, callback) {
console.log('start to process task: ' + task.name);
callback();
}, 3);
// assign a callback
q.drain = function () { // 所有任务执行完调用
console.log('all items have been processed');
};
// add some items to the queue
q.push({ name: '111111111' }, function (err) {
setTimeout(function () {
console.log('finished processing 11111111');
}, 1000);
});
q.push({ name: '2222222222' }, function (err) { //
setTimeout(function () {
console.log('finished processing 22222222222');
}, 2000);
});
// add some items to the queue (batch-wise) //添加多个任务
q.push([{ name: 'baz' }, { name: 'bay' }, { name: 'bax' }], function (err) {
console.log('finished processing item');
});
// add some items to the front of the queue
q.unshift({ name: 'bar111' }, function (err) { //添加任务到队列之前
console.log('finished processing bar111');
});
setTimeout(function () {
q.unshift({ name: '3333333333' }, function (err) {
console.log('finished processing 444444444444');
});
}, 100);
q.saturated = function () { //worker数量用完时调用
console. log('all workers to be used');
}
q.empty = function () {//最后一个任务提交worker时调用
console. log('no more tasks wating');
}
console.log("............");