牛骨文教育服务平台(让学习变的简单)
博文笔记

node.js任务队列简易版实现(基于Promise)

创建时间:2017-03-23 投稿人: 浏览次数:1878

任务队列适用在什么场景下?

  • 高并发
    日常情况中,如果并发数超过一定数量,会导致数据出错,系统奔溃,如果一台破电脑同时要执行10W个复杂同步或异步函数会怎样,同样是运行10W个函数如果用队列控制并发运行数,稳定性大大提升。

  • 并发任务时导致数据库错乱,不单单可以用锁来实现
    如果有并发任务要对表进行读取并修改的操作,并发的时候会发生什么情况,会同时读取并修改了数据,这肯定不是我们想要的。

  • 抢红包的实现,也是是用队列来一个一个读取并返回
    抢红包最好的实现方法就是用队列来实现,你们肯定会说不是可以用乐观锁,悲观锁,但是用这些锁的话,大家同时访问,只有一个人抢到了红包,其他人都报错误,这样肯定是不行的。

任务队列用法

创建Queue对象并设置并行数量
var q = new Queue(2); //并发数2
加入任务队列
q.set(fn1);//fn1为Promise函数
q.set(fn1).set(fn3);
加入并启动
q.set(fn1).set(fn2).set_run(fn5);//set_run()加入并启动
当前任务数
q.get();//输出任务数:n
启动
q.run();
暂停
q.pause();//暂停任务队列
恢复
q.rec();//恢复任务队列

demo 模拟异步函数代码

var q = new Queue(2);//创建队列对象
var ks = new Date().getTime();//记时
var fn1 = function () {//函数1
    var deferred = Q.defer();
    setTimeout(function () {//延迟500ms后执行
        deferred.resolve("数据1");
        console.log(new Date().getTime() - ks);
    }, 500);
    return deferred.promise;
};

var fn2 = function () {
    var deferred = Q.defer();
    setTimeout(function () {
        deferred.resolve("数据2");
        console.log(new Date().getTime() - ks);
    }, 500);
    return deferred.promise;
};

var fn3 = function () {
    var deferred = Q.defer();
    setTimeout(function () {
        deferred.resolve("数据3");
        console.log(new Date().getTime() - ks);
    }, 500);
    return deferred.promise;
};

q.set(fn1).set(fn2).set_run(fn3);

打印输出:

556
数据1
562
数据2
1063
数据3

队列代码实现

var Q = require("q");

var Queue = function (setcount) {
    this.list = [];//任务列表
    this.js = 0;//当前运行任务数
    if (setcount == 0 && typeof setcount != "number") this.count = 1;
    this.count = setcount;//最高并发数
    this.ps = false;//暂停
};
Queue.prototype = {
    clear: function () {//清空任务队列
        this.list.length = 0;
        return this;
    },
    pause: function () {//暂停任务队列
        this.ps = true;
    },
    rec: function () {//恢复任务队列
        this.ps = false;
        this.run();
    },
    set: function (fn) {//设置任务
        this.list.push(fn);
        return this;
    },
    set_run: function (fn) {//设置任务并启动
        this.list.push(fn);
        this.run();
        return this;
    },
    get: function () {//查询任务数
        return this.list.length;
    },
    run: function () {
        if (!this.ps) {
            //最高并发数-当前运行任务数=可以运行的任务数
            var i = this.count - this.js;
            var p;
            //保存可运行任务
            var k = [];
            //可以运行的任务数-任务数组长度<0的话
            //取可以运行的任务数 否则取任务数组长度
            i - this.list.length > 0 ? p = this.list.length : p = i;
            //循环写入可运行任务到数组K
            while (p) {
                k.push(this.list.shift() || function () {});
                p--;
            }
             (function (obj) {
                k.forEach(function (item) {
                    obj.js++;
                    item().then(function (msg) {
                        console.log(msg);
                        if (obj.get() && obj.js--) {
                            if (obj.count - q.js > 0 && obj.ps == false)
                                obj.run();
                        }
                    });
                });
            })(this)
        }
    }
};

- 待完善

  • 添加任务事件的监听

  • 内存溢出隐患

  • 分布式支持

声明:该文观点仅代表作者本人,牛骨文系教育信息发布平台,牛骨文仅提供信息存储空间服务。