Merge pull request #550 from ldapjs/msg-tracker
Refactor MessageTracker into testable module
This commit is contained in:
commit
f499b977d9
|
@ -1,6 +1,8 @@
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const requestQueueFactory = require('./request-queue')
|
const requestQueueFactory = require('./request-queue')
|
||||||
|
const messageTrackerFactory = require('./message-tracker')
|
||||||
|
const { MAX_MSGID } = require('./constants')
|
||||||
|
|
||||||
var EventEmitter = require('events').EventEmitter
|
var EventEmitter = require('events').EventEmitter
|
||||||
var net = require('net')
|
var net = require('net')
|
||||||
|
@ -49,7 +51,6 @@ var PresenceFilter = filters.PresenceFilter
|
||||||
var ConnectionError = errors.ConnectionError
|
var ConnectionError = errors.ConnectionError
|
||||||
|
|
||||||
var CMP_EXPECT = [errors.LDAP_COMPARE_TRUE, errors.LDAP_COMPARE_FALSE]
|
var CMP_EXPECT = [errors.LDAP_COMPARE_TRUE, errors.LDAP_COMPARE_FALSE]
|
||||||
var MAX_MSGID = Math.pow(2, 31) - 1
|
|
||||||
|
|
||||||
// node 0.6 got rid of FDs, so make up a client id for logging
|
// node 0.6 got rid of FDs, so make up a client id for logging
|
||||||
var CLIENT_ID = 0
|
var CLIENT_ID = 0
|
||||||
|
@ -88,117 +89,6 @@ function ensureDN (input, strict) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Track message callback by messageID.
|
|
||||||
*/
|
|
||||||
function MessageTracker (opts) {
|
|
||||||
assert.object(opts)
|
|
||||||
assert.string(opts.id)
|
|
||||||
assert.object(opts.parser)
|
|
||||||
|
|
||||||
this.id = opts.id
|
|
||||||
this._msgid = 0
|
|
||||||
this._messages = {}
|
|
||||||
this._abandoned = {}
|
|
||||||
this.parser = opts.parser
|
|
||||||
|
|
||||||
var self = this
|
|
||||||
this.__defineGetter__('pending', function () {
|
|
||||||
return Object.keys(self._messages)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Record a messageID and callback.
|
|
||||||
*/
|
|
||||||
MessageTracker.prototype.track = function track (message, callback) {
|
|
||||||
var msgid = this._nextID()
|
|
||||||
message.messageID = msgid
|
|
||||||
this._messages[msgid] = callback
|
|
||||||
return msgid
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Fetch callback based on messageID.
|
|
||||||
*/
|
|
||||||
MessageTracker.prototype.fetch = function fetch (msgid) {
|
|
||||||
var msg = this._messages[msgid]
|
|
||||||
if (msg) {
|
|
||||||
this._purgeAbandoned(msgid)
|
|
||||||
return msg
|
|
||||||
}
|
|
||||||
// It's possible that the server has not received the abandon request yet.
|
|
||||||
// While waiting for evidence that the abandon has been received, incoming
|
|
||||||
// messages that match the abandoned msgid will be handled as normal.
|
|
||||||
msg = this._abandoned[msgid]
|
|
||||||
if (msg) {
|
|
||||||
return msg.cb
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Cease tracking for a given messageID.
|
|
||||||
*/
|
|
||||||
MessageTracker.prototype.remove = function remove (msgid) {
|
|
||||||
if (this._messages[msgid]) {
|
|
||||||
delete this._messages[msgid]
|
|
||||||
} else if (this._abandoned[msgid]) {
|
|
||||||
delete this._abandoned[msgid]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark a messageID as abandoned.
|
|
||||||
*/
|
|
||||||
MessageTracker.prototype.abandon = function abandonMsg (msgid) {
|
|
||||||
if (this._messages[msgid]) {
|
|
||||||
// Keep track of "when" the message was abandoned
|
|
||||||
this._abandoned[msgid] = {
|
|
||||||
age: this._msgid,
|
|
||||||
cb: this._messages[msgid]
|
|
||||||
}
|
|
||||||
delete this._messages[msgid]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Purge old items from abandoned list.
|
|
||||||
*/
|
|
||||||
MessageTracker.prototype._purgeAbandoned = function _purgeAbandoned (msgid) {
|
|
||||||
var self = this
|
|
||||||
// Is (comp >= ref) according to sliding window
|
|
||||||
function geWindow (ref, comp) {
|
|
||||||
var max = ref + (MAX_MSGID / 2)
|
|
||||||
var min = ref
|
|
||||||
if (max >= MAX_MSGID) {
|
|
||||||
// Handle roll-over
|
|
||||||
max = max - MAX_MSGID - 1
|
|
||||||
return ((comp <= max) || (comp >= min))
|
|
||||||
} else {
|
|
||||||
return ((comp <= max) && (comp >= min))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Object.keys(this._abandoned).forEach(function (id) {
|
|
||||||
// Abandoned messageIDs can be forgotten if a received messageID is "newer"
|
|
||||||
if (geWindow(self._abandoned[id].age, msgid)) {
|
|
||||||
self._abandoned[id].cb(new errors.AbandonedError(
|
|
||||||
'client request abandoned'))
|
|
||||||
delete self._abandoned[id]
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Allocate the next messageID according to a sliding window.
|
|
||||||
*/
|
|
||||||
MessageTracker.prototype._nextID = function _nextID () {
|
|
||||||
if (++this._msgid >= MAX_MSGID) { this._msgid = 1 }
|
|
||||||
|
|
||||||
return this._msgid
|
|
||||||
}
|
|
||||||
|
|
||||||
/// --- API
|
/// --- API
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -947,7 +837,7 @@ Client.prototype.connect = function connect () {
|
||||||
|
|
||||||
// Initialize socket events and LDAP parser.
|
// Initialize socket events and LDAP parser.
|
||||||
function initSocket () {
|
function initSocket () {
|
||||||
tracker = new MessageTracker({
|
tracker = messageTrackerFactory({
|
||||||
id: self.url ? self.url.href : self.socketPath,
|
id: self.url ? self.url.href : self.socketPath,
|
||||||
parser: new Parser({ log: log })
|
parser: new Parser({ log: log })
|
||||||
})
|
})
|
||||||
|
@ -1157,11 +1047,8 @@ Client.prototype._onClose = function _onClose (closeError) {
|
||||||
this.emit('close', closeError)
|
this.emit('close', closeError)
|
||||||
// On close we have to walk the outstanding messages and go invoke their
|
// On close we have to walk the outstanding messages and go invoke their
|
||||||
// callback with an error.
|
// callback with an error.
|
||||||
tracker.pending.forEach(function (msgid) {
|
tracker.purge(function (msgid, cb) {
|
||||||
var cb = tracker.fetch(msgid)
|
if (socket.unbindMessageID !== msgid) {
|
||||||
tracker.remove(msgid)
|
|
||||||
|
|
||||||
if (socket.unbindMessageID !== parseInt(msgid, 10)) {
|
|
||||||
return cb(new ConnectionError(tracker.id + ' closed'))
|
return cb(new ConnectionError(tracker.id + ' closed'))
|
||||||
} else {
|
} else {
|
||||||
// Unbinds will be communicated as a success since we're closed
|
// Unbinds will be communicated as a success since we're closed
|
||||||
|
@ -1201,7 +1088,7 @@ Client.prototype._updateIdle = function _updateIdle (override) {
|
||||||
function isIdle (disable) {
|
function isIdle (disable) {
|
||||||
return ((disable !== true) &&
|
return ((disable !== true) &&
|
||||||
(self._socket && self.connected) &&
|
(self._socket && self.connected) &&
|
||||||
(self._tracker.pending.length === 0))
|
(self._tracker.pending === 0))
|
||||||
}
|
}
|
||||||
if (isIdle(override)) {
|
if (isIdle(override)) {
|
||||||
if (!this._idleTimer) {
|
if (!this._idleTimer) {
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
// https://tools.ietf.org/html/rfc4511#section-4.1.1
|
||||||
|
// Message identifiers are an integer between (0, maxint).
|
||||||
|
MAX_MSGID: Math.pow(2, 31) - 1
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { MAX_MSGID } = require('../constants')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare a reference id with another id to determine "greater than or equal"
|
||||||
|
* between the two values according to a sliding window.
|
||||||
|
*
|
||||||
|
* @param {integer} ref
|
||||||
|
* @param {integer} comp
|
||||||
|
*
|
||||||
|
* @returns {boolean} `true` if the `comp` value is >= to the `ref` value
|
||||||
|
* within the computed window, otherwise `false`.
|
||||||
|
*/
|
||||||
|
module.exports = function geWindow (ref, comp) {
|
||||||
|
let max = ref + Math.floor(MAX_MSGID / 2)
|
||||||
|
const min = ref
|
||||||
|
if (max >= MAX_MSGID) {
|
||||||
|
// Handle roll-over
|
||||||
|
max = max - MAX_MSGID - 1
|
||||||
|
return ((comp <= max) || (comp >= min))
|
||||||
|
} else {
|
||||||
|
return ((comp <= max) && (comp >= min))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { MAX_MSGID } = require('../constants')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a function that generates message identifiers. According to RFC 4511
|
||||||
|
* the identifers should be `(0, MAX_MSGID)`. The returned function handles
|
||||||
|
* this and wraps around when the maximum has been reached.
|
||||||
|
*
|
||||||
|
* @param {integer} [start=0] Starting number in the identifier sequence.
|
||||||
|
*
|
||||||
|
* @returns {function} This function accepts no parameters and returns an
|
||||||
|
* increasing sequence identifier each invocation until it reaches the maximum
|
||||||
|
* identifier. At this point the sequence starts over.
|
||||||
|
*/
|
||||||
|
module.exports = function idGeneratorFactory (start = 0) {
|
||||||
|
let currentID = start
|
||||||
|
return function nextID () {
|
||||||
|
const nextID = currentID + 1
|
||||||
|
currentID = (nextID >= MAX_MSGID) ? 1 : nextID
|
||||||
|
return currentID
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,151 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const idGeneratorFactory = require('./id-generator')
|
||||||
|
const purgeAbandoned = require('./purge-abandoned')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a message tracker object that keeps track of which message
|
||||||
|
* identifiers correspond to which message handlers. Also handles keeping track
|
||||||
|
* of abandoned messages.
|
||||||
|
*
|
||||||
|
* @param {object} options
|
||||||
|
* @param {string} options.id An identifier for the tracker.
|
||||||
|
* @param {object} options.parser An object that will be used to parse messages.
|
||||||
|
*
|
||||||
|
* @returns {MessageTracker}
|
||||||
|
*/
|
||||||
|
module.exports = function messageTrackerFactory (options) {
|
||||||
|
if (Object.prototype.toString.call(options) !== '[object Object]') {
|
||||||
|
throw Error('options object is required')
|
||||||
|
}
|
||||||
|
if (!options.id || typeof options.id !== 'string') {
|
||||||
|
throw Error('options.id string is required')
|
||||||
|
}
|
||||||
|
if (!options.parser || Object.prototype.toString.call(options.parser) !== '[object Object]') {
|
||||||
|
throw Error('options.parser object is required')
|
||||||
|
}
|
||||||
|
|
||||||
|
let currentID = 0
|
||||||
|
const nextID = idGeneratorFactory()
|
||||||
|
const messages = new Map()
|
||||||
|
const abandoned = new Map()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef {object} MessageTracker
|
||||||
|
* @property {string} id The identifier of the tracker as supplied via the options.
|
||||||
|
* @property {object} parser The parser object given by the the options.
|
||||||
|
*/
|
||||||
|
const tracker = {
|
||||||
|
id: options.id,
|
||||||
|
parser: options.parser
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count of messages awaiting response.
|
||||||
|
*
|
||||||
|
* @alias pending
|
||||||
|
* @memberof! MessageTracker#
|
||||||
|
*/
|
||||||
|
Object.defineProperty(tracker, 'pending', {
|
||||||
|
get () {
|
||||||
|
return messages.size
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move a specific message to the abanded track.
|
||||||
|
*
|
||||||
|
* @param {integer} msgID The identifier for the message to move.
|
||||||
|
*
|
||||||
|
* @memberof MessageTracker
|
||||||
|
* @method abandon
|
||||||
|
*/
|
||||||
|
tracker.abandon = function abandonMessage (msgID) {
|
||||||
|
if (messages.has(msgID) === false) return false
|
||||||
|
abandoned.set(msgID, {
|
||||||
|
age: currentID,
|
||||||
|
cb: messages.get(msgID)
|
||||||
|
})
|
||||||
|
return messages.delete(msgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves the message handler for a message. Removes abandoned messages
|
||||||
|
* that have been given time to be resolved.
|
||||||
|
*
|
||||||
|
* @param {integer} msgID The identifier for the message to get the handler for.
|
||||||
|
*
|
||||||
|
* @memberof MessageTracker
|
||||||
|
* @method fetch
|
||||||
|
*/
|
||||||
|
tracker.fetch = function fetchMessage (msgID) {
|
||||||
|
const messageCB = messages.get(msgID)
|
||||||
|
if (messageCB) {
|
||||||
|
purgeAbandoned(msgID, abandoned)
|
||||||
|
return messageCB
|
||||||
|
}
|
||||||
|
|
||||||
|
// We sent an abandon request but the server either wasn't able to process
|
||||||
|
// it or has not received it yet. Therefore, we received a response for the
|
||||||
|
// abandoned message. So we must return the abandoned message's callback
|
||||||
|
// to be processed normally.
|
||||||
|
const abandonedMsg = abandoned.get(msgID)
|
||||||
|
if (abandonedMsg) {
|
||||||
|
return abandonedMsg.cb
|
||||||
|
}
|
||||||
|
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes all message tracks, cleans up the abandoned track, and invokes
|
||||||
|
* a callback for each message purged.
|
||||||
|
*
|
||||||
|
* @param {function} cb A function with the signature `(msgID, handler)`.
|
||||||
|
*
|
||||||
|
* @memberof MessageTracker
|
||||||
|
* @method purge
|
||||||
|
*/
|
||||||
|
tracker.purge = function purgeMessages (cb) {
|
||||||
|
messages.forEach((val, key) => {
|
||||||
|
purgeAbandoned(key, abandoned)
|
||||||
|
tracker.remove(key)
|
||||||
|
cb(key, val)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a message from all tracking.
|
||||||
|
*
|
||||||
|
* @param {integer} msgID The identifier for the message to remove from tracking.
|
||||||
|
*
|
||||||
|
* @memberof MessageTracker
|
||||||
|
* @method remove
|
||||||
|
*/
|
||||||
|
tracker.remove = function removeMessage (msgID) {
|
||||||
|
if (messages.delete(msgID) === false) {
|
||||||
|
abandoned.delete(msgID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a message handler to be tracked.
|
||||||
|
*
|
||||||
|
* @param {object} message The message object to be tracked. This object will
|
||||||
|
* have a new property added to it: `messageID`.
|
||||||
|
* @param {function} callback The handler for the message.
|
||||||
|
*
|
||||||
|
* @memberof MessageTracker
|
||||||
|
* @method track
|
||||||
|
*/
|
||||||
|
tracker.track = function trackMessage (message, callback) {
|
||||||
|
currentID = nextID()
|
||||||
|
// This side effect is not ideal but the client doesn't attach the tracker
|
||||||
|
// to itself until after the `.connect` method has fired. If this can be
|
||||||
|
// refactored later, then we can possibly get rid of this side effect.
|
||||||
|
message.messageID = currentID
|
||||||
|
messages.set(currentID, callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tracker
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { AbandonedError } = require('../../errors')
|
||||||
|
const geWindow = require('./ge-window')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a `msgID` and a set of `abandoned` messages, remove any abandoned
|
||||||
|
* messages that existed _prior_ to the specified `msgID`. For example, let's
|
||||||
|
* assume the server has sent 3 messages:
|
||||||
|
*
|
||||||
|
* 1. A search message.
|
||||||
|
* 2. An abandon message for the search message.
|
||||||
|
* 3. A new search message.
|
||||||
|
*
|
||||||
|
* When the response for message #1 comes in, if it does, it will be processed
|
||||||
|
* normally due to the specification. Message #2 will not receive a response, or
|
||||||
|
* if the server does send one since the spec sort of allows it, we won't do
|
||||||
|
* anything with it because we just discard that listener. Now the response
|
||||||
|
* for message #3 comes in. At this point, we will issue a purge of responses
|
||||||
|
* by passing in `msgID = 3`. This result is that we will remove the tracking
|
||||||
|
* for message #1.
|
||||||
|
*
|
||||||
|
* @param {integer} msgID An upper bound for the messages to be purged.
|
||||||
|
* @param {Map} abandoned A set of abandoned messages. Each message is an object
|
||||||
|
* `{ age: <id>, cb: <func> }` where `age` was the current message id when the
|
||||||
|
* abandon message was sent.
|
||||||
|
*/
|
||||||
|
module.exports = function purgeAbandoned (msgID, abandoned) {
|
||||||
|
abandoned.forEach((val, key) => {
|
||||||
|
if (geWindow(val.age, msgID) === false) return
|
||||||
|
val.cb(new AbandonedError('client request abandoned'))
|
||||||
|
abandoned.delete(key)
|
||||||
|
})
|
||||||
|
}
|
|
@ -937,7 +937,7 @@ tap.test('idle timeout', function (t) {
|
||||||
})
|
})
|
||||||
res.on('end', function () {
|
res.on('end', function () {
|
||||||
var late = setTimeout(function () {
|
var late = setTimeout(function () {
|
||||||
t.error(false, 'too late')
|
t.fail('too late')
|
||||||
}, 500)
|
}, 500)
|
||||||
// It's ok to go idle now
|
// It's ok to go idle now
|
||||||
t.context.client.removeListener('idle', premature)
|
t.context.client.removeListener('idle', premature)
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { test } = require('tap')
|
||||||
|
const { MAX_MSGID } = require('../../../../lib/client/constants')
|
||||||
|
const geWindow = require('../../../../lib/client/message-tracker/ge-window')
|
||||||
|
|
||||||
|
test('comp > (ref in upper window) => true', async t => {
|
||||||
|
const ref = Math.floor(MAX_MSGID / 2) + 10
|
||||||
|
const comp = ref + 10
|
||||||
|
const result = geWindow(ref, comp)
|
||||||
|
t.is(result, true)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('comp < (ref in upper window) => false', async t => {
|
||||||
|
const ref = Math.floor(MAX_MSGID / 2) + 10
|
||||||
|
const comp = ref - 5
|
||||||
|
const result = geWindow(ref, comp)
|
||||||
|
t.is(result, false)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('comp > (ref in lower window) => true', async t => {
|
||||||
|
const ref = Math.floor(MAX_MSGID / 2) - 10
|
||||||
|
const comp = ref + 20
|
||||||
|
const result = geWindow(ref, comp)
|
||||||
|
t.is(result, true)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('comp < (ref in lower window) => false', async t => {
|
||||||
|
const ref = Math.floor(MAX_MSGID / 2) - 10
|
||||||
|
const comp = ref - 5
|
||||||
|
const result = geWindow(ref, comp)
|
||||||
|
t.is(result, false)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('(max === MAX_MSGID) && (comp > ref) => true', async t => {
|
||||||
|
const ref = MAX_MSGID - Math.floor(MAX_MSGID / 2)
|
||||||
|
const comp = ref + 1
|
||||||
|
const result = geWindow(ref, comp)
|
||||||
|
t.is(result, true)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('(max === MAX_MSGID) && (comp < ref) => false', async t => {
|
||||||
|
const ref = MAX_MSGID - Math.floor(MAX_MSGID / 2)
|
||||||
|
const comp = ref - 1
|
||||||
|
const result = geWindow(ref, comp)
|
||||||
|
t.is(result, false)
|
||||||
|
})
|
|
@ -0,0 +1,21 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { test } = require('tap')
|
||||||
|
const { MAX_MSGID } = require('../../../../lib/client/constants')
|
||||||
|
const idGeneratorFactory = require('../../../../lib/client/message-tracker/id-generator')
|
||||||
|
|
||||||
|
test('starts at 0', async t => {
|
||||||
|
const nextID = idGeneratorFactory()
|
||||||
|
const currentID = nextID()
|
||||||
|
t.is(currentID, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('handles wrapping around', async t => {
|
||||||
|
const nextID = idGeneratorFactory(MAX_MSGID - 2)
|
||||||
|
|
||||||
|
let currentID = nextID()
|
||||||
|
t.is(currentID, MAX_MSGID - 1)
|
||||||
|
|
||||||
|
currentID = nextID()
|
||||||
|
t.is(currentID, 1)
|
||||||
|
})
|
|
@ -0,0 +1,201 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const tap = require('tap')
|
||||||
|
const messageTrackerFactory = require('../../../../lib/client/message-tracker/')
|
||||||
|
|
||||||
|
tap.test('options', t => {
|
||||||
|
t.test('requires an options object', async t => {
|
||||||
|
try {
|
||||||
|
messageTrackerFactory()
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options object is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory([])
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options object is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory('')
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options object is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory(42)
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options object is required/)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('requires id to be a string', async t => {
|
||||||
|
try {
|
||||||
|
messageTrackerFactory({ id: {} })
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options\.id string is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory({ id: [] })
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options\.id string is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory({ id: 42 })
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options\.id string is required/)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('requires parser to be an object', async t => {
|
||||||
|
try {
|
||||||
|
messageTrackerFactory({ id: 'foo', parser: 'bar' })
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options\.parser object is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory({ id: 'foo', parser: 42 })
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options\.parser object is required/)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageTrackerFactory({ id: 'foo', parser: [] })
|
||||||
|
} catch (error) {
|
||||||
|
t.match(error, /options\.parser object is required/)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
tap.test('.pending', t => {
|
||||||
|
t.test('returns 0 for no messages', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
t.is(tracker.pending, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('returns 1 for 1 message', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, () => {})
|
||||||
|
t.is(tracker.pending, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
tap.test('#abandon', t => {
|
||||||
|
t.test('returns false if message does not exist', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
const result = tracker.abandon(1)
|
||||||
|
t.is(result, false)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('returns true if message is abandoned', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, {})
|
||||||
|
const result = tracker.abandon(1)
|
||||||
|
t.is(result, true)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
tap.test('#fetch', t => {
|
||||||
|
t.test('returns handler for fetched message', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, handler)
|
||||||
|
const fetched = tracker.fetch(1)
|
||||||
|
t.is(fetched, handler)
|
||||||
|
|
||||||
|
function handler () {}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('returns handler for fetched abandoned message', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, handler)
|
||||||
|
tracker.track({ abandon: 'message' }, () => {})
|
||||||
|
tracker.abandon(1)
|
||||||
|
const fetched = tracker.fetch(1)
|
||||||
|
t.is(fetched, handler)
|
||||||
|
|
||||||
|
function handler () {}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('returns null when message does not exist', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
const fetched = tracker.fetch(1)
|
||||||
|
t.is(fetched, null)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
tap.test('#purge', t => {
|
||||||
|
t.test('invokes cb for each tracked message', async t => {
|
||||||
|
t.plan(4)
|
||||||
|
let count = 0
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, handler1)
|
||||||
|
tracker.track({}, handler2)
|
||||||
|
tracker.purge(cb)
|
||||||
|
|
||||||
|
function cb (msgID, handler) {
|
||||||
|
if (count === 0) {
|
||||||
|
t.is(msgID, 1)
|
||||||
|
t.is(handler, handler1)
|
||||||
|
count += 1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.is(msgID, 2)
|
||||||
|
t.is(handler, handler2)
|
||||||
|
}
|
||||||
|
|
||||||
|
function handler1 () {}
|
||||||
|
function handler2 () {}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
tap.test('#remove', t => {
|
||||||
|
t.test('removes from the current track', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, () => {})
|
||||||
|
tracker.remove(1)
|
||||||
|
t.is(tracker.pending, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Not a great test. It exercises the desired code path, but we probably
|
||||||
|
// should expose some insight into the abandoned track.
|
||||||
|
t.test('removes from the abandoned track', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
tracker.track({}, () => {})
|
||||||
|
tracker.track({ abandon: 'message' }, () => {})
|
||||||
|
tracker.abandon(1)
|
||||||
|
tracker.remove(1)
|
||||||
|
t.is(tracker.pending, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
tap.test('#track', t => {
|
||||||
|
t.test('add messageID and tracks message', async t => {
|
||||||
|
const tracker = messageTrackerFactory({ id: 'foo', parser: {} })
|
||||||
|
const msg = {}
|
||||||
|
tracker.track(msg, handler)
|
||||||
|
|
||||||
|
t.deepEqual(msg, { messageID: 1 })
|
||||||
|
const cb = tracker.fetch(1)
|
||||||
|
t.is(cb, handler)
|
||||||
|
|
||||||
|
function handler () {}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
|
@ -0,0 +1,64 @@
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { test } = require('tap')
|
||||||
|
const { MAX_MSGID } = require('../../../../lib/client/constants')
|
||||||
|
const purgeAbandoned = require('../../../../lib/client/message-tracker/purge-abandoned')
|
||||||
|
|
||||||
|
test('clears queue if only one message present', async t => {
|
||||||
|
t.plan(3)
|
||||||
|
const abandoned = new Map()
|
||||||
|
abandoned.set(1, { age: 2, cb })
|
||||||
|
|
||||||
|
purgeAbandoned(2, abandoned)
|
||||||
|
t.is(abandoned.size, 0)
|
||||||
|
|
||||||
|
function cb (err) {
|
||||||
|
t.is(err.name, 'AbandonedError')
|
||||||
|
t.is(err.message, 'client request abandoned')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
test('clears queue if multiple messages present', async t => {
|
||||||
|
t.plan(5)
|
||||||
|
const abandoned = new Map()
|
||||||
|
abandoned.set(1, { age: 2, cb })
|
||||||
|
abandoned.set(2, { age: 3, cb })
|
||||||
|
|
||||||
|
purgeAbandoned(4, abandoned)
|
||||||
|
t.is(abandoned.size, 0)
|
||||||
|
|
||||||
|
function cb (err) {
|
||||||
|
t.is(err.name, 'AbandonedError')
|
||||||
|
t.is(err.message, 'client request abandoned')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
test('message id has wrappred around', async t => {
|
||||||
|
t.plan(3)
|
||||||
|
const abandoned = new Map()
|
||||||
|
abandoned.set(MAX_MSGID - 1, { age: MAX_MSGID, cb })
|
||||||
|
|
||||||
|
// The "abandon" message was sent with an id of "MAX_MSGID". So the message
|
||||||
|
// that is triggering the purge was the "first" message in the new sequence
|
||||||
|
// of message identifiers.
|
||||||
|
purgeAbandoned(1, abandoned)
|
||||||
|
t.is(abandoned.size, 0)
|
||||||
|
|
||||||
|
function cb (err) {
|
||||||
|
t.is(err.name, 'AbandonedError')
|
||||||
|
t.is(err.message, 'client request abandoned')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
test('does not clear if window not met', async t => {
|
||||||
|
t.plan(1)
|
||||||
|
const abandoned = new Map()
|
||||||
|
abandoned.set(1, { age: 2, cb })
|
||||||
|
|
||||||
|
purgeAbandoned(1, abandoned)
|
||||||
|
t.is(abandoned.size, 1)
|
||||||
|
|
||||||
|
function cb () {
|
||||||
|
t.fail('should not be invoked')
|
||||||
|
}
|
||||||
|
})
|
Loading…
Reference in New Issue