Implement queueing events until a listener appears
This resolves an issue arising from using both a callback and an EventEmitter together in the Client.search() API. Since the emitter would only be available through the callback, some events could be emitted before the callback is triggered, resulting in missed events. This change incorporates a test case originally by László Szűcs (@ifroz). For GH-602
This commit is contained in:
parent
a67303e7a4
commit
5204bb7ac0
|
@ -25,6 +25,7 @@ var errors = require('../errors')
|
|||
var filters = require('../filters')
|
||||
var messages = require('../messages')
|
||||
var url = require('../url')
|
||||
var CorkedEmitter = require('../corked_emitter')
|
||||
|
||||
/// --- Globals
|
||||
|
||||
|
@ -632,7 +633,7 @@ Client.prototype.search = function search (base,
|
|||
pager.on('search', sendRequest)
|
||||
pager.begin()
|
||||
} else {
|
||||
sendRequest(controls, new EventEmitter(), callback)
|
||||
sendRequest(controls, new CorkedEmitter(), callback)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
'use strict'
|
||||
|
||||
var EventEmitter = require('events').EventEmitter
|
||||
|
||||
/**
|
||||
* A CorkedEmitter is a variant of an EventEmitter where events emitted
|
||||
* wait for the appearance of the first listener of any kind. That is,
|
||||
* a CorkedEmitter will store all .emit()s it receives, to be replayed
|
||||
* later when an .on() is applied.
|
||||
* It is meant for situations where the consumers of the emitter are
|
||||
* unable to register listeners right away, and cannot afford to miss
|
||||
* any events emitted from the start.
|
||||
* Note that, whenever the first emitter (for any event) appears,
|
||||
* the emitter becomes uncorked and works as usual for ALL events, and
|
||||
* will not cache anything anymore. This is necessary to avoid
|
||||
* re-ordering emits - either everything is being buffered, or nothing.
|
||||
*/
|
||||
function CorkedEmitter () {
|
||||
var self = this
|
||||
EventEmitter.call(self)
|
||||
/**
|
||||
* An array of arguments objects (array-likes) to emit on open.
|
||||
*/
|
||||
self._outstandingEmits = []
|
||||
/**
|
||||
* Whether the normal flow of emits is restored yet.
|
||||
*/
|
||||
self._opened = false
|
||||
// When the first listener appears, we enqueue an opening.
|
||||
// It is not done immediately, so that other listeners can be
|
||||
// registered in the same critical section.
|
||||
self.once('newListener', function () {
|
||||
setImmediate(function releaseStoredEvents () {
|
||||
self._opened = true
|
||||
self._outstandingEmits.forEach(function (args) {
|
||||
self.emit.apply(self, args)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
CorkedEmitter.prototype = Object.create(EventEmitter.prototype)
|
||||
CorkedEmitter.prototype.emit = function emit (eventName) {
|
||||
if (this._opened || eventName === 'newListener') {
|
||||
EventEmitter.prototype.emit.apply(this, arguments)
|
||||
} else {
|
||||
this._outstandingEmits.push(arguments)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = CorkedEmitter
|
|
@ -703,6 +703,25 @@ tap.test('search basic', function (t) {
|
|||
})
|
||||
})
|
||||
|
||||
tap.test('GH-602 search basic with delayed event listener binding', function (t) {
|
||||
t.context.client.search('cn=test, ' + SUFFIX, '(objectclass=*)', function (err, res) {
|
||||
t.error(err)
|
||||
setTimeout(() => {
|
||||
let gotEntry = 0
|
||||
res.on('searchEntry', function (entry) {
|
||||
gotEntry++
|
||||
})
|
||||
res.on('error', function (err) {
|
||||
t.fail(err)
|
||||
})
|
||||
res.on('end', function (res) {
|
||||
t.equal(gotEntry, 2)
|
||||
t.end()
|
||||
})
|
||||
}, 100)
|
||||
})
|
||||
})
|
||||
|
||||
tap.test('search sizeLimit', function (t) {
|
||||
t.test('over limit', function (t2) {
|
||||
t.context.client.search('cn=sizelimit', {}, function (err, res) {
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
'use strict'
|
||||
|
||||
const { test } = require('tap')
|
||||
const CorkedEmitter = require('../lib/corked_emitter')
|
||||
|
||||
function gatherEventSequence (expectedNumber) {
|
||||
const gatheredEvents = []
|
||||
let callback
|
||||
const finished = new Promise(function (resolve) {
|
||||
callback = function (...args) {
|
||||
gatheredEvents.push(...args)
|
||||
if (gatheredEvents.length >= expectedNumber) {
|
||||
// Prevent result mutation after our promise is resolved:
|
||||
resolve(gatheredEvents.slice())
|
||||
}
|
||||
}
|
||||
})
|
||||
return {
|
||||
finished,
|
||||
callback
|
||||
}
|
||||
}
|
||||
|
||||
test('normal emit flow', function (t) {
|
||||
const emitter = new CorkedEmitter()
|
||||
const expectedSequence = [
|
||||
['searchEntry', { data: 'a' }],
|
||||
['searchEntry', { data: 'b' }],
|
||||
['end']
|
||||
]
|
||||
const gatherer = gatherEventSequence(3)
|
||||
emitter.on('searchEntry', function (...args) {
|
||||
gatherer.callback(['searchEntry', ...args])
|
||||
})
|
||||
emitter.on('end', function (...args) {
|
||||
gatherer.callback(['end', ...args])
|
||||
})
|
||||
emitter.emit('searchEntry', { data: 'a' })
|
||||
emitter.emit('searchEntry', { data: 'b' })
|
||||
emitter.emit('end')
|
||||
gatherer.finished.then(function (gatheredEvents) {
|
||||
expectedSequence.forEach(function (expectedEvent, i) {
|
||||
t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i]))
|
||||
})
|
||||
t.end()
|
||||
})
|
||||
})
|
||||
|
||||
test('reversed listener registration', function (t) {
|
||||
const emitter = new CorkedEmitter()
|
||||
const expectedSequence = [
|
||||
['searchEntry', { data: 'a' }],
|
||||
['searchEntry', { data: 'b' }],
|
||||
['end']
|
||||
]
|
||||
const gatherer = gatherEventSequence(3)
|
||||
// This time, we swap the event listener registrations.
|
||||
// The order of emits should remain unchanged.
|
||||
emitter.on('end', function (...args) {
|
||||
gatherer.callback(['end', ...args])
|
||||
})
|
||||
emitter.on('searchEntry', function (...args) {
|
||||
gatherer.callback(['searchEntry', ...args])
|
||||
})
|
||||
emitter.emit('searchEntry', { data: 'a' })
|
||||
emitter.emit('searchEntry', { data: 'b' })
|
||||
emitter.emit('end')
|
||||
gatherer.finished.then(function (gatheredEvents) {
|
||||
expectedSequence.forEach(function (expectedEvent, i) {
|
||||
t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i]))
|
||||
})
|
||||
t.end()
|
||||
})
|
||||
})
|
||||
|
||||
test('delayed listener registration', function (t) {
|
||||
const emitter = new CorkedEmitter()
|
||||
const expectedSequence = [
|
||||
['searchEntry', { data: 'a' }],
|
||||
['searchEntry', { data: 'b' }],
|
||||
['end']
|
||||
]
|
||||
const gatherer = gatherEventSequence(3)
|
||||
emitter.emit('searchEntry', { data: 'a' })
|
||||
emitter.emit('searchEntry', { data: 'b' })
|
||||
emitter.emit('end')
|
||||
// The listeners only appear after a brief delay - this simulates
|
||||
// the situation described in https://github.com/ldapjs/node-ldapjs/issues/602
|
||||
// and in https://github.com/ifroz/node-ldapjs/commit/5239f6c68827f2c25b4589089c199d15bb882412
|
||||
setTimeout(function () {
|
||||
emitter.on('end', function (...args) {
|
||||
gatherer.callback(['end', ...args])
|
||||
})
|
||||
emitter.on('searchEntry', function (...args) {
|
||||
gatherer.callback(['searchEntry', ...args])
|
||||
})
|
||||
}, 50)
|
||||
gatherer.finished.then(function (gatheredEvents) {
|
||||
expectedSequence.forEach(function (expectedEvent, i) {
|
||||
t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i]))
|
||||
})
|
||||
t.end()
|
||||
})
|
||||
})
|
Loading…
Reference in New Issue