From 5204bb7ac0edc6ae77d523d8e6fcb464487e5aa5 Mon Sep 17 00:00:00 2001 From: Robert Kawecki Date: Tue, 12 May 2020 16:51:28 +0200 Subject: [PATCH] Implement queueing events until a listener appears MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This resolves an issue arising from using both a callback and an EventEmitter together in the Client.search() API. Since the emitter would only be available through the callback, some events could be emitted before the callback is triggered, resulting in missed events. This change incorporates a test case originally by László Szűcs (@ifroz). For GH-602 --- lib/client/client.js | 3 +- lib/corked_emitter.js | 50 +++++++++++++++++ test/client.test.js | 19 +++++++ test/corked_emitter.test.js | 104 ++++++++++++++++++++++++++++++++++++ 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 lib/corked_emitter.js create mode 100644 test/corked_emitter.test.js diff --git a/lib/client/client.js b/lib/client/client.js index 637a62e..b254d50 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -25,6 +25,7 @@ var errors = require('../errors') var filters = require('../filters') var messages = require('../messages') var url = require('../url') +var CorkedEmitter = require('../corked_emitter') /// --- Globals @@ -632,7 +633,7 @@ Client.prototype.search = function search (base, pager.on('search', sendRequest) pager.begin() } else { - sendRequest(controls, new EventEmitter(), callback) + sendRequest(controls, new CorkedEmitter(), callback) } } diff --git a/lib/corked_emitter.js b/lib/corked_emitter.js new file mode 100644 index 0000000..c5f59de --- /dev/null +++ b/lib/corked_emitter.js @@ -0,0 +1,50 @@ +'use strict' + +var EventEmitter = require('events').EventEmitter + +/** + * A CorkedEmitter is a variant of an EventEmitter where events emitted + * wait for the appearance of the first listener of any kind. That is, + * a CorkedEmitter will store all .emit()s it receives, to be replayed + * later when an .on() is applied. + * It is meant for situations where the consumers of the emitter are + * unable to register listeners right away, and cannot afford to miss + * any events emitted from the start. + * Note that, whenever the first emitter (for any event) appears, + * the emitter becomes uncorked and works as usual for ALL events, and + * will not cache anything anymore. This is necessary to avoid + * re-ordering emits - either everything is being buffered, or nothing. + */ +function CorkedEmitter () { + var self = this + EventEmitter.call(self) + /** + * An array of arguments objects (array-likes) to emit on open. + */ + self._outstandingEmits = [] + /** + * Whether the normal flow of emits is restored yet. + */ + self._opened = false + // When the first listener appears, we enqueue an opening. + // It is not done immediately, so that other listeners can be + // registered in the same critical section. + self.once('newListener', function () { + setImmediate(function releaseStoredEvents () { + self._opened = true + self._outstandingEmits.forEach(function (args) { + self.emit.apply(self, args) + }) + }) + }) +} +CorkedEmitter.prototype = Object.create(EventEmitter.prototype) +CorkedEmitter.prototype.emit = function emit (eventName) { + if (this._opened || eventName === 'newListener') { + EventEmitter.prototype.emit.apply(this, arguments) + } else { + this._outstandingEmits.push(arguments) + } +} + +module.exports = CorkedEmitter diff --git a/test/client.test.js b/test/client.test.js index 8d1090c..bedb3dd 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -703,6 +703,25 @@ tap.test('search basic', function (t) { }) }) +tap.test('GH-602 search basic with delayed event listener binding', function (t) { + t.context.client.search('cn=test, ' + SUFFIX, '(objectclass=*)', function (err, res) { + t.error(err) + setTimeout(() => { + let gotEntry = 0 + res.on('searchEntry', function (entry) { + gotEntry++ + }) + res.on('error', function (err) { + t.fail(err) + }) + res.on('end', function (res) { + t.equal(gotEntry, 2) + t.end() + }) + }, 100) + }) +}) + tap.test('search sizeLimit', function (t) { t.test('over limit', function (t2) { t.context.client.search('cn=sizelimit', {}, function (err, res) { diff --git a/test/corked_emitter.test.js b/test/corked_emitter.test.js new file mode 100644 index 0000000..5edecd0 --- /dev/null +++ b/test/corked_emitter.test.js @@ -0,0 +1,104 @@ +'use strict' + +const { test } = require('tap') +const CorkedEmitter = require('../lib/corked_emitter') + +function gatherEventSequence (expectedNumber) { + const gatheredEvents = [] + let callback + const finished = new Promise(function (resolve) { + callback = function (...args) { + gatheredEvents.push(...args) + if (gatheredEvents.length >= expectedNumber) { + // Prevent result mutation after our promise is resolved: + resolve(gatheredEvents.slice()) + } + } + }) + return { + finished, + callback + } +} + +test('normal emit flow', function (t) { + const emitter = new CorkedEmitter() + const expectedSequence = [ + ['searchEntry', { data: 'a' }], + ['searchEntry', { data: 'b' }], + ['end'] + ] + const gatherer = gatherEventSequence(3) + emitter.on('searchEntry', function (...args) { + gatherer.callback(['searchEntry', ...args]) + }) + emitter.on('end', function (...args) { + gatherer.callback(['end', ...args]) + }) + emitter.emit('searchEntry', { data: 'a' }) + emitter.emit('searchEntry', { data: 'b' }) + emitter.emit('end') + gatherer.finished.then(function (gatheredEvents) { + expectedSequence.forEach(function (expectedEvent, i) { + t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i])) + }) + t.end() + }) +}) + +test('reversed listener registration', function (t) { + const emitter = new CorkedEmitter() + const expectedSequence = [ + ['searchEntry', { data: 'a' }], + ['searchEntry', { data: 'b' }], + ['end'] + ] + const gatherer = gatherEventSequence(3) + // This time, we swap the event listener registrations. + // The order of emits should remain unchanged. + emitter.on('end', function (...args) { + gatherer.callback(['end', ...args]) + }) + emitter.on('searchEntry', function (...args) { + gatherer.callback(['searchEntry', ...args]) + }) + emitter.emit('searchEntry', { data: 'a' }) + emitter.emit('searchEntry', { data: 'b' }) + emitter.emit('end') + gatherer.finished.then(function (gatheredEvents) { + expectedSequence.forEach(function (expectedEvent, i) { + t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i])) + }) + t.end() + }) +}) + +test('delayed listener registration', function (t) { + const emitter = new CorkedEmitter() + const expectedSequence = [ + ['searchEntry', { data: 'a' }], + ['searchEntry', { data: 'b' }], + ['end'] + ] + const gatherer = gatherEventSequence(3) + emitter.emit('searchEntry', { data: 'a' }) + emitter.emit('searchEntry', { data: 'b' }) + emitter.emit('end') + // The listeners only appear after a brief delay - this simulates + // the situation described in https://github.com/ldapjs/node-ldapjs/issues/602 + // and in https://github.com/ifroz/node-ldapjs/commit/5239f6c68827f2c25b4589089c199d15bb882412 + setTimeout(function () { + emitter.on('end', function (...args) { + gatherer.callback(['end', ...args]) + }) + emitter.on('searchEntry', function (...args) { + gatherer.callback(['searchEntry', ...args]) + }) + }, 50) + gatherer.finished.then(function (gatheredEvents) { + expectedSequence.forEach(function (expectedEvent, i) { + t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i])) + }) + t.end() + }) +})