diff --git a/lib/client/client.js b/lib/client/client.js index a249a82..d04861b 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -1,6 +1,8 @@ 'use strict' const requestQueueFactory = require('./request-queue') +const messageTrackerFactory = require('./message-tracker') +const { MAX_MSGID } = require('./constants') var EventEmitter = require('events').EventEmitter var net = require('net') @@ -49,7 +51,6 @@ var PresenceFilter = filters.PresenceFilter var ConnectionError = errors.ConnectionError var CMP_EXPECT = [errors.LDAP_COMPARE_TRUE, errors.LDAP_COMPARE_FALSE] -var MAX_MSGID = Math.pow(2, 31) - 1 // node 0.6 got rid of FDs, so make up a client id for logging var CLIENT_ID = 0 @@ -88,117 +89,6 @@ function ensureDN (input, strict) { } } -/** - * Track message callback by messageID. - */ -function MessageTracker (opts) { - assert.object(opts) - assert.string(opts.id) - assert.object(opts.parser) - - this.id = opts.id - this._msgid = 0 - this._messages = {} - this._abandoned = {} - this.parser = opts.parser - - var self = this - this.__defineGetter__('pending', function () { - return Object.keys(self._messages) - }) -} - -/** - * Record a messageID and callback. - */ -MessageTracker.prototype.track = function track (message, callback) { - var msgid = this._nextID() - message.messageID = msgid - this._messages[msgid] = callback - return msgid -} - -/** - * Fetch callback based on messageID. - */ -MessageTracker.prototype.fetch = function fetch (msgid) { - var msg = this._messages[msgid] - if (msg) { - this._purgeAbandoned(msgid) - return msg - } - // It's possible that the server has not received the abandon request yet. - // While waiting for evidence that the abandon has been received, incoming - // messages that match the abandoned msgid will be handled as normal. - msg = this._abandoned[msgid] - if (msg) { - return msg.cb - } - return null -} - -/** - * Cease tracking for a given messageID. - */ -MessageTracker.prototype.remove = function remove (msgid) { - if (this._messages[msgid]) { - delete this._messages[msgid] - } else if (this._abandoned[msgid]) { - delete this._abandoned[msgid] - } -} - -/** - * Mark a messageID as abandoned. - */ -MessageTracker.prototype.abandon = function abandonMsg (msgid) { - if (this._messages[msgid]) { - // Keep track of "when" the message was abandoned - this._abandoned[msgid] = { - age: this._msgid, - cb: this._messages[msgid] - } - delete this._messages[msgid] - } -} - -/** - * Purge old items from abandoned list. - */ -MessageTracker.prototype._purgeAbandoned = function _purgeAbandoned (msgid) { - var self = this - // Is (comp >= ref) according to sliding window - function geWindow (ref, comp) { - var max = ref + (MAX_MSGID / 2) - var min = ref - if (max >= MAX_MSGID) { - // Handle roll-over - max = max - MAX_MSGID - 1 - return ((comp <= max) || (comp >= min)) - } else { - return ((comp <= max) && (comp >= min)) - } - } - - Object.keys(this._abandoned).forEach(function (id) { - // Abandoned messageIDs can be forgotten if a received messageID is "newer" - if (geWindow(self._abandoned[id].age, msgid)) { - self._abandoned[id].cb(new errors.AbandonedError( - 'client request abandoned')) - delete self._abandoned[id] - } - }) -} - -/** - * Allocate the next messageID according to a sliding window. - */ -MessageTracker.prototype._nextID = function _nextID () { - if (++this._msgid >= MAX_MSGID) { this._msgid = 1 } - - return this._msgid -} - /// --- API /** @@ -947,7 +837,7 @@ Client.prototype.connect = function connect () { // Initialize socket events and LDAP parser. function initSocket () { - tracker = new MessageTracker({ + tracker = messageTrackerFactory({ id: self.url ? self.url.href : self.socketPath, parser: new Parser({ log: log }) }) @@ -1157,11 +1047,8 @@ Client.prototype._onClose = function _onClose (closeError) { this.emit('close', closeError) // On close we have to walk the outstanding messages and go invoke their // callback with an error. - tracker.pending.forEach(function (msgid) { - var cb = tracker.fetch(msgid) - tracker.remove(msgid) - - if (socket.unbindMessageID !== parseInt(msgid, 10)) { + tracker.purge(function (msgid, cb) { + if (socket.unbindMessageID !== msgid) { return cb(new ConnectionError(tracker.id + ' closed')) } else { // Unbinds will be communicated as a success since we're closed @@ -1201,7 +1088,7 @@ Client.prototype._updateIdle = function _updateIdle (override) { function isIdle (disable) { return ((disable !== true) && (self._socket && self.connected) && - (self._tracker.pending.length === 0)) + (self._tracker.pending === 0)) } if (isIdle(override)) { if (!this._idleTimer) { diff --git a/lib/client/constants.js b/lib/client/constants.js new file mode 100644 index 0000000..4a8ac9d --- /dev/null +++ b/lib/client/constants.js @@ -0,0 +1,7 @@ +'use strict' + +module.exports = { + // https://tools.ietf.org/html/rfc4511#section-4.1.1 + // Message identifiers are an integer between (0, maxint). + MAX_MSGID: Math.pow(2, 31) - 1 +} diff --git a/lib/client/message-tracker/ge-window.js b/lib/client/message-tracker/ge-window.js new file mode 100644 index 0000000..58bde78 --- /dev/null +++ b/lib/client/message-tracker/ge-window.js @@ -0,0 +1,25 @@ +'use strict' + +const { MAX_MSGID } = require('../constants') + +/** + * Compare a reference id with another id to determine "greater than or equal" + * between the two values according to a sliding window. + * + * @param {integer} ref + * @param {integer} comp + * + * @returns {boolean} `true` if the `comp` value is >= to the `ref` value + * within the computed window, otherwise `false`. + */ +module.exports = function geWindow (ref, comp) { + let max = ref + Math.floor(MAX_MSGID / 2) + const min = ref + if (max >= MAX_MSGID) { + // Handle roll-over + max = max - MAX_MSGID - 1 + return ((comp <= max) || (comp >= min)) + } else { + return ((comp <= max) && (comp >= min)) + } +} diff --git a/lib/client/message-tracker/id-generator.js b/lib/client/message-tracker/id-generator.js new file mode 100644 index 0000000..49423ea --- /dev/null +++ b/lib/client/message-tracker/id-generator.js @@ -0,0 +1,23 @@ +'use strict' + +const { MAX_MSGID } = require('../constants') + +/** + * Returns a function that generates message identifiers. According to RFC 4511 + * the identifers should be `(0, MAX_MSGID)`. The returned function handles + * this and wraps around when the maximum has been reached. + * + * @param {integer} [start=0] Starting number in the identifier sequence. + * + * @returns {function} This function accepts no parameters and returns an + * increasing sequence identifier each invocation until it reaches the maximum + * identifier. At this point the sequence starts over. + */ +module.exports = function idGeneratorFactory (start = 0) { + let currentID = start + return function nextID () { + const nextID = currentID + 1 + currentID = (nextID >= MAX_MSGID) ? 1 : nextID + return currentID + } +} diff --git a/lib/client/message-tracker/index.js b/lib/client/message-tracker/index.js new file mode 100644 index 0000000..c2f4d74 --- /dev/null +++ b/lib/client/message-tracker/index.js @@ -0,0 +1,151 @@ +'use strict' + +const idGeneratorFactory = require('./id-generator') +const purgeAbandoned = require('./purge-abandoned') + +/** + * Returns a message tracker object that keeps track of which message + * identifiers correspond to which message handlers. Also handles keeping track + * of abandoned messages. + * + * @param {object} options + * @param {string} options.id An identifier for the tracker. + * @param {object} options.parser An object that will be used to parse messages. + * + * @returns {MessageTracker} + */ +module.exports = function messageTrackerFactory (options) { + if (Object.prototype.toString.call(options) !== '[object Object]') { + throw Error('options object is required') + } + if (!options.id || typeof options.id !== 'string') { + throw Error('options.id string is required') + } + if (!options.parser || Object.prototype.toString.call(options.parser) !== '[object Object]') { + throw Error('options.parser object is required') + } + + let currentID = 0 + const nextID = idGeneratorFactory() + const messages = new Map() + const abandoned = new Map() + + /** + * @typedef {object} MessageTracker + * @property {string} id The identifier of the tracker as supplied via the options. + * @property {object} parser The parser object given by the the options. + */ + const tracker = { + id: options.id, + parser: options.parser + } + + /** + * Count of messages awaiting response. + * + * @alias pending + * @memberof! MessageTracker# + */ + Object.defineProperty(tracker, 'pending', { + get () { + return messages.size + } + }) + + /** + * Move a specific message to the abanded track. + * + * @param {integer} msgID The identifier for the message to move. + * + * @memberof MessageTracker + * @method abandon + */ + tracker.abandon = function abandonMessage (msgID) { + if (messages.has(msgID) === false) return false + abandoned.set(msgID, { + age: currentID, + cb: messages.get(msgID) + }) + return messages.delete(msgID) + } + + /** + * Retrieves the message handler for a message. Removes abandoned messages + * that have been given time to be resolved. + * + * @param {integer} msgID The identifier for the message to get the handler for. + * + * @memberof MessageTracker + * @method fetch + */ + tracker.fetch = function fetchMessage (msgID) { + const messageCB = messages.get(msgID) + if (messageCB) { + purgeAbandoned(msgID, abandoned) + return messageCB + } + + // We sent an abandon request but the server either wasn't able to process + // it or has not received it yet. Therefore, we received a response for the + // abandoned message. So we must return the abandoned message's callback + // to be processed normally. + const abandonedMsg = abandoned.get(msgID) + if (abandonedMsg) { + return abandonedMsg.cb + } + + return null + } + + /** + * Removes all message tracks, cleans up the abandoned track, and invokes + * a callback for each message purged. + * + * @param {function} cb A function with the signature `(msgID, handler)`. + * + * @memberof MessageTracker + * @method purge + */ + tracker.purge = function purgeMessages (cb) { + messages.forEach((val, key) => { + purgeAbandoned(key, abandoned) + tracker.remove(key) + cb(key, val) + }) + } + + /** + * Removes a message from all tracking. + * + * @param {integer} msgID The identifier for the message to remove from tracking. + * + * @memberof MessageTracker + * @method remove + */ + tracker.remove = function removeMessage (msgID) { + if (messages.delete(msgID) === false) { + abandoned.delete(msgID) + } + } + + /** + * Add a message handler to be tracked. + * + * @param {object} message The message object to be tracked. This object will + * have a new property added to it: `messageID`. + * @param {function} callback The handler for the message. + * + * @memberof MessageTracker + * @method track + */ + tracker.track = function trackMessage (message, callback) { + currentID = nextID() + // This side effect is not ideal but the client doesn't attach the tracker + // to itself until after the `.connect` method has fired. If this can be + // refactored later, then we can possibly get rid of this side effect. + message.messageID = currentID + messages.set(currentID, callback) + } + + return tracker +} diff --git a/lib/client/message-tracker/purge-abandoned.js b/lib/client/message-tracker/purge-abandoned.js new file mode 100644 index 0000000..8fc6cab --- /dev/null +++ b/lib/client/message-tracker/purge-abandoned.js @@ -0,0 +1,34 @@ +'use strict' + +const { AbandonedError } = require('../../errors') +const geWindow = require('./ge-window') + +/** + * Given a `msgID` and a set of `abandoned` messages, remove any abandoned + * messages that existed _prior_ to the specified `msgID`. For example, let's + * assume the server has sent 3 messages: + * + * 1. A search message. + * 2. An abandon message for the search message. + * 3. A new search message. + * + * When the response for message #1 comes in, if it does, it will be processed + * normally due to the specification. Message #2 will not receive a response, or + * if the server does send one since the spec sort of allows it, we won't do + * anything with it because we just discard that listener. Now the response + * for message #3 comes in. At this point, we will issue a purge of responses + * by passing in `msgID = 3`. This result is that we will remove the tracking + * for message #1. + * + * @param {integer} msgID An upper bound for the messages to be purged. + * @param {Map} abandoned A set of abandoned messages. Each message is an object + * `{ age: , cb: }` where `age` was the current message id when the + * abandon message was sent. + */ +module.exports = function purgeAbandoned (msgID, abandoned) { + abandoned.forEach((val, key) => { + if (geWindow(val.age, msgID) === false) return + val.cb(new AbandonedError('client request abandoned')) + abandoned.delete(key) + }) +} diff --git a/test/client.test.js b/test/client.test.js index d3b28a9..3b84611 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -937,7 +937,7 @@ tap.test('idle timeout', function (t) { }) res.on('end', function () { var late = setTimeout(function () { - t.error(false, 'too late') + t.fail('too late') }, 500) // It's ok to go idle now t.context.client.removeListener('idle', premature) diff --git a/test/lib/client/message-tracker/ge-window.test.js b/test/lib/client/message-tracker/ge-window.test.js new file mode 100644 index 0000000..10f0ef5 --- /dev/null +++ b/test/lib/client/message-tracker/ge-window.test.js @@ -0,0 +1,47 @@ +'use strict' + +const { test } = require('tap') +const { MAX_MSGID } = require('../../../../lib/client/constants') +const geWindow = require('../../../../lib/client/message-tracker/ge-window') + +test('comp > (ref in upper window) => true', async t => { + const ref = Math.floor(MAX_MSGID / 2) + 10 + const comp = ref + 10 + const result = geWindow(ref, comp) + t.is(result, true) +}) + +test('comp < (ref in upper window) => false', async t => { + const ref = Math.floor(MAX_MSGID / 2) + 10 + const comp = ref - 5 + const result = geWindow(ref, comp) + t.is(result, false) +}) + +test('comp > (ref in lower window) => true', async t => { + const ref = Math.floor(MAX_MSGID / 2) - 10 + const comp = ref + 20 + const result = geWindow(ref, comp) + t.is(result, true) +}) + +test('comp < (ref in lower window) => false', async t => { + const ref = Math.floor(MAX_MSGID / 2) - 10 + const comp = ref - 5 + const result = geWindow(ref, comp) + t.is(result, false) +}) + +test('(max === MAX_MSGID) && (comp > ref) => true', async t => { + const ref = MAX_MSGID - Math.floor(MAX_MSGID / 2) + const comp = ref + 1 + const result = geWindow(ref, comp) + t.is(result, true) +}) + +test('(max === MAX_MSGID) && (comp < ref) => false', async t => { + const ref = MAX_MSGID - Math.floor(MAX_MSGID / 2) + const comp = ref - 1 + const result = geWindow(ref, comp) + t.is(result, false) +}) diff --git a/test/lib/client/message-tracker/id-generator.test.js b/test/lib/client/message-tracker/id-generator.test.js new file mode 100644 index 0000000..a205088 --- /dev/null +++ b/test/lib/client/message-tracker/id-generator.test.js @@ -0,0 +1,21 @@ +'use strict' + +const { test } = require('tap') +const { MAX_MSGID } = require('../../../../lib/client/constants') +const idGeneratorFactory = require('../../../../lib/client/message-tracker/id-generator') + +test('starts at 0', async t => { + const nextID = idGeneratorFactory() + const currentID = nextID() + t.is(currentID, 1) +}) + +test('handles wrapping around', async t => { + const nextID = idGeneratorFactory(MAX_MSGID - 2) + + let currentID = nextID() + t.is(currentID, MAX_MSGID - 1) + + currentID = nextID() + t.is(currentID, 1) +}) diff --git a/test/lib/client/message-tracker/index.test.js b/test/lib/client/message-tracker/index.test.js new file mode 100644 index 0000000..8bfe886 --- /dev/null +++ b/test/lib/client/message-tracker/index.test.js @@ -0,0 +1,201 @@ +'use strict' + +const tap = require('tap') +const messageTrackerFactory = require('../../../../lib/client/message-tracker/') + +tap.test('options', t => { + t.test('requires an options object', async t => { + try { + messageTrackerFactory() + } catch (error) { + t.match(error, /options object is required/) + } + + try { + messageTrackerFactory([]) + } catch (error) { + t.match(error, /options object is required/) + } + + try { + messageTrackerFactory('') + } catch (error) { + t.match(error, /options object is required/) + } + + try { + messageTrackerFactory(42) + } catch (error) { + t.match(error, /options object is required/) + } + }) + + t.test('requires id to be a string', async t => { + try { + messageTrackerFactory({ id: {} }) + } catch (error) { + t.match(error, /options\.id string is required/) + } + + try { + messageTrackerFactory({ id: [] }) + } catch (error) { + t.match(error, /options\.id string is required/) + } + + try { + messageTrackerFactory({ id: 42 }) + } catch (error) { + t.match(error, /options\.id string is required/) + } + }) + + t.test('requires parser to be an object', async t => { + try { + messageTrackerFactory({ id: 'foo', parser: 'bar' }) + } catch (error) { + t.match(error, /options\.parser object is required/) + } + + try { + messageTrackerFactory({ id: 'foo', parser: 42 }) + } catch (error) { + t.match(error, /options\.parser object is required/) + } + + try { + messageTrackerFactory({ id: 'foo', parser: [] }) + } catch (error) { + t.match(error, /options\.parser object is required/) + } + }) + + t.end() +}) + +tap.test('.pending', t => { + t.test('returns 0 for no messages', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + t.is(tracker.pending, 0) + }) + + t.test('returns 1 for 1 message', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, () => {}) + t.is(tracker.pending, 1) + }) + + t.end() +}) + +tap.test('#abandon', t => { + t.test('returns false if message does not exist', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + const result = tracker.abandon(1) + t.is(result, false) + }) + + t.test('returns true if message is abandoned', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, {}) + const result = tracker.abandon(1) + t.is(result, true) + }) + + t.end() +}) + +tap.test('#fetch', t => { + t.test('returns handler for fetched message', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, handler) + const fetched = tracker.fetch(1) + t.is(fetched, handler) + + function handler () {} + }) + + t.test('returns handler for fetched abandoned message', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, handler) + tracker.track({ abandon: 'message' }, () => {}) + tracker.abandon(1) + const fetched = tracker.fetch(1) + t.is(fetched, handler) + + function handler () {} + }) + + t.test('returns null when message does not exist', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + const fetched = tracker.fetch(1) + t.is(fetched, null) + }) + + t.end() +}) + +tap.test('#purge', t => { + t.test('invokes cb for each tracked message', async t => { + t.plan(4) + let count = 0 + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, handler1) + tracker.track({}, handler2) + tracker.purge(cb) + + function cb (msgID, handler) { + if (count === 0) { + t.is(msgID, 1) + t.is(handler, handler1) + count += 1 + return + } + t.is(msgID, 2) + t.is(handler, handler2) + } + + function handler1 () {} + function handler2 () {} + }) + + t.end() +}) + +tap.test('#remove', t => { + t.test('removes from the current track', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, () => {}) + tracker.remove(1) + t.is(tracker.pending, 0) + }) + + // Not a great test. It exercises the desired code path, but we probably + // should expose some insight into the abandoned track. + t.test('removes from the abandoned track', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + tracker.track({}, () => {}) + tracker.track({ abandon: 'message' }, () => {}) + tracker.abandon(1) + tracker.remove(1) + t.is(tracker.pending, 1) + }) + + t.end() +}) + +tap.test('#track', t => { + t.test('add messageID and tracks message', async t => { + const tracker = messageTrackerFactory({ id: 'foo', parser: {} }) + const msg = {} + tracker.track(msg, handler) + + t.deepEqual(msg, { messageID: 1 }) + const cb = tracker.fetch(1) + t.is(cb, handler) + + function handler () {} + }) + + t.end() +}) diff --git a/test/lib/client/message-tracker/purge-abandoned.test.js b/test/lib/client/message-tracker/purge-abandoned.test.js new file mode 100644 index 0000000..e5d16b3 --- /dev/null +++ b/test/lib/client/message-tracker/purge-abandoned.test.js @@ -0,0 +1,64 @@ +'use strict' + +const { test } = require('tap') +const { MAX_MSGID } = require('../../../../lib/client/constants') +const purgeAbandoned = require('../../../../lib/client/message-tracker/purge-abandoned') + +test('clears queue if only one message present', async t => { + t.plan(3) + const abandoned = new Map() + abandoned.set(1, { age: 2, cb }) + + purgeAbandoned(2, abandoned) + t.is(abandoned.size, 0) + + function cb (err) { + t.is(err.name, 'AbandonedError') + t.is(err.message, 'client request abandoned') + } +}) + +test('clears queue if multiple messages present', async t => { + t.plan(5) + const abandoned = new Map() + abandoned.set(1, { age: 2, cb }) + abandoned.set(2, { age: 3, cb }) + + purgeAbandoned(4, abandoned) + t.is(abandoned.size, 0) + + function cb (err) { + t.is(err.name, 'AbandonedError') + t.is(err.message, 'client request abandoned') + } +}) + +test('message id has wrappred around', async t => { + t.plan(3) + const abandoned = new Map() + abandoned.set(MAX_MSGID - 1, { age: MAX_MSGID, cb }) + + // The "abandon" message was sent with an id of "MAX_MSGID". So the message + // that is triggering the purge was the "first" message in the new sequence + // of message identifiers. + purgeAbandoned(1, abandoned) + t.is(abandoned.size, 0) + + function cb (err) { + t.is(err.name, 'AbandonedError') + t.is(err.message, 'client request abandoned') + } +}) + +test('does not clear if window not met', async t => { + t.plan(1) + const abandoned = new Map() + abandoned.set(1, { age: 2, cb }) + + purgeAbandoned(1, abandoned) + t.is(abandoned.size, 1) + + function cb () { + t.fail('should not be invoked') + } +})