Refactor client RequestQueue into testable module (#548)
* Refactor client RequestQueue into testable module * Update test/lib/client/request-queue/enqueue.test.js Co-Authored-By: Tony Brix <tony@brix.ninja> * Update test/lib/client/request-queue/enqueue.test.js Co-Authored-By: Tony Brix <tony@brix.ninja> * Update test/lib/client/request-queue/enqueue.test.js Co-Authored-By: Tony Brix <tony@brix.ninja>
This commit is contained in:
parent
c2786d9f4b
commit
2e1ef78108
|
@ -1,5 +1,7 @@
|
|||
'use strict'
|
||||
|
||||
const requestQueueFactory = require('./request-queue')
|
||||
|
||||
var EventEmitter = require('events').EventEmitter
|
||||
var net = require('net')
|
||||
var tls = require('tls')
|
||||
|
@ -86,86 +88,6 @@ function ensureDN (input, strict) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue to contain LDAP requests.
|
||||
*
|
||||
* @param {Object} opts queue options
|
||||
*
|
||||
* Accepted Options:
|
||||
* - size: Maximum queue size
|
||||
* - timeout: Set timeout between first queue insertion and queue flush.
|
||||
*/
|
||||
function RequestQueue (opts) {
|
||||
if (!opts || typeof (opts) !== 'object') {
|
||||
opts = {}
|
||||
}
|
||||
this.size = (opts.size > 0) ? opts.size : Infinity
|
||||
this.timeout = (opts.timeout > 0) ? opts.timeout : 0
|
||||
this._queue = []
|
||||
this._timer = null
|
||||
this._frozen = false
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert request into queue.
|
||||
*
|
||||
*/
|
||||
RequestQueue.prototype.enqueue = function enqueue (msg, expect, emitter, cb) {
|
||||
if (this._queue.length >= this.size || this._frozen) {
|
||||
return false
|
||||
}
|
||||
var self = this
|
||||
this._queue.push([msg, expect, emitter, cb])
|
||||
if (this.timeout > 0) {
|
||||
if (this._timer !== null) {
|
||||
this._timer = setTimeout(function () {
|
||||
// If queue times out, don't allow new entries until thawed
|
||||
self.freeze()
|
||||
self.purge()
|
||||
}, this.timeout)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all queued requests with callback.
|
||||
*/
|
||||
RequestQueue.prototype.flush = function flush (cb) {
|
||||
if (this._timer) {
|
||||
clearTimeout(this._timer)
|
||||
this._timer = null
|
||||
}
|
||||
var items = this._queue
|
||||
this._queue = []
|
||||
items.forEach(function (req) {
|
||||
cb(req[0], req[1], req[2], req[3])
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge all queued requests with an error.
|
||||
*/
|
||||
RequestQueue.prototype.purge = function purge () {
|
||||
this.flush(function (msg, expect, emitter, cb) {
|
||||
cb(new errors.TimeoutError('request queue timeout'))
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Freeze queue, refusing any new entries.
|
||||
*/
|
||||
RequestQueue.prototype.freeze = function freeze () {
|
||||
this._frozen = true
|
||||
}
|
||||
|
||||
/**
|
||||
* Thaw queue, allowing new entries again.
|
||||
*/
|
||||
RequestQueue.prototype.thaw = function thaw () {
|
||||
this._frozen = false
|
||||
}
|
||||
|
||||
/**
|
||||
* Track message callback by messageID.
|
||||
*/
|
||||
|
@ -323,7 +245,7 @@ function Client (options) {
|
|||
}
|
||||
this.strictDN = (options.strictDN !== undefined) ? options.strictDN : true
|
||||
|
||||
this.queue = new RequestQueue({
|
||||
this.queue = requestQueueFactory({
|
||||
size: parseInt((options.queueSize || 0), 10),
|
||||
timeout: parseInt((options.queueTimeout || 0), 10)
|
||||
})
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
'use strict'
|
||||
|
||||
/**
|
||||
* Adds requests to the queue. If a timeout has been added to the queue then
|
||||
* this will freeze the queue with the newly added item, flush it, and then
|
||||
* unfreeze it when the queue has been cleared.
|
||||
*
|
||||
* @param {object} message An LDAP message object.
|
||||
* @param {object} expect An expectation object.
|
||||
* @param {object} emitter An event emitter or `null`.
|
||||
* @param {function} cb A callback to invoke when the request is finished.
|
||||
*
|
||||
* @returns {boolean} `true` if the requested was queued. `false` if the queue
|
||||
* is not accepting any requests.
|
||||
*/
|
||||
module.exports = function enqueue (message, expect, emitter, cb) {
|
||||
if (this._queue.length >= this.size || this._frozen) {
|
||||
return false
|
||||
}
|
||||
|
||||
this._queue.add({ message, expect, emitter, cb })
|
||||
|
||||
if (this.timeout === 0) return true
|
||||
if (this._timer === null) return true
|
||||
|
||||
// A queue can have a specified time allotted for it to be cleared. If that
|
||||
// time has been reached, reject new entries until the queue has been cleared.
|
||||
this._timer = setTimeout(queueTimeout.bind(this), this.timeout)
|
||||
|
||||
return true
|
||||
|
||||
function queueTimeout () {
|
||||
this.freeze()
|
||||
this.purge()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
'use strict'
|
||||
|
||||
/**
|
||||
* Invokes all requests in the queue by passing them to the supplied callback
|
||||
* function and then clears all items from the queue.
|
||||
*
|
||||
* @param {function} cb A function used to handle the requests.
|
||||
*/
|
||||
module.exports = function flush (cb) {
|
||||
if (this._timer) {
|
||||
clearTimeout(this._timer)
|
||||
this._timer = null
|
||||
}
|
||||
|
||||
// We must get a local copy of the queue and clear it before iterating it.
|
||||
// The client will invoke this flush function _many_ times. If we try to
|
||||
// iterate it without a local copy and clearing first then we will overflow
|
||||
// the stack.
|
||||
const requests = Array.from(this._queue.values())
|
||||
this._queue.clear()
|
||||
for (const req of requests) {
|
||||
cb(req.message, req.expect, req.emitter, req.cb)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
'use strict'
|
||||
|
||||
const enqueue = require('./enqueue')
|
||||
const flush = require('./flush')
|
||||
const purge = require('./purge')
|
||||
|
||||
/**
|
||||
* Builds a request queue object and returns it.
|
||||
*
|
||||
* @param {object} [options]
|
||||
* @param {integer} [options.size] Maximum size of the request queue. Must be
|
||||
* a number greater than `0` if supplied. Default: `Infinity`.
|
||||
* @param {integer} [options.timeout] Time in milliseconds a queue has to
|
||||
* complete the requests it contains.
|
||||
*
|
||||
* @returns {object} A queue instance.
|
||||
*/
|
||||
module.exports = function requestQueueFactory (options) {
|
||||
const opts = Object.assign({}, options)
|
||||
const q = {
|
||||
size: (opts.size > 0) ? opts.size : Infinity,
|
||||
timeout: (opts.timeout > 0) ? opts.timeout : 0,
|
||||
_queue: new Set(),
|
||||
_timer: null,
|
||||
_frozen: false
|
||||
}
|
||||
|
||||
q.enqueue = enqueue.bind(q)
|
||||
q.flush = flush.bind(q)
|
||||
q.purge = purge.bind(q)
|
||||
q.freeze = function freeze () {
|
||||
this._frozen = true
|
||||
}
|
||||
q.thaw = function thaw () {
|
||||
this._frozen = false
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
'use strict'
|
||||
|
||||
const { TimeoutError } = require('../../errors')
|
||||
|
||||
/**
|
||||
* Flushes the queue by rejecting all pending requests with a timeout error.
|
||||
*/
|
||||
module.exports = function purge () {
|
||||
this.flush(function flushCB (a, b, c, cb) {
|
||||
cb(new TimeoutError('request queue timeout'))
|
||||
})
|
||||
}
|
12
package.json
12
package.json
|
@ -34,12 +34,12 @@
|
|||
"uuid": "^3.3.3"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "tap --no-cov test/**/*.test.js",
|
||||
"test:cov": "tap test/**/*.test.js",
|
||||
"test:cov:html": "tap --coverage-report=html test/**/*.test.js",
|
||||
"test:watch": "tap -n -w --no-coverage-report test/**/*.test.js",
|
||||
"lint": "standard examples/**/*.js lib/**/*.js test/**/*.js | snazzy",
|
||||
"lint:ci": "standard examples/**/*.js lib/**/*.js test/**/*.js"
|
||||
"test": "tap --no-cov",
|
||||
"test:cov": "tap",
|
||||
"test:cov:html": "tap --coverage-report=html",
|
||||
"test:watch": "tap -n -w --no-coverage-report",
|
||||
"lint": "standard | snazzy",
|
||||
"lint:ci": "standard"
|
||||
},
|
||||
"husky": {
|
||||
"hooks": {
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
'use strict'
|
||||
|
||||
const { test } = require('tap')
|
||||
const enqueue = require('../../../../lib/client/request-queue/enqueue')
|
||||
|
||||
test('rejects new requests if size is exceeded', async t => {
|
||||
const q = { _queue: { length: 5 }, size: 5 }
|
||||
const result = enqueue.call(q, 'foo', 'bar', {}, {})
|
||||
t.false(result)
|
||||
})
|
||||
|
||||
test('rejects new requests if queue is frozen', async t => {
|
||||
const q = { _queue: { length: 0 }, size: 5, _frozen: true }
|
||||
const result = enqueue.call(q, 'foo', 'bar', {}, {})
|
||||
t.false(result)
|
||||
})
|
||||
|
||||
test('adds a request and returns if no timeout', async t => {
|
||||
const q = {
|
||||
_queue: {
|
||||
length: 0,
|
||||
add (obj) {
|
||||
t.deepEqual(obj, {
|
||||
message: 'foo',
|
||||
expect: 'bar',
|
||||
emitter: 'baz',
|
||||
cb: 'bif'
|
||||
})
|
||||
}
|
||||
},
|
||||
_frozen: false,
|
||||
timeout: 0
|
||||
}
|
||||
const result = enqueue.call(q, 'foo', 'bar', 'baz', 'bif')
|
||||
t.true(result)
|
||||
})
|
||||
|
||||
test('adds a request and returns timer not set', async t => {
|
||||
const q = {
|
||||
_queue: {
|
||||
length: 0,
|
||||
add (obj) {
|
||||
t.deepEqual(obj, {
|
||||
message: 'foo',
|
||||
expect: 'bar',
|
||||
emitter: 'baz',
|
||||
cb: 'bif'
|
||||
})
|
||||
}
|
||||
},
|
||||
_frozen: false,
|
||||
timeout: 100,
|
||||
_timer: null
|
||||
}
|
||||
const result = enqueue.call(q, 'foo', 'bar', 'baz', 'bif')
|
||||
t.true(result)
|
||||
})
|
||||
|
||||
test('adds a request, returns true, and clears queue', t => {
|
||||
// Must not be an async test due to an internal `setTimeout`
|
||||
t.plan(4)
|
||||
const q = {
|
||||
_queue: {
|
||||
length: 0,
|
||||
add (obj) {
|
||||
t.deepEqual(obj, {
|
||||
message: 'foo',
|
||||
expect: 'bar',
|
||||
emitter: 'baz',
|
||||
cb: 'bif'
|
||||
})
|
||||
}
|
||||
},
|
||||
_frozen: false,
|
||||
timeout: 5,
|
||||
_timer: 123,
|
||||
freeze () { t.pass() },
|
||||
purge () { t.pass() }
|
||||
}
|
||||
const result = enqueue.call(q, 'foo', 'bar', 'baz', 'bif')
|
||||
t.true(result)
|
||||
})
|
|
@ -0,0 +1,51 @@
|
|||
'use strict'
|
||||
|
||||
const { test } = require('tap')
|
||||
const flush = require('../../../../lib/client/request-queue/flush')
|
||||
|
||||
test('clears timer', async t => {
|
||||
t.plan(2)
|
||||
const q = {
|
||||
_timer: 123,
|
||||
_queue: {
|
||||
values () {
|
||||
return []
|
||||
},
|
||||
clear () {
|
||||
t.pass()
|
||||
}
|
||||
}
|
||||
}
|
||||
flush.call(q)
|
||||
t.is(q._timer, null)
|
||||
})
|
||||
|
||||
test('invokes callback with parameters', async t => {
|
||||
t.plan(6)
|
||||
const req = {
|
||||
message: 'foo',
|
||||
expect: 'bar',
|
||||
emitter: 'baz',
|
||||
cb: theCB
|
||||
}
|
||||
const q = {
|
||||
_timer: 123,
|
||||
_queue: {
|
||||
values () {
|
||||
return [req]
|
||||
},
|
||||
clear () {
|
||||
t.pass()
|
||||
}
|
||||
}
|
||||
}
|
||||
flush.call(q, (message, expect, emitter, cb) => {
|
||||
t.is(message, 'foo')
|
||||
t.is(expect, 'bar')
|
||||
t.is(emitter, 'baz')
|
||||
t.is(cb, theCB)
|
||||
})
|
||||
t.is(q._timer, null)
|
||||
|
||||
function theCB () {}
|
||||
})
|
|
@ -0,0 +1,18 @@
|
|||
'use strict'
|
||||
|
||||
const { test } = require('tap')
|
||||
const purge = require('../../../../lib/client/request-queue/purge')
|
||||
|
||||
test('flushes the queue with timeout errors', async t => {
|
||||
t.plan(3)
|
||||
const q = {
|
||||
flush (func) {
|
||||
func('a', 'b', 'c', (err) => {
|
||||
t.ok(err)
|
||||
t.is(err.name, 'TimeoutError')
|
||||
t.is(err.message, 'request queue timeout')
|
||||
})
|
||||
}
|
||||
}
|
||||
purge.call(q)
|
||||
})
|
Loading…
Reference in New Issue