Fix client RequestQueue

- Reference documented queue parameters
- Add freeze/thaw methods
This commit is contained in:
Patrick Mooney 2014-06-24 11:04:23 -05:00
parent 3b101a7b2f
commit acfaa69a07
1 changed files with 48 additions and 29 deletions

View File

@ -97,20 +97,26 @@ function RequestQueue(opts) {
this.timeout = (opts.timeout > 0) ? opts.timeout : 0; this.timeout = (opts.timeout > 0) ? opts.timeout : 0;
this._queue = []; this._queue = [];
this._timer = null; this._timer = null;
this._frozen = false;
} }
/** /**
* Insert request into queue. * Insert request into queue.
* *
*/ */
RequestQueue.prototype.queue = function queue(message, expect, emitter, cb) { RequestQueue.prototype.enqueue = function enqueue(msg, expect, emitter, cb) {
if (this._queue.length >= this.maxLength) { if (this._queue.length >= this.size || this._frozen) {
return false; return false;
} }
this._queue.push([message, expect, emitter, cb]); var self = this;
if (this.maxAge > 0) { this._queue.push([msg, expect, emitter, cb]);
if (this.timeout > 0) {
if (this._timer !== null) { if (this._timer !== null) {
this._timer = setTimeout(this.purge.bind(this), this.maxAge); this._timer = setTimeout(function () {
// If queue times out, don't allow new entries until thawed
self.freeze();
self.purge();
}, this.timeout);
} }
} }
return true; return true;
@ -140,6 +146,21 @@ RequestQueue.prototype.purge = function purge() {
}); });
}; };
/**
* Freeze queue, refusing any new entries.
*/
RequestQueue.prototype.freeze = function freeze() {
this._frozen = true;
};
/**
* Thaw queue, allowing new entries again.
*/
RequestQueue.prototype.thaw = function thaw() {
this._frozen = false;
};
///--- API ///--- API
/** /**
@ -183,11 +204,12 @@ function Client(options) {
} }
this.queuing = (options.queuing !== undefined) ? options.queuing : true; this.queuing = (options.queuing !== undefined) ? options.queuing : true;
if (this.queuing) { this.queue = new RequestQueue({
this._queue = new RequestQueue({ size: parseInt((options.queueSize || 0), 10),
size: parseInt((options.queueSize || 0), 10), timeout: parseInt((options.queueTimeout || 0), 10)
timeout: parseInt((options.queueTimeout || 0), 10) });
}); if (!this.queuing) {
this.queue.freeze();
} }
this.socket = null; this.socket = null;
@ -677,14 +699,13 @@ Client.prototype.unbind = function unbind(callback) {
*/ */
Client.prototype.destroy = function destroy() { Client.prototype.destroy = function destroy() {
this.destroyed = true; this.destroyed = true;
if (this._queue) { this.queue.freeze();
// Purge any queued requests which are now meaningless // Purge any queued requests which are now meaningless
this._queue.flush(function (msg, expect, emitter, cb) { this.queue.flush(function (msg, expect, emitter, cb) {
if (typeof (cb) === 'function') { if (typeof (cb) === 'function') {
cb(new Error('client destroyed')); cb(new Error('client destroyed'));
} }
}); });
}
this.unbind(); this.unbind();
this.emit('destroy'); this.emit('destroy');
}; };
@ -930,10 +951,8 @@ Client.prototype._connect = function _connect() {
* Flush queued requests out to the socket. * Flush queued requests out to the socket.
*/ */
Client.prototype._flushQueue = function _flushQueue() { Client.prototype._flushQueue = function _flushQueue() {
if (this._queue) { // Pull items we're about to process out of the queue.
// Pull items we're about to process out of the queue. this.queue.flush(this._send.bind(this));
this._queue.flush(this._send.bind(this));
}
}; };
/** /**
@ -1048,14 +1067,14 @@ Client.prototype._send = function _send(message,
return this._sendSocket(message, expect, emitter, callback); return this._sendSocket(message, expect, emitter, callback);
} }
if (!this.socket || !this.connected) { if (!this.socket || !this.connected) {
if (this._queue) { if (!this.queue.enqueue(message, expect, emitter, callback)) {
if (!this._queue.queue(message, expect, emitter, callback)) { callback(new Error('unable to enqueue request'));
callback(new Error('request queue full'));
}
// Initiate reconnect if needed
return this._connect();
} }
return callback(new ConnectionError('no socket')); // Initiate reconnect if needed
if (this.reconnect) {
this._connect();
}
return false;
} else { } else {
this._flushQueue(); this._flushQueue();
return this._sendSocket(message, expect, emitter, callback); return this._sendSocket(message, expect, emitter, callback);