Improve client paged search handling

Search response objects now include more instrumentation and control when
automatically fetching paged results.  See the SearchPager documentation
for more details.

Fix mcavage/node-ldapjs#203
This commit is contained in:
Patrick Mooney 2014-07-24 15:41:24 -05:00
parent 37ce094fc4
commit d665378c0e
4 changed files with 385 additions and 75 deletions

View File

@ -224,15 +224,16 @@ client.bind(parsed.binddn, parsed.password, function (err, res) {
});
controls.push(pCtrl);
}
if (parsed.paged) {
var ctrl = new ldap.PagedResultsControl({ value: { size: parsed.paged } });
controls.push(ctrl);
}
var req = {
scope: parsed.scope || 'sub',
filter: parsed._args[0],
attributes: parsed._args.length > 1 ? parsed._args.slice(1) : []
};
if (parsed.paged) {
req.paged = {
pageSize: parsed.paged
};
}
client.search(parsed.base, req, controls, function (err, res) {
if (err)
perror(err);

View File

@ -13,7 +13,7 @@ var assert = require('assert-plus');
var Attribute = require('../attribute');
var Change = require('../change');
var Control = require('../controls/index').Control;
var PagedResultsControl = require('../controls/index').PagedResultsControl;
var SearchPager = require('./search_pager');
var Protocol = require('../protocol');
var dn = require('../dn');
var errors = require('../errors');
@ -781,23 +781,51 @@ Client.prototype.search = function search(base,
}
}
var req = new SearchRequest({
baseObject: typeof (base) === 'string' ? dn.parse(base) : base,
scope: options.scope || 'base',
filter: options.filter,
derefAliases: options.derefAliases || Protocol.NEVER_DEREF_ALIASES,
sizeLimit: options.sizeLimit || 0,
timeLimit: options.timeLimit || 10,
typesOnly: options.typesOnly || false,
attributes: options.attributes || [],
controls: controls
});
var self = this;
function sendRequest(ctrls, emitter, cb) {
var req = new SearchRequest({
baseObject: typeof (base) === 'string' ? dn.parse(base) : base,
scope: options.scope || 'base',
filter: options.filter,
derefAliases: options.derefAliases || Protocol.NEVER_DEREF_ALIASES,
sizeLimit: options.sizeLimit || 0,
timeLimit: options.timeLimit || 10,
typesOnly: options.typesOnly || false,
attributes: options.attributes || [],
controls: ctrls
});
return this._send(req,
[errors.LDAP_SUCCESS],
new EventEmitter(),
callback,
_bypass);
return self._send(req,
[errors.LDAP_SUCCESS],
emitter,
cb,
_bypass);
}
if (options.paged) {
// Perform automated search paging
var pageOpts = typeof (options.paged) === 'object' ? options.paged : {};
var size = 100; // Default page size
if (pageOpts.pageSize > 0) {
size = pageOpts.pageSize;
} else if (options.sizeLimit > 1) {
// According to the RFC, servers should ignore the paging control if
// pageSize >= sizelimit. Some will still send results, but it's safer
// to stay under that figure when assigning a default value.
size = options.sizeLimit - 1;
}
var pager = new SearchPager({
callback: callback,
controls: controls,
pageSize: size,
pagePause: pageOpts.pagePause
});
pager.on('search', sendRequest);
pager.begin();
} else {
sendRequest(controls, new EventEmitter, callback);
}
};
@ -1260,57 +1288,6 @@ Client.prototype._sendSocket = function _sendSocket(message,
return callback(null, obj);
} // end function _done(event, obj)
function _continuePagedSearch(msg) {
// this function looks for a paged control in the response msg
// and continue searching or not according to RFC 2696:
// http://www.ietf.org/rfc/rfc2696.txt
if (Array.isArray(msg.controls) && msg.controls.length > 0) {
log.trace('message has %d controls', msg.controls.length);
for (var i = 0; i < msg.controls.length; i++) {
var resControl = msg.controls[i];
// check paged control in response
if (resControl instanceof PagedResultsControl) {
log.debug('paged search: end of page');
if (resControl.value.cookie && resControl.value.cookie.length > 0) {
log.trace('paged search: received cookie in response');
if (Array.isArray(message.controls) &&
message.controls.length > 0) {
for (var j = 0; j < message.controls.length; j++) {
var reqControl = message.controls[j];
if (reqControl instanceof PagedResultsControl) {
// update request cookie and re-send
reqControl.value.cookie = resControl.value.cookie;
try {
log.debug('paged search: continuing');
conn.write(message.toBer());
return true;
} catch (e) {
if (timer)
clearTimeout(timer);
log.trace({err: e}, 'Error writing message to socket');
callback(e);
return false;
}
}
}
}
} else {
log.debug('paged search done');
}
}
}
}
// not a paged search or all pages received
return false;
} // end function _continuePagedSearch(msg)
function messageCallback(msg) {
if (timer)
clearTimeout(timer);
@ -1325,9 +1302,6 @@ Client.prototype._sendSocket = function _sendSocket(message,
var event = msg.constructor.name;
event = event[0].toLowerCase() + event.slice(1);
return _done(event, msg);
} else if (_continuePagedSearch(msg)) {
// page search continued, just return for now
return undefined;
} else {
conn.ldap.remove(message.messageID);
// Potentially mark client as idle

172
lib/client/search_pager.js Normal file
View File

@ -0,0 +1,172 @@
// Copyright 2014 Joyent, Inc. All rights reserved.
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var assert = require('assert-plus');
var dn = require('../dn');
var messages = require('../messages/index');
var Protocol = require('../protocol');
var PagedControl = require('../controls/paged_results_control.js');
///--- API
/**
* Handler object for paged search operations.
*
* Provided to consumers in place of the normal search EventEmitter it adds the
* following new events:
* 1. page - Emitted whenever the end of a result page is encountered.
* If this is the last page, 'end' will also be emitted.
* The event passes two arguments:
* 1. The result object (similar to 'end')
* 2. A callback function optionally used to continue the search
* operation if the pagePause option was specified during
* initialization.
* 2. pageError - Emitted if the server does not support paged search results
* If there are no listeners for this event, the 'error' event
* will be emitted (and 'end' will not be). By listening to
* 'pageError', a successful search that lacks paging will be
* able to emit 'end'.
* 3. search - Emitted as an internal event to trigger another client search.
*/
function SearchPager(opts) {
assert.object(opts);
assert.func(opts.callback);
assert.number(opts.pageSize);
EventEmitter.call(this, {});
this.callback = opts.callback;
this.controls = opts.controls;
this.pageSize = opts.pageSize;
this.pagePause = opts.pagePause;
this.controls.forEach(function (control) {
if (control.type === PagedControl.OID) {
// The point of using SearchPager is not having to do this.
// Toss an error if the pagedResultsControl is present
throw new Error('redundant pagedResultControl');
}
});
this.finished = false;
this.started = false;
var emitter = new EventEmitter();
emitter.on('searchEntry', this.emit.bind(this, 'searchEntry'));
emitter.on('end', this._onEnd.bind(this));
emitter.on('error', this._onError.bind(this));
this.childEmitter = emitter;
}
util.inherits(SearchPager, EventEmitter);
module.exports = SearchPager;
/**
* Start the paged search.
*/
SearchPager.prototype.begin = function begin() {
// Starting first page
this._nextPage(null);
};
SearchPager.prototype._onEntry = function _onEntry(entry) {
this.emit('searchEntry', entry);
};
SearchPager.prototype._onEnd = function _onEnd(res) {
var self = this;
var cookie = null;
var nullFunc = function () { };
res.controls.forEach(function (control) {
if (control.type === PagedControl.OID) {
cookie = control.value.cookie;
}
});
if (cookie === null) {
// paged search not supported
this.finished = true;
this.emit('page', res, nullFunc);
var err = new Error('missing paged control');
err.name = 'PagedError';
if (this.listeners('pageError') > 0) {
this.emit('pageError', err);
// If the consumer as subscribed to pageError, SearchPager is absolved
// from deliverying the fault via the 'error' event. Emitting an 'end'
// event after 'error' breaks the contract that the standard client
// provides, so it's only a possibility if 'pageError' is used instead.
this.emit('end', res);
} else {
this.emit('error', err);
// No end event possible per explaination above.
}
} else if (cookie.length === 0) {
// end of paged results
this.finished = true;
this.emit('page', nullFunc);
this.emit('end', res);
} else {
if (this.pagePause) {
// Wait to fetch next page until callback is invoked
// Halt page fetching if called with error
this.emit('page', res, function (err) {
if (!err) {
self._nextPage(cookie);
} else {
// the paged search has been canceled so emit an end
self.emit('end', res);
}
});
} else {
this.emit('page', res, nullFunc);
this._nextPage(cookie);
}
}
};
SearchPager.prototype._onError = function _onError(err) {
this.finished = true;
this.emit('error', err);
};
/**
* Initiate a search for the next page using the returned cookie value.
*/
SearchPager.prototype._nextPage = function _nextPage(cookie) {
var controls = this.controls.slice(0);
controls.push(new PagedControl({
value: {
size: this.pageSize,
cookie: cookie
}
}));
this.emit('search', controls, this.childEmitter,
this._sendCallback.bind(this));
};
/**
* Callback provided to the client API for successful transmission.
*/
SearchPager.prototype._sendCallback = function _sendCallback(err, res) {
if (err) {
this.finished = true;
if (!this.started) {
// EmitSend error during the first page, bail via callback
this.callback(err, null);
} else {
this.emit('error', err);
}
} else {
// search successfully send
if (!this.started) {
this.started = true;
// send self as emitter as the client would
this.callback(null, this);
}
}
};

View File

@ -5,6 +5,7 @@ var Logger = require('bunyan');
var test = require('tape').test;
var uuid = require('node-uuid');
var vasync = require('vasync');
var util = require('util');
///--- Globals
@ -135,6 +136,85 @@ test('setup', function (t) {
return next();
});
server.search('cn=sizelimit', function (req, res, next) {
var sizeLimit = 200;
var i;
for (i = 0; i < 1000; i++) {
if (req.sizeLimit > 0 && i >= req.sizeLimit) {
break;
} else if (i > sizeLimit) {
res.end(ldap.LDAP_SIZE_LIMIT_EXCEEDED);
return next();
}
res.send({
dn: util.format('o=%d, cn=sizelimit', i),
attributes: {
o: [i],
objectclass: ['pagedResult']
}
});
}
res.end();
return next();
});
server.search('cn=paged', function (req, res, next) {
var min = 0;
var max = 1000;
function sendResults(start, end) {
start = (start < min) ? min : start;
end = (end > max || end < min) ? max : end;
var i;
for (i = start; i < end; i++) {
res.send({
dn: util.format('o=%d, cn=paged', i),
attributes: {
o: [i],
objectclass: ['pagedResult']
}
});
}
return i;
}
var cookie = null;
var pageSize = 0;
req.controls.forEach(function (control) {
if (control.type === ldap.PagedResultsControl.OID) {
pageSize = control.value.size;
cookie = control.value.cookie;
}
});
if (cookie && Buffer.isBuffer(cookie)) {
// Do simple paging
var first = min;
if (cookie.length !== 0) {
first = parseInt(cookie.toString(), 10);
}
var last = sendResults(first, first + pageSize);
var resultCookie;
if (last < max) {
resultCookie = new Buffer(last.toString());
} else {
resultCookie = new Buffer('');
}
res.controls.push(new ldap.PagedResultsControl({
value: {
size: pageSize, // correctness not required here
cookie: resultCookie
}
}));
res.end();
next();
} else {
// don't allow non-paged searches for this test endpoint
next(new ldap.UnwillingToPerformError());
}
});
server.search('dc=empty', function (req, res, next) {
res.send({
dn: 'dc=empty',
@ -464,6 +544,89 @@ test('search basic', function (t) {
});
test('search sizeLimit', function (t) {
t.test('over limit', function (t2) {
client.search('cn=sizelimit', {}, function (err, res) {
t2.ifError(err);
res.on('error', function (error) {
t2.equal(error.name, 'SizeLimitExceededError');
t2.end();
});
});
});
t.test('under limit', function (t2) {
var limit = 100;
client.search('cn=sizelimit', {sizeLimit: limit}, function (err, res) {
t2.ifError(err);
var count = 0;
res.on('searchEntry', function (entry) {
count++;
});
res.on('end', function () {
t2.pass();
t2.equal(count, limit);
t2.end();
});
res.on('error', t2.ifError.bind(t));
});
});
});
test('search paged', function (t) {
t.test('paged - no pauses', function (t2) {
var countEntries = 0;
var countPages = 0;
client.search('cn=paged', {paged: {pageSize: 100}}, function (err, res) {
t2.ifError(err);
res.on('searchEntry', function () {
countEntries++;
});
res.on('page', function () {
countPages++;
});
res.on('error', t2.ifError.bind(t2));
res.on('end', function () {
t2.equal(countEntries, 1000);
t2.equal(countPages, 10);
t2.end();
});
});
});
t.test('paged - pauses', function (t2) {
var countPages = 0;
client.search('cn=paged', {
paged: {
pageSize: 100,
pagePause: true
}
}, function (err, res) {
t2.ifError(err);
res.on('page', function (result, cb) {
countPages++;
// cancel after 9 to verify callback usage
if (countPages === 9) {
// another page should never be encountered
res.removeAllListeners('page')
.on('page', t2.fail.bind(null, 'unexpected page'));
return cb(new Error());
}
return cb();
});
res.on('error', t2.ifError.bind(t2));
res.on('end', function () {
t2.equal(countPages, 9);
t2.end();
});
});
});
t.end();
});
test('search referral', function (t) {
client.search('cn=ref, ' + SUFFIX, '(objectclass=*)', function (err, res) {
t.ifError(err);