From acfaa69a07870578966a7b7f60603c356c0f32b5 Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Tue, 24 Jun 2014 11:04:23 -0500 Subject: [PATCH] Fix client RequestQueue - Reference documented queue parameters - Add freeze/thaw methods --- lib/client/client.js | 77 +++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/lib/client/client.js b/lib/client/client.js index 1e3ae8b..822a757 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -97,20 +97,26 @@ function RequestQueue(opts) { this.timeout = (opts.timeout > 0) ? opts.timeout : 0; this._queue = []; this._timer = null; + this._frozen = false; } /** * Insert request into queue. * */ -RequestQueue.prototype.queue = function queue(message, expect, emitter, cb) { - if (this._queue.length >= this.maxLength) { +RequestQueue.prototype.enqueue = function enqueue(msg, expect, emitter, cb) { + if (this._queue.length >= this.size || this._frozen) { return false; } - this._queue.push([message, expect, emitter, cb]); - if (this.maxAge > 0) { + var self = this; + this._queue.push([msg, expect, emitter, cb]); + if (this.timeout > 0) { if (this._timer !== null) { - this._timer = setTimeout(this.purge.bind(this), this.maxAge); + this._timer = setTimeout(function () { + // If queue times out, don't allow new entries until thawed + self.freeze(); + self.purge(); + }, this.timeout); } } return true; @@ -140,6 +146,21 @@ RequestQueue.prototype.purge = function purge() { }); }; +/** + * Freeze queue, refusing any new entries. + */ +RequestQueue.prototype.freeze = function freeze() { + this._frozen = true; +}; + +/** + * Thaw queue, allowing new entries again. + */ +RequestQueue.prototype.thaw = function thaw() { + this._frozen = false; +}; + + ///--- API /** @@ -183,11 +204,12 @@ function Client(options) { } this.queuing = (options.queuing !== undefined) ? options.queuing : true; - if (this.queuing) { - this._queue = new RequestQueue({ - size: parseInt((options.queueSize || 0), 10), - timeout: parseInt((options.queueTimeout || 0), 10) - }); + this.queue = new RequestQueue({ + size: parseInt((options.queueSize || 0), 10), + timeout: parseInt((options.queueTimeout || 0), 10) + }); + if (!this.queuing) { + this.queue.freeze(); } this.socket = null; @@ -677,14 +699,13 @@ Client.prototype.unbind = function unbind(callback) { */ Client.prototype.destroy = function destroy() { this.destroyed = true; - if (this._queue) { - // Purge any queued requests which are now meaningless - this._queue.flush(function (msg, expect, emitter, cb) { - if (typeof (cb) === 'function') { - cb(new Error('client destroyed')); - } - }); - } + this.queue.freeze(); + // Purge any queued requests which are now meaningless + this.queue.flush(function (msg, expect, emitter, cb) { + if (typeof (cb) === 'function') { + cb(new Error('client destroyed')); + } + }); this.unbind(); this.emit('destroy'); }; @@ -930,10 +951,8 @@ Client.prototype._connect = function _connect() { * Flush queued requests out to the socket. */ Client.prototype._flushQueue = function _flushQueue() { - if (this._queue) { - // Pull items we're about to process out of the queue. - this._queue.flush(this._send.bind(this)); - } + // Pull items we're about to process out of the queue. + this.queue.flush(this._send.bind(this)); }; /** @@ -1048,14 +1067,14 @@ Client.prototype._send = function _send(message, return this._sendSocket(message, expect, emitter, callback); } if (!this.socket || !this.connected) { - if (this._queue) { - if (!this._queue.queue(message, expect, emitter, callback)) { - callback(new Error('request queue full')); - } - // Initiate reconnect if needed - return this._connect(); + if (!this.queue.enqueue(message, expect, emitter, callback)) { + callback(new Error('unable to enqueue request')); } - return callback(new ConnectionError('no socket')); + // Initiate reconnect if needed + if (this.reconnect) { + this._connect(); + } + return false; } else { this._flushQueue(); return this._sendSocket(message, expect, emitter, callback);