diff --git a/.taprc b/.taprc index 13df826..d363411 100644 --- a/.taprc +++ b/.taprc @@ -1 +1,4 @@ esm: false + +files: + - 'test/**/*.test.js' diff --git a/lib/client/client.js b/lib/client/client.js index cf668e5..a249a82 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -1,5 +1,7 @@ 'use strict' +const requestQueueFactory = require('./request-queue') + var EventEmitter = require('events').EventEmitter var net = require('net') var tls = require('tls') @@ -86,86 +88,6 @@ function ensureDN (input, strict) { } } -/** - * Queue to contain LDAP requests. - * - * @param {Object} opts queue options - * - * Accepted Options: - * - size: Maximum queue size - * - timeout: Set timeout between first queue insertion and queue flush. - */ -function RequestQueue (opts) { - if (!opts || typeof (opts) !== 'object') { - opts = {} - } - this.size = (opts.size > 0) ? opts.size : Infinity - this.timeout = (opts.timeout > 0) ? opts.timeout : 0 - this._queue = [] - this._timer = null - this._frozen = false -} - -/** - * Insert request into queue. - * - */ -RequestQueue.prototype.enqueue = function enqueue (msg, expect, emitter, cb) { - if (this._queue.length >= this.size || this._frozen) { - return false - } - var self = this - this._queue.push([msg, expect, emitter, cb]) - if (this.timeout > 0) { - if (this._timer !== null) { - this._timer = setTimeout(function () { - // If queue times out, don't allow new entries until thawed - self.freeze() - self.purge() - }, this.timeout) - } - } - return true -} - -/** - * Process all queued requests with callback. - */ -RequestQueue.prototype.flush = function flush (cb) { - if (this._timer) { - clearTimeout(this._timer) - this._timer = null - } - var items = this._queue - this._queue = [] - items.forEach(function (req) { - cb(req[0], req[1], req[2], req[3]) - }) -} - -/** - * Purge all queued requests with an error. - */ -RequestQueue.prototype.purge = function purge () { - this.flush(function (msg, expect, emitter, cb) { - cb(new errors.TimeoutError('request queue timeout')) - }) -} - -/** - * Freeze queue, refusing any new entries. - */ -RequestQueue.prototype.freeze = function freeze () { - this._frozen = true -} - -/** - * Thaw queue, allowing new entries again. - */ -RequestQueue.prototype.thaw = function thaw () { - this._frozen = false -} - /** * Track message callback by messageID. */ @@ -323,7 +245,7 @@ function Client (options) { } this.strictDN = (options.strictDN !== undefined) ? options.strictDN : true - this.queue = new RequestQueue({ + this.queue = requestQueueFactory({ size: parseInt((options.queueSize || 0), 10), timeout: parseInt((options.queueTimeout || 0), 10) }) diff --git a/lib/client/request-queue/enqueue.js b/lib/client/request-queue/enqueue.js new file mode 100644 index 0000000..a1fbe95 --- /dev/null +++ b/lib/client/request-queue/enqueue.js @@ -0,0 +1,36 @@ +'use strict' + +/** + * Adds requests to the queue. If a timeout has been added to the queue then + * this will freeze the queue with the newly added item, flush it, and then + * unfreeze it when the queue has been cleared. + * + * @param {object} message An LDAP message object. + * @param {object} expect An expectation object. + * @param {object} emitter An event emitter or `null`. + * @param {function} cb A callback to invoke when the request is finished. + * + * @returns {boolean} `true` if the requested was queued. `false` if the queue + * is not accepting any requests. + */ +module.exports = function enqueue (message, expect, emitter, cb) { + if (this._queue.length >= this.size || this._frozen) { + return false + } + + this._queue.add({ message, expect, emitter, cb }) + + if (this.timeout === 0) return true + if (this._timer === null) return true + + // A queue can have a specified time allotted for it to be cleared. If that + // time has been reached, reject new entries until the queue has been cleared. + this._timer = setTimeout(queueTimeout.bind(this), this.timeout) + + return true + + function queueTimeout () { + this.freeze() + this.purge() + } +} diff --git a/lib/client/request-queue/flush.js b/lib/client/request-queue/flush.js new file mode 100644 index 0000000..c0f581c --- /dev/null +++ b/lib/client/request-queue/flush.js @@ -0,0 +1,24 @@ +'use strict' + +/** + * Invokes all requests in the queue by passing them to the supplied callback + * function and then clears all items from the queue. + * + * @param {function} cb A function used to handle the requests. + */ +module.exports = function flush (cb) { + if (this._timer) { + clearTimeout(this._timer) + this._timer = null + } + + // We must get a local copy of the queue and clear it before iterating it. + // The client will invoke this flush function _many_ times. If we try to + // iterate it without a local copy and clearing first then we will overflow + // the stack. + const requests = Array.from(this._queue.values()) + this._queue.clear() + for (const req of requests) { + cb(req.message, req.expect, req.emitter, req.cb) + } +} diff --git a/lib/client/request-queue/index.js b/lib/client/request-queue/index.js new file mode 100644 index 0000000..6fb1325 --- /dev/null +++ b/lib/client/request-queue/index.js @@ -0,0 +1,39 @@ +'use strict' + +const enqueue = require('./enqueue') +const flush = require('./flush') +const purge = require('./purge') + +/** + * Builds a request queue object and returns it. + * + * @param {object} [options] + * @param {integer} [options.size] Maximum size of the request queue. Must be + * a number greater than `0` if supplied. Default: `Infinity`. + * @param {integer} [options.timeout] Time in milliseconds a queue has to + * complete the requests it contains. + * + * @returns {object} A queue instance. + */ +module.exports = function requestQueueFactory (options) { + const opts = Object.assign({}, options) + const q = { + size: (opts.size > 0) ? opts.size : Infinity, + timeout: (opts.timeout > 0) ? opts.timeout : 0, + _queue: new Set(), + _timer: null, + _frozen: false + } + + q.enqueue = enqueue.bind(q) + q.flush = flush.bind(q) + q.purge = purge.bind(q) + q.freeze = function freeze () { + this._frozen = true + } + q.thaw = function thaw () { + this._frozen = false + } + + return q +} diff --git a/lib/client/request-queue/purge.js b/lib/client/request-queue/purge.js new file mode 100644 index 0000000..dd0dc46 --- /dev/null +++ b/lib/client/request-queue/purge.js @@ -0,0 +1,12 @@ +'use strict' + +const { TimeoutError } = require('../../errors') + +/** + * Flushes the queue by rejecting all pending requests with a timeout error. + */ +module.exports = function purge () { + this.flush(function flushCB (a, b, c, cb) { + cb(new TimeoutError('request queue timeout')) + }) +} diff --git a/package.json b/package.json index a694abb..4287904 100644 --- a/package.json +++ b/package.json @@ -34,12 +34,12 @@ "uuid": "^3.3.3" }, "scripts": { - "test": "tap --no-cov test/**/*.test.js", - "test:cov": "tap test/**/*.test.js", - "test:cov:html": "tap --coverage-report=html test/**/*.test.js", - "test:watch": "tap -n -w --no-coverage-report test/**/*.test.js", - "lint": "standard examples/**/*.js lib/**/*.js test/**/*.js | snazzy", - "lint:ci": "standard examples/**/*.js lib/**/*.js test/**/*.js" + "test": "tap --no-cov", + "test:cov": "tap", + "test:cov:html": "tap --coverage-report=html", + "test:watch": "tap -n -w --no-coverage-report", + "lint": "standard | snazzy", + "lint:ci": "standard" }, "husky": { "hooks": { diff --git a/test/lib/client/request-queue/enqueue.test.js b/test/lib/client/request-queue/enqueue.test.js new file mode 100644 index 0000000..28fab4f --- /dev/null +++ b/test/lib/client/request-queue/enqueue.test.js @@ -0,0 +1,82 @@ +'use strict' + +const { test } = require('tap') +const enqueue = require('../../../../lib/client/request-queue/enqueue') + +test('rejects new requests if size is exceeded', async t => { + const q = { _queue: { length: 5 }, size: 5 } + const result = enqueue.call(q, 'foo', 'bar', {}, {}) + t.false(result) +}) + +test('rejects new requests if queue is frozen', async t => { + const q = { _queue: { length: 0 }, size: 5, _frozen: true } + const result = enqueue.call(q, 'foo', 'bar', {}, {}) + t.false(result) +}) + +test('adds a request and returns if no timeout', async t => { + const q = { + _queue: { + length: 0, + add (obj) { + t.deepEqual(obj, { + message: 'foo', + expect: 'bar', + emitter: 'baz', + cb: 'bif' + }) + } + }, + _frozen: false, + timeout: 0 + } + const result = enqueue.call(q, 'foo', 'bar', 'baz', 'bif') + t.true(result) +}) + +test('adds a request and returns timer not set', async t => { + const q = { + _queue: { + length: 0, + add (obj) { + t.deepEqual(obj, { + message: 'foo', + expect: 'bar', + emitter: 'baz', + cb: 'bif' + }) + } + }, + _frozen: false, + timeout: 100, + _timer: null + } + const result = enqueue.call(q, 'foo', 'bar', 'baz', 'bif') + t.true(result) +}) + +test('adds a request, returns true, and clears queue', t => { + // Must not be an async test due to an internal `setTimeout` + t.plan(4) + const q = { + _queue: { + length: 0, + add (obj) { + t.deepEqual(obj, { + message: 'foo', + expect: 'bar', + emitter: 'baz', + cb: 'bif' + }) + } + }, + _frozen: false, + timeout: 5, + _timer: 123, + freeze () { t.pass() }, + purge () { t.pass() } + } + const result = enqueue.call(q, 'foo', 'bar', 'baz', 'bif') + t.true(result) +}) diff --git a/test/lib/client/request-queue/flush.test.js b/test/lib/client/request-queue/flush.test.js new file mode 100644 index 0000000..96dc7c6 --- /dev/null +++ b/test/lib/client/request-queue/flush.test.js @@ -0,0 +1,51 @@ +'use strict' + +const { test } = require('tap') +const flush = require('../../../../lib/client/request-queue/flush') + +test('clears timer', async t => { + t.plan(2) + const q = { + _timer: 123, + _queue: { + values () { + return [] + }, + clear () { + t.pass() + } + } + } + flush.call(q) + t.is(q._timer, null) +}) + +test('invokes callback with parameters', async t => { + t.plan(6) + const req = { + message: 'foo', + expect: 'bar', + emitter: 'baz', + cb: theCB + } + const q = { + _timer: 123, + _queue: { + values () { + return [req] + }, + clear () { + t.pass() + } + } + } + flush.call(q, (message, expect, emitter, cb) => { + t.is(message, 'foo') + t.is(expect, 'bar') + t.is(emitter, 'baz') + t.is(cb, theCB) + }) + t.is(q._timer, null) + + function theCB () {} +}) diff --git a/test/lib/client/request-queue/purge.test.js b/test/lib/client/request-queue/purge.test.js new file mode 100644 index 0000000..198e83b --- /dev/null +++ b/test/lib/client/request-queue/purge.test.js @@ -0,0 +1,18 @@ +'use strict' + +const { test } = require('tap') +const purge = require('../../../../lib/client/request-queue/purge') + +test('flushes the queue with timeout errors', async t => { + t.plan(3) + const q = { + flush (func) { + func('a', 'b', 'c', (err) => { + t.ok(err) + t.is(err.name, 'TimeoutError') + t.is(err.message, 'request queue timeout') + }) + } + } + purge.call(q) +})