'use strict' const idGeneratorFactory = require('./id-generator') const purgeAbandoned = require('./purge-abandoned') /** * Returns a message tracker object that keeps track of which message * identifiers correspond to which message handlers. Also handles keeping track * of abandoned messages. * * @param {object} options * @param {string} options.id An identifier for the tracker. * @param {object} options.parser An object that will be used to parse messages. * * @returns {MessageTracker} */ module.exports = function messageTrackerFactory (options) { if (Object.prototype.toString.call(options) !== '[object Object]') { throw Error('options object is required') } if (!options.id || typeof options.id !== 'string') { throw Error('options.id string is required') } if (!options.parser || Object.prototype.toString.call(options.parser) !== '[object Object]') { throw Error('options.parser object is required') } let currentID = 0 const nextID = idGeneratorFactory() const messages = new Map() const abandoned = new Map() /** * @typedef {object} MessageTracker * @property {string} id The identifier of the tracker as supplied via the options. * @property {object} parser The parser object given by the the options. */ const tracker = { id: options.id, parser: options.parser } /** * Count of messages awaiting response. * * @alias pending * @memberof! MessageTracker# */ Object.defineProperty(tracker, 'pending', { get () { return messages.size } }) /** * Move a specific message to the abanded track. * * @param {integer} msgID The identifier for the message to move. * * @memberof MessageTracker * @method abandon */ tracker.abandon = function abandonMessage (msgID) { if (messages.has(msgID) === false) return false const toAbandon = messages.get(msgID) abandoned.set(msgID, { age: currentID, message: toAbandon.message, cb: toAbandon.callback }) return messages.delete(msgID) } /** * @typedef {object} Tracked * @property {object} message The tracked message. Usually the outgoing * request object. * @property {Function} callback The handler to use when receiving a * response to the tracked message. */ /** * Retrieves the message handler for a message. Removes abandoned messages * that have been given time to be resolved. * * @param {integer} msgID The identifier for the message to get the handler for. * * @memberof MessageTracker * @method fetch */ tracker.fetch = function fetchMessage (msgID) { const tracked = messages.get(msgID) if (tracked) { purgeAbandoned(msgID, abandoned) return tracked } // We sent an abandon request but the server either wasn't able to process // it or has not received it yet. Therefore, we received a response for the // abandoned message. So we must return the abandoned message's callback // to be processed normally. const abandonedMsg = abandoned.get(msgID) if (abandonedMsg) { return { message: abandonedMsg, callback: abandonedMsg.cb } } return null } /** * Removes all message tracks, cleans up the abandoned track, and invokes * a callback for each message purged. * * @param {function} cb A function with the signature `(msgID, handler)`. * * @memberof MessageTracker * @method purge */ tracker.purge = function purgeMessages (cb) { messages.forEach((val, key) => { purgeAbandoned(key, abandoned) tracker.remove(key) cb(key, val.callback) }) } /** * Removes a message from all tracking. * * @param {integer} msgID The identifier for the message to remove from tracking. * * @memberof MessageTracker * @method remove */ tracker.remove = function removeMessage (msgID) { if (messages.delete(msgID) === false) { abandoned.delete(msgID) } } /** * Add a message handler to be tracked. * * @param {object} message The message object to be tracked. This object will * have a new property added to it: `messageId`. * @param {function} callback The handler for the message. * * @memberof MessageTracker * @method track */ tracker.track = function trackMessage (message, callback) { currentID = nextID() // This side effect is not ideal but the client doesn't attach the tracker // to itself until after the `.connect` method has fired. If this can be // refactored later, then we can possibly get rid of this side effect. message.messageId = currentID messages.set(currentID, { callback, message }) } return tracker }