From 2939fa1fadc5f940b0b320957daeff91e5aca8f1 Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Wed, 9 Jul 2014 13:40:17 -0500 Subject: [PATCH] Improve client request state tracking - Handle abandoned requests properly - Re-enable client abandon test - Report client idleness correctly (even after abandons) Fix mcavage/node-ldapjs#211 --- lib/client/client.js | 180 ++++++++++++++++++++++++++++++++----------- lib/errors/index.js | 25 +++++- test/client.test.js | 16 ++-- 3 files changed, 168 insertions(+), 53 deletions(-) diff --git a/lib/client/client.js b/lib/client/client.js index ffb7b41..a8cccf6 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -141,7 +141,7 @@ RequestQueue.prototype.flush = function flush(cb) { */ RequestQueue.prototype.purge = function purge() { this.flush(function (msg, expect, emitter, cb) { - cb(new Error('RequestQueue timeout')); + cb(new errors.TimeoutError('request queue timeout')); }); }; @@ -160,6 +160,118 @@ RequestQueue.prototype.thaw = function thaw() { }; +/** + * Track message callback by messageID. + */ +function MessageTracker(opts) { + assert.object(opts); + assert.string(opts.id); + assert.object(opts.parser); + + this.id = opts.id; + this._msgid = 0; + this._messages = {}; + this._abandoned = {}; + this.parser = opts.parser; + + var self = this; + this.__defineGetter__('pending', function () { + return Object.keys(self._messages); + }); +} + +/** + * Record a messageID and callback. + */ +MessageTracker.prototype.track = function track(message, callback) { + var msgid = this._nextID(); + message.messageID = msgid; + this._messages[msgid] = callback; + return msgid; +}; + +/** + * Fetch callback based on messageID. + */ +MessageTracker.prototype.fetch = function fetch(msgid) { + var msg = this._messages[msgid]; + if (msg) { + this._purgeAbandoned(msgid); + return msg; + } + // It's possible that the server has not received the abandon request yet. + // While waiting for evidence that the abandon has been received, incoming + // messages that match the abandoned msgid will be handled as normal. + msg = this._abandoned[msgid]; + if (msg) { + return msg.cb; + } + return null; +}; + +/** + * Cease tracking for a given messageID. + */ +MessageTracker.prototype.remove = function remove(msgid) { + if (this._messages[msgid]) { + delete this._messages[msgid]; + } else if (this._abandoned[msgid]) { + delete this._abandoned[msgid]; + } +}; + +/** + * Mark a messageID as abandoned. + */ +MessageTracker.prototype.abandon = function abandonMsg(msgid) { + if (this._messages[msgid]) { + // Keep track of "when" the message was abandoned + this._abandoned[msgid] = { + age: this._msgid, + cb: this._messages[msgid] + }; + delete this._messages[msgid]; + } +}; + +/** + * Purge old items from abandoned list. + */ +MessageTracker.prototype._purgeAbandoned = function _purgeAbandoned(msgid) { + var self = this; + // Is (comp >= ref) according to sliding window + function geWindow(ref, comp) { + var max = ref + (MAX_MSGID/2); + var min = ref; + if (max >= MAX_MSGID) { + // Handle roll-over + max = max - MAX_MSGID - 1; + return ((comp <= max) || (comp >= min)); + } else { + return ((comp <= max) && (comp >= min)); + } + } + + Object.keys(this._abandoned).forEach(function (id) { + // Abandoned messageIDs can be forgotten if a received messageID is "newer" + if (geWindow(self._abandoned[id].age, msgid)) { + self._abandoned[id].cb(new errors.AbandonedError( + 'client request abandoned')); + delete self._abandoned[id]; + } + }); +}; + +/** + * Allocate the next messageID according to a sliding window. + */ +MessageTracker.prototype._nextID = function _nextID() { + if (++this._msgid >= MAX_MSGID) + this._msgid = 1; + + return this._msgid; +}; + ///--- API /** @@ -806,20 +918,10 @@ Client.prototype._connect = function _connect() { // Initialize socket events and LDAP parser. function initSocket() { - socket.ldap = { + socket.ldap = new MessageTracker({ id: self.url ? self.url.href : self.socketPath, - messageID: 0, - messages: {}, - getNextMessageID: function getNextMessageID() { - if (++socket.ldap.messageID >= MAX_MSGID) - socket.ldap.messageID = 1; - - return socket.ldap.messageID; - }, - parser: new Parser({ - log: log - }) - }; + parser: new Parser({log: log}) + }); // This won't be set on TLS. So. Very. Annoying. if (typeof (socket.setKeepAlive) !== 'function') { @@ -839,7 +941,7 @@ Client.prototype._connect = function _connect() { // The "router" socket.ldap.parser.on('message', function onMessage(message) { message.connection = socket; - var callback = socket.ldap.messages[message.messageID]; + var callback = socket.ldap.fetch(message.messageID); if (!callback) { log.error({message: message.json}, 'unsolicited message'); @@ -1020,29 +1122,20 @@ Client.prototype._onClose = function _onClose(had_err) { this.emit('close', had_err); // On close we have to walk the outstanding messages and go invoke their // callback with an error. - Object.keys(socket.ldap.messages).forEach(function (msgid) { - var err; + socket.ldap.pending.forEach(function (msgid) { + var cb = socket.ldap.fetch(msgid); + socket.ldap.remove(msgid); + if (socket.unbindMessageID !== parseInt(msgid, 10)) { - err = new ConnectionError(socket.ldap.id + ' closed'); + return cb(new ConnectionError(socket.ldap.id + ' closed')); } else { // Unbinds will be communicated as a success since we're closed - err = new UnbindResponse({ - messageID: msgid - }); - err.status = 'unbind'; + var unbind = new UnbindResponse({messageID: msgid}); + unbind.status = 'unbind'; + return cb(unbind); } - - if (typeof (socket.ldap.messages[msgid]) === 'function') { - var callback = socket.ldap.messages[msgid]; - delete socket.ldap.messages[msgid]; - return callback(err); - } else if (socket.ldap.messages[msgid]) { - if (err instanceof Error) - socket.ldap.messages[msgid].emit('error', err); - delete socket.ldap.messages[msgid]; - } - return false; }); + delete socket.ldap.parser; delete socket.ldap; @@ -1070,10 +1163,9 @@ Client.prototype._updateIdle = function _updateIdle(override) { // Client must be connected but not waiting on any request data var self = this; function isIdle(disable) { - // FIXME: this breaks with abandons return ((disable !== true) && (self.socket && self.connected) && - (Object.keys(self.socket.ldap.messages).length === 0)); + (self.socket.ldap.pending.length === 0)); } if (isIdle(override)) { if (!this._idleTimer) { @@ -1224,7 +1316,7 @@ Client.prototype._sendSocket = function _sendSocket(message, // page search continued, just return for now return undefined; } else { - delete conn.ldap.messages[message.messageID]; + conn.ldap.remove(message.messageID); // Potentially mark client as idle self._updateIdle(); @@ -1243,16 +1335,19 @@ Client.prototype._sendSocket = function _sendSocket(message, function onRequestTimeout() { self.emit('timeout', message); - if (conn.ldap.messages[message.messageID]) { - conn.ldap.messages[message.messageID](new LDAPResult({ - status: 80, // LDAP_OTHER - errorMessage: 'request timeout (client interrupt)' - })); + var cb = conn.ldap.fetch(message.messageID); + if (cb) { + //FIXME: the timed-out request should be abandoned + cb(new errors.TimeoutError('request timeout (client interrupt)')); } } // end function onRequestTimeout() function writeCallback() { if (expect === 'abandon') { + // Mark the messageID specified as abandoned + conn.ldap.abandon(message.abandonID); + // No need to track the abandon request itself + conn.ldap.remove(message.id); return callback(null); } else if (expect === 'unbind') { conn.unbindMessageID = message.id; @@ -1267,8 +1362,7 @@ Client.prototype._sendSocket = function _sendSocket(message, } // end writeCallback() // Start actually doing something... - message.messageID = conn.ldap.getNextMessageID(); - conn.ldap.messages[message.messageID] = messageCallback; + conn.ldap.track(message, messageCallback); // Mark client as active this._updateIdle(true); diff --git a/lib/errors/index.js b/lib/errors/index.js index 6352036..de5cb39 100644 --- a/lib/errors/index.js +++ b/lib/errors/index.js @@ -100,14 +100,37 @@ module.exports.getMessage = function (code) { }; +///--- Custom application errors function ConnectionError(message) { LDAPError.call(this, 'ConnectionError', - 0x80, // LDAP_OTHER, + CODES.LDAP_OTHER, message, null, ConnectionError); } util.inherits(ConnectionError, LDAPError); module.exports.ConnectionError = ConnectionError; + +function AbandonedError(message) { + LDAPError.call(this, + 'AbandonedError', + CODES.LDAP_OTHER, + message, + null, + AbandonedError); +} +util.inherits(AbandonedError, LDAPError); +module.exports.AbandonedError = AbandonedError; + +function TimeoutError(message) { + LDAPError.call(this, + 'TimeoutError', + CODES.LDAP_OTHER, + message, + null, + TimeoutError); +} +util.inherits(TimeoutError, LDAPError); +module.exports.TimeoutError = TimeoutError; diff --git a/test/client.test.js b/test/client.test.js index 451ae4c..3d42fd0 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -619,7 +619,6 @@ test('GH-24 attribute selection of *', function (t) { test('idle timeout', function (t) { - // FIXME: this must be run before abandon client.idleTimeout = 250; function premature() { t.ifError(true); @@ -856,14 +855,13 @@ test('no auto-reconnect on unbind', function (t) { }); -// Abandon handling is improper on the server and not robust in the client -// FIXME: Disable test until one/both are fixed. -//test('abandon (GH-27)', function (t) { -// client.abandon(401876543, function (err) { -// t.ifError(err); -// t.end(); -// }); -//}); +test('abandon (GH-27)', function (t) { + // FIXME: test abandoning a real request + client.abandon(401876543, function (err) { + t.ifError(err); + t.end(); + }); +}); test('search timeout (GH-51)', function (t) {