diff --git a/lib/client/client.js b/lib/client/client.js index 637a62e..b254d50 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -25,6 +25,7 @@ var errors = require('../errors') var filters = require('../filters') var messages = require('../messages') var url = require('../url') +var CorkedEmitter = require('../corked_emitter') /// --- Globals @@ -632,7 +633,7 @@ Client.prototype.search = function search (base, pager.on('search', sendRequest) pager.begin() } else { - sendRequest(controls, new EventEmitter(), callback) + sendRequest(controls, new CorkedEmitter(), callback) } } diff --git a/lib/corked_emitter.js b/lib/corked_emitter.js new file mode 100644 index 0000000..c5f59de --- /dev/null +++ b/lib/corked_emitter.js @@ -0,0 +1,50 @@ +'use strict' + +var EventEmitter = require('events').EventEmitter + +/** + * A CorkedEmitter is a variant of an EventEmitter where events emitted + * wait for the appearance of the first listener of any kind. That is, + * a CorkedEmitter will store all .emit()s it receives, to be replayed + * later when an .on() is applied. + * It is meant for situations where the consumers of the emitter are + * unable to register listeners right away, and cannot afford to miss + * any events emitted from the start. + * Note that, whenever the first emitter (for any event) appears, + * the emitter becomes uncorked and works as usual for ALL events, and + * will not cache anything anymore. This is necessary to avoid + * re-ordering emits - either everything is being buffered, or nothing. + */ +function CorkedEmitter () { + var self = this + EventEmitter.call(self) + /** + * An array of arguments objects (array-likes) to emit on open. + */ + self._outstandingEmits = [] + /** + * Whether the normal flow of emits is restored yet. + */ + self._opened = false + // When the first listener appears, we enqueue an opening. + // It is not done immediately, so that other listeners can be + // registered in the same critical section. + self.once('newListener', function () { + setImmediate(function releaseStoredEvents () { + self._opened = true + self._outstandingEmits.forEach(function (args) { + self.emit.apply(self, args) + }) + }) + }) +} +CorkedEmitter.prototype = Object.create(EventEmitter.prototype) +CorkedEmitter.prototype.emit = function emit (eventName) { + if (this._opened || eventName === 'newListener') { + EventEmitter.prototype.emit.apply(this, arguments) + } else { + this._outstandingEmits.push(arguments) + } +} + +module.exports = CorkedEmitter diff --git a/test/client.test.js b/test/client.test.js index 8d1090c..bedb3dd 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -703,6 +703,25 @@ tap.test('search basic', function (t) { }) }) +tap.test('GH-602 search basic with delayed event listener binding', function (t) { + t.context.client.search('cn=test, ' + SUFFIX, '(objectclass=*)', function (err, res) { + t.error(err) + setTimeout(() => { + let gotEntry = 0 + res.on('searchEntry', function (entry) { + gotEntry++ + }) + res.on('error', function (err) { + t.fail(err) + }) + res.on('end', function (res) { + t.equal(gotEntry, 2) + t.end() + }) + }, 100) + }) +}) + tap.test('search sizeLimit', function (t) { t.test('over limit', function (t2) { t.context.client.search('cn=sizelimit', {}, function (err, res) { diff --git a/test/corked_emitter.test.js b/test/corked_emitter.test.js new file mode 100644 index 0000000..5edecd0 --- /dev/null +++ b/test/corked_emitter.test.js @@ -0,0 +1,104 @@ +'use strict' + +const { test } = require('tap') +const CorkedEmitter = require('../lib/corked_emitter') + +function gatherEventSequence (expectedNumber) { + const gatheredEvents = [] + let callback + const finished = new Promise(function (resolve) { + callback = function (...args) { + gatheredEvents.push(...args) + if (gatheredEvents.length >= expectedNumber) { + // Prevent result mutation after our promise is resolved: + resolve(gatheredEvents.slice()) + } + } + }) + return { + finished, + callback + } +} + +test('normal emit flow', function (t) { + const emitter = new CorkedEmitter() + const expectedSequence = [ + ['searchEntry', { data: 'a' }], + ['searchEntry', { data: 'b' }], + ['end'] + ] + const gatherer = gatherEventSequence(3) + emitter.on('searchEntry', function (...args) { + gatherer.callback(['searchEntry', ...args]) + }) + emitter.on('end', function (...args) { + gatherer.callback(['end', ...args]) + }) + emitter.emit('searchEntry', { data: 'a' }) + emitter.emit('searchEntry', { data: 'b' }) + emitter.emit('end') + gatherer.finished.then(function (gatheredEvents) { + expectedSequence.forEach(function (expectedEvent, i) { + t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i])) + }) + t.end() + }) +}) + +test('reversed listener registration', function (t) { + const emitter = new CorkedEmitter() + const expectedSequence = [ + ['searchEntry', { data: 'a' }], + ['searchEntry', { data: 'b' }], + ['end'] + ] + const gatherer = gatherEventSequence(3) + // This time, we swap the event listener registrations. + // The order of emits should remain unchanged. + emitter.on('end', function (...args) { + gatherer.callback(['end', ...args]) + }) + emitter.on('searchEntry', function (...args) { + gatherer.callback(['searchEntry', ...args]) + }) + emitter.emit('searchEntry', { data: 'a' }) + emitter.emit('searchEntry', { data: 'b' }) + emitter.emit('end') + gatherer.finished.then(function (gatheredEvents) { + expectedSequence.forEach(function (expectedEvent, i) { + t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i])) + }) + t.end() + }) +}) + +test('delayed listener registration', function (t) { + const emitter = new CorkedEmitter() + const expectedSequence = [ + ['searchEntry', { data: 'a' }], + ['searchEntry', { data: 'b' }], + ['end'] + ] + const gatherer = gatherEventSequence(3) + emitter.emit('searchEntry', { data: 'a' }) + emitter.emit('searchEntry', { data: 'b' }) + emitter.emit('end') + // The listeners only appear after a brief delay - this simulates + // the situation described in https://github.com/ldapjs/node-ldapjs/issues/602 + // and in https://github.com/ifroz/node-ldapjs/commit/5239f6c68827f2c25b4589089c199d15bb882412 + setTimeout(function () { + emitter.on('end', function (...args) { + gatherer.callback(['end', ...args]) + }) + emitter.on('searchEntry', function (...args) { + gatherer.callback(['searchEntry', ...args]) + }) + }, 50) + gatherer.finished.then(function (gatheredEvents) { + expectedSequence.forEach(function (expectedEvent, i) { + t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i])) + }) + t.end() + }) +})