Major overhaul of client connection logic

- Remove PooledClient
- Add reconnect functionality to client
- Add 'idle' client event and options
This commit is contained in:
Patrick Mooney 2014-06-19 14:24:05 -05:00
parent f3e376d40b
commit 72bfb9b0f7
6 changed files with 543 additions and 503 deletions

View File

@ -30,32 +30,14 @@ client is:
||log|| You can optionally pass in a bunyan instance the client will use to acquire a logger. The client logs all messages at the `trace` level.|| ||log|| You can optionally pass in a bunyan instance the client will use to acquire a logger. The client logs all messages at the `trace` level.||
||timeout||How long the client should let operations live for before timing out. Default is Infinity.|| ||timeout||How long the client should let operations live for before timing out. Default is Infinity.||
||connectTimeout||How long the client should wait before timing out on TCP connections. Default is up to the OS.|| ||connectTimeout||How long the client should wait before timing out on TCP connections. Default is up to the OS.||
||maxConnections||Whether or not to enable connection pooling, and if so, how many to maintain.||
||tlsOptions||Additional [options](http://nodejs.org/api/tls.html#tls_tls_connect_port_host_options_callback) passed to the TLS connection layer when connecting via `ldaps://`|| ||tlsOptions||Additional [options](http://nodejs.org/api/tls.html#tls_tls_connect_port_host_options_callback) passed to the TLS connection layer when connecting via `ldaps://`||
If using connection pooling, you can additionally pass in:
||bindDN||The DN all connections should be bound as.||
||bindCredentials||The credentials to use with bindDN.||
||checkInterval||How often to schedule health checks.||
||maxIdleTime||How long a client can sit idle before initiating a health check (subject to the frequency set by checkInterval).||
## Connection management ## Connection management
As LDAP is a stateful protocol (as opposed to HTTP), having connections torn As LDAP is a stateful protocol (as opposed to HTTP), having connections torn
down from underneath you is difficult to deal with. That said, the "raw" down from underneath you is difficult to deal with.
client, which is what you get when maxConnections is either unset or <= 1, does
not do anything for you here; you can handle that however you want.
More commonly, you probably want to use connection pooling, which performs
health checks, and while you will see occasional errors from a client, those
will be highly transient, as the pooling logic will purge them and create new
ones for you.
It is highly recommended you just provide bindCredentials initially, as all
clients used will be authenticated, but you can call `bind` at any given time.
This is expensive though, as the pool must first drain, be destroyed, and then
recreated. So try not to do that.
## Common patterns ## Common patterns
@ -76,9 +58,6 @@ The bind API only allows LDAP 'simple' binds (equivalent to HTTP Basic
Authentication) for now. Note that all client APIs can optionally take an array Authentication) for now. Note that all client APIs can optionally take an array
of `Control` objects. You probably don't need them though... of `Control` objects. You probably don't need them though...
If you have more than 1 connection in the connection pool, you will be called
back after *all* of the connections are bound, not just the first one.
Example: Example:
client.bind('cn=root', 'secret', function(err) { client.bind('cn=root', 'secret', function(err) {
@ -288,9 +267,6 @@ find almost anything you're looking for.
Performs an unbind operation against the LDAP server. Performs an unbind operation against the LDAP server.
The unbind operation takes no parameters other than a callback, and will unbind
(and disconnect) *all* of the connections in the pool.
Example: Example:
client.unbind(function(err) { client.unbind(function(err) {

View File

@ -4,6 +4,9 @@ var EventEmitter = require('events').EventEmitter;
var net = require('net'); var net = require('net');
var tls = require('tls'); var tls = require('tls');
var util = require('util'); var util = require('util');
var once = require('once');
var backoff = require('backoff');
var vasync = require('vasync');
var assert = require('assert-plus'); var assert = require('assert-plus');
@ -77,139 +80,65 @@ function validateControls(controls) {
return controls; return controls;
} }
/**
function setupSocket(socket, opts) { * Queue to contain LDAP requests.
var log = opts.log; *
* @param {Object} opts queue options
socket.ldap = { *
id: opts.url ? opts.url.href : opts.socketPath, * Accepted Options:
messageID: 0, * - size: Maximum queue size
messages: {}, * - timeout: Set timeout between first queue insertion and queue flush.
getNextMessageID: function getNextMessageID() { */
if (++socket.ldap.messageID >= MAX_MSGID) function RequestQueue(opts) {
socket.ldap.messageID = 1; if (!opts || typeof (opts) !== 'object') {
opts = {};
return socket.ldap.messageID;
},
parser: new Parser({
log: log
})
};
// This won't be set on TLS. So. Very. Annoying.
if (typeof (socket.setKeepAlive) !== 'function') {
socket.setKeepAlive = function setKeepAlive(enable, delay) {
return socket.socket ? socket.socket.setKeepAlive(enable, delay) : false;
};
} }
this.size = (opts.size > 0) ? opts.size : Infinity;
// Since tls.socket doesn't emit 'close' events, we must register to receive this.timeout = (opts.timeout > 0) ? opts.timeout : 0;
// them on net.socket instead this._queue = [];
var closeSocket = (opts.secure ? socket.socket : socket); this._timer = null;
// On close we have to walk the outstanding messages and go invoke their
// callback with an error.
closeSocket.on('close', function onClose(had_err) {
socket.removeAllListeners('connect');
closeSocket.removeAllListeners('close');
socket.removeAllListeners('data');
socket.removeAllListeners('drain');
socket.removeAllListeners('end');
socket.removeAllListeners('error');
socket.removeAllListeners('timeout');
if (log.trace())
log.trace('close event had_err=%s', had_err ? 'yes' : 'no');
opts.emit('close', had_err);
Object.keys(socket.ldap.messages).forEach(function (msgid) {
var err;
if (socket.unbindMessageID !== parseInt(msgid, 10)) {
err = new ConnectionError(socket.ldap.id + ' closed');
} else {
err = new UnbindResponse({
messageID: msgid
});
err.status = 'unbind';
}
if (typeof (socket.ldap.messages[msgid]) === 'function') {
var callback = socket.ldap.messages[msgid];
delete socket.ldap.messages[msgid];
return callback(err);
} else if (socket.ldap.messages[msgid]) {
if (err instanceof Error)
socket.ldap.messages[msgid].emit('error', err);
delete socket.ldap.messages[msgid];
}
delete socket.ldap.parser;
delete socket.ldap;
return false;
});
});
socket.on('data', function onData(data) {
if (log.trace())
log.trace('data event: %s', util.inspect(data));
socket.ldap.parser.write(data);
});
socket.on('end', function onEnd() {
if (log.trace())
log.trace('end event');
opts.emit('end');
socket.end();
});
socket.on('error', function onError(err) {
if (log.trace())
log.trace({err: err}, 'error event: %s', new Error().stack);
if (opts.connectTimer) {
clearTimeout(opts.connectTimer);
opts.connectTimer = false;
}
if (opts.listeners('error').length)
opts.emit('error', err);
socket.end();
});
socket.on('timeout', function onTimeout() {
if (log.trace())
log.trace('timeout event');
opts.emit('socketTimeout');
socket.end();
});
// The "router"
socket.ldap.parser.on('message', function onMessage(message) {
message.connection = socket;
var callback = socket.ldap.messages[message.messageID];
if (!callback) {
log.error({message: message.json}, 'unsolicited message');
return false;
}
return callback(message);
});
socket.ldap.parser.on('error', function onParseError(err) {
log.trace({err: err}, 'parser error event');
if (opts.listeners('error').length)
opts.emit('error', err);
socket.end();
});
} }
/**
* Insert request into queue.
*
*/
RequestQueue.prototype.queue = function queue(message, expect, emitter, cb) {
if (this._queue.length >= this.maxLength) {
return false;
}
this._queue.push([message, expect, emitter, cb]);
if (this.maxAge > 0) {
if (this._timer !== null) {
this._timer = setTimeout(this.purge.bind(this), this.maxAge);
}
}
return true;
};
/**
* Process all queued requests with callback.
*/
RequestQueue.prototype.flush = function flush(cb) {
if (this._timer) {
clearTimeout(this._timer);
this._timer = null;
}
var items = this._queue;
this._queue = [];
items.forEach(function (req) {
cb(req[0], req[1], req[2], req[3]);
});
};
/**
* Purge all queued requests with an error.
*/
RequestQueue.prototype.purge = function purge() {
this.flush(function (msg, expect, emitter, cb) {
cb(new Error('RequestQueue timeout'));
});
};
///--- API ///--- API
@ -233,18 +162,37 @@ function Client(options) {
var _url; var _url;
if (options.url) if (options.url)
_url = url.parse(options.url); _url = url.parse(options.url);
this.connectTimeout = parseInt((options.connectTimeout || 0), 10);
this.host = _url ? _url.hostname : undefined; this.host = _url ? _url.hostname : undefined;
this.log = options.log.child({clazz: 'Client'}, true);
this.port = _url ? _url.port : false; this.port = _url ? _url.port : false;
this.secure = _url ? _url.secure : false; this.secure = _url ? _url.secure : false;
this.url = _url;
this.tlsOptions = options.tlsOptions; this.tlsOptions = options.tlsOptions;
this.socketPath = options.socketPath || false; this.socketPath = options.socketPath || false;
this.timeout = parseInt((options.timeout || 0), 10);
this.url = _url;
this.socket = this._connect(); this.log = options.log.child({clazz: 'Client'}, true);
this.timeout = parseInt((options.timeout || 0), 10);
this.connectTimeout = parseInt((options.connectTimeout || 0), 10);
this.idleTimeout = parseInt((options.idleTimeout || 0), 10);
if (options.reconnect) {
this.reconnect = {
initialDelay: parseInt(options.reconnect.initialDelay || 100, 10),
maxDelay: parseInt(options.reconnect.maxDelay || 10000, 10),
failAfter: parseInt(options.reconnect.failAfter || 0, 10)
};
}
this.queuing = (options.queuing !== undefined) ? options.queuing : true;
if (this.queuing) {
this._queue = new RequestQueue({
size: parseInt((options.queueSize || 0), 10),
timeout: parseInt((options.queueTimeout || 0), 10)
});
}
this.socket = null;
this.connected = false;
this._connect();
} }
util.inherits(Client, EventEmitter); util.inherits(Client, EventEmitter);
module.exports = Client; module.exports = Client;
@ -345,7 +293,11 @@ Client.prototype.add = function add(name, entry, controls, callback) {
* @param {Function} callback of the form f(err, res). * @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input. * @throws {TypeError} on invalid input.
*/ */
Client.prototype.bind = function bind(name, credentials, controls, callback) { Client.prototype.bind = function bind(name,
credentials,
controls,
callback,
_bypass) {
if (typeof (name) !== 'string' && !(name instanceof dn.DN)) if (typeof (name) !== 'string' && !(name instanceof dn.DN))
throw new TypeError('name (string) required'); throw new TypeError('name (string) required');
assert.string(credentials, 'credentials'); assert.string(credentials, 'credentials');
@ -364,7 +316,7 @@ Client.prototype.bind = function bind(name, credentials, controls, callback) {
controls: controls controls: controls
}); });
return this._send(req, [errors.LDAP_SUCCESS], null, callback); return this._send(req, [errors.LDAP_SUCCESS], null, callback, _bypass);
}; };
@ -619,7 +571,11 @@ Client.prototype.modifyDN = function modifyDN(name,
* @param {Function} callback of the form f(err, res). * @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input. * @throws {TypeError} on invalid input.
*/ */
Client.prototype.search = function search(base, options, controls, callback) { Client.prototype.search = function search(base,
options,
controls,
callback,
_bypass) {
if (typeof (base) !== 'string' && !(base instanceof dn.DN)) if (typeof (base) !== 'string' && !(base instanceof dn.DN))
throw new TypeError('base (string) required'); throw new TypeError('base (string) required');
if (Array.isArray(options) || (options instanceof Control)) { if (Array.isArray(options) || (options instanceof Control)) {
@ -675,7 +631,11 @@ Client.prototype.search = function search(base, options, controls, callback) {
controls: controls controls: controls
}); });
return this._send(req, [errors.LDAP_SUCCESS], new EventEmitter(), callback); return this._send(req,
[errors.LDAP_SUCCESS],
new EventEmitter(),
callback,
_bypass);
}; };
@ -706,70 +666,407 @@ Client.prototype.unbind = function unbind(callback) {
}; };
/**
* Disconnect from the LDAP server and do not allow reconnection.
*
* If the client is instantiated with proper reconnection options, it's
* possible to initiate new requests after a call to unbind since the client
* will attempt to reconnect in order to fulfill the request.
*
* Calling destroy will prevent any further reconnections from occuring.
*/
Client.prototype.destroy = function destroy() {
this.destroyed = true;
if (this._queue) {
// Purge any queued requests which are now meaningless
this._queue.flush(function (msg, expect, emitter, cb) {
if (typeof (cb) === 'function') {
cb(new Error('client destroyed'));
}
});
}
this.unbind();
};
///--- Private API ///--- Private API
/**
* Initiate LDAP connection.
*/
Client.prototype._connect = function _connect() { Client.prototype._connect = function _connect() {
var log = this.log; if (this.connecting) {
var proto = this.secure ? tls : net; return;
}
var self = this; var self = this;
var socket = null; var log = this.log;
this.connectTimer = false; var socket;
function onConnect() { // Establish basic socket connection
if (self.connectTimer) function connectSocket(_, cb) {
cb = once(cb);
function onResult(err, res) {
if (err) {
if (self.connectTimer) {
clearTimeout(self.connectTimer); clearTimeout(self.connectTimer);
self.connectTimer = null;
socket.removeListener('connect', onConnect); }
socket.removeListener('secureConnect', onConnect); self.emit('connectError', err);
assert.ok(socket.ldap); }
cb(err, res);
}
function onConnect() {
if (self.connectTimer) {
clearTimeout(self.connectTimer);
self.connectTimer = null;
}
socket.removeAllListeners('error')
.removeAllListeners('connect')
.removeAllListeners('secureConnect');
socket.ldap.id = nextClientId() + '__' + socket.ldap.id; socket.ldap.id = nextClientId() + '__' + socket.ldap.id;
self.log = self.log.child({ldap_id: socket.ldap.id}, true); self.log = self.log.child({ldap_id: socket.ldap.id}, true);
log.trace('connect event'); // Move on to client setup
setupClient(cb);
self.socket = socket;
self.emit('connect', socket);
} }
socket = proto.connect((this.port || this.socketPath), var port = (self.port || self.socketPath);
this.host, if (self.secure) {
this.secure ? this.tlsOptions : null); socket = tls.connect(port, self.host, self.tlsOptions);
socket.once('connect', onConnect);
socket.once('secureConnect', onConnect); socket.once('secureConnect', onConnect);
setupSocket(socket, this); } else {
socket = net.connect(port, self.host);
socket.once('connect', onConnect);
}
socket.once('error', onResult);
initSocket();
if (this.connectTimeout) { // Setup connection timeout handling, if desired
this.connectTimer = setTimeout(function onConnectTimeout() { if (self.connectTimeout) {
self.connectTimer = setTimeout(function onConnectTimeout() {
if (!socket || !socket.readable || !socket.writeable) { if (!socket || !socket.readable || !socket.writeable) {
socket.destroy(); socket.destroy();
self.socket = null;
onResult(new ConnectionError('connection timeout'));
}
}, self.connectTimeout);
}
}
// Initialize socket events and LDAP parser.
function initSocket() {
socket.ldap = {
id: self.url ? self.url.href : self.socketPath,
messageID: 0,
messages: {},
getNextMessageID: function getNextMessageID() {
if (++socket.ldap.messageID >= MAX_MSGID)
socket.ldap.messageID = 1;
return socket.ldap.messageID;
},
parser: new Parser({
log: log
})
};
// This won't be set on TLS. So. Very. Annoying.
if (typeof (socket.setKeepAlive) !== 'function') {
socket.setKeepAlive = function setKeepAlive(enable, delay) {
return socket.socket ?
socket.socket.setKeepAlive(enable, delay) : false;
};
}
socket.on('data', function onData(data) {
if (log.trace())
log.trace('data event: %s', util.inspect(data));
socket.ldap.parser.write(data);
});
// The "router"
socket.ldap.parser.on('message', function onMessage(message) {
message.connection = socket;
var callback = socket.ldap.messages[message.messageID];
if (!callback) {
log.error({message: message.json}, 'unsolicited message');
return false;
}
return callback(message);
});
socket.ldap.parser.on('error', function onParseError(err) {
log.trace({err: err}, 'parser error event');
self.emit('error', err);
self.connected = false;
socket.end();
});
}
// After connect, register socket event handlers and run any setup actions
function setupClient(cb) {
cb = once(cb);
// Indicate failure if anything goes awry during setup
function bail(err) {
socket.destroy();
cb(err || new Error('client error during setup'));
}
// Work around lack of close event on tls.socket
((self.secure) ? socket.socket : socket).once('close', bail);
socket.once('error', bail);
socket.once('end', bail);
socket.once('timeout', bail);
self.socket = socket;
// Run any requested setup (such as automatically performing a bind) on
// socket before signalling successful connection.
// This setup needs to bypass the request queue since all other activity is
// blocked until the connection is considered fully established post-setup.
// Only allow bind/search for now.
var basicClient = {
bind: function bindBypass(name, credentials, controls, callback) {
return self.bind(name, credentials, controls, callback, true);
},
search: function searchBypass(base, options, controls, callback) {
return self.search(base, options, controls, callback, true);
},
unbind: self.unbind.bind(self)
};
var setupSteps = self.listeners('setup');
if (setupSteps.length) {
vasync.forEachPipeline({
func: function (f, callback) {
f(basicClient, callback);
},
inputs: setupSteps
}, function (err, result) {
cb(err, socket);
});
} else {
cb(null, socket);
}
}
// Wire up "official" event handlers after successful connect/setup
function postSetup() {
socket.removeAllListeners('error')
.removeAllListeners('close')
.removeAllListeners('end')
.removeAllListeners('timeout');
// Work around lack of close event on tls.socket
((self.secure) ? socket.socket : socket).once('close',
self._onClose.bind(self));
socket.on('end', function onEnd() {
if (log.trace())
log.trace('end event');
self.emit('end');
socket.end();
});
socket.on('error', function onSocketError(err) {
if (log.trace())
log.trace({err: err}, 'error event: %s', new Error().stack);
self.emit('error', err);
socket.destroy();
});
socket.on('timeout', function onTimeout() {
if (log.trace())
log.trace('timeout event');
self.emit('socketTimeout');
socket.end();
});
}
var retry = backoff.call(connectSocket, {}, function (err, res) {
self.connecting = false;
if (!err) {
postSetup();
self.connected = true;
self.emit('connect', socket);
self.log.debug('connected after %d attempts', retry.getNumRetries());
// Flush any queued requests
self._flushQueue();
} else {
self.log.debug('failed to connect after %d attempts',
retry.getNumRetries());
// Communicate the last-encountered error
if (err instanceof ConnectionError) {
self.emit('connectTimeout'); self.emit('connectTimeout');
} else {
self.emit('error', err);
} }
}, this.connectTimeout);
} }
});
return socket; if (this.reconnect) {
retry.setStrategy(new backoff.ExponentialStrategy({
initialDelay: this.reconnect.minDelay,
maxDelay: this.reconnect.maxDelay
}));
retry.failAfter(this.reconnect.failAfter || Infinity);
} else {
// Only attempt the connection once for non-reconnection clients
retry.failAfter(1);
}
this.connecting = true;
retry.start();
}; };
/**
* Flush queued requests out to the socket.
*/
Client.prototype._flushQueue = function _flushQueue() {
if (this._queue) {
// Pull items we're about to process out of the queue.
this._queue.flush(this._send.bind(this));
}
};
Client.prototype._send = function _send(message, expect, emitter, callback) { /**
* Clean up socket/parser resources after socket close.
*/
Client.prototype._onClose = function _onClose(had_err) {
var socket = this.socket;
socket.removeAllListeners('connect')
.removeAllListeners('data')
.removeAllListeners('drain')
.removeAllListeners('end')
.removeAllListeners('error')
.removeAllListeners('timeout');
this.socket = null;
this.connected = false;
if (!this.secure) {
socket.removeAllListeners('close');
} else {
socket.socket.removeAllListeners('close');
}
if (this.log.trace())
this.log.trace('close event had_err=%s', had_err ? 'yes' : 'no');
this.emit('close', had_err);
// On close we have to walk the outstanding messages and go invoke their
// callback with an error.
Object.keys(socket.ldap.messages).forEach(function (msgid) {
var err;
if (socket.unbindMessageID !== parseInt(msgid, 10)) {
err = new ConnectionError(socket.ldap.id + ' closed');
} else {
// Unbinds will be communicated as a success since we're closed
err = new UnbindResponse({
messageID: msgid
});
err.status = 'unbind';
}
if (typeof (socket.ldap.messages[msgid]) === 'function') {
var callback = socket.ldap.messages[msgid];
delete socket.ldap.messages[msgid];
return callback(err);
} else if (socket.ldap.messages[msgid]) {
if (err instanceof Error)
socket.ldap.messages[msgid].emit('error', err);
delete socket.ldap.messages[msgid];
}
return false;
});
delete socket.ldap.parser;
delete socket.ldap;
if (had_err && this.reconnect) {
this._connect();
}
return false;
};
/**
* Maintain idle timer for client.
*
* Will start timer to fire 'idle' event if conditions are satisfied. If
* conditions are not met and a timer is running, it will be cleared.
*
* @param {Boolean} override explicitly disable timer.
*/
Client.prototype._updateIdle = function _updateIdle(override) {
if (this.idleTimeout === 0) {
return;
}
// Client must be connected but not waiting on any request data
var self = this;
function isIdle(disable) {
// FIXME: this breaks with abandons
return ((disable !== true) &&
(self.socket && self.connected) &&
(Object.keys(self.socket.ldap.messages).length === 0));
}
if (isIdle(override)) {
if (!this._idleTimer) {
this._idleTimer = setTimeout(function () {
// Double-check idleness in case socket was torn down
if (isIdle()) {
self.emit('idle');
}
}, this.idleTimeout);
}
} else {
if (this._idleTimer) {
clearTimeout(this._idleTimer);
this._idleTimer = null;
}
}
};
/**
* Attempt to send an LDAP request.
*/
Client.prototype._send = function _send(message,
expect,
emitter,
callback,
_bypass) {
assert.ok(message); assert.ok(message);
assert.ok(expect); assert.ok(expect);
assert.ok(typeof (emitter) !== undefined); assert.ok(typeof (emitter) !== undefined);
assert.ok(callback); assert.ok(callback);
// Allow connect setup traffic to bypass checks
if (_bypass && this.socket && this.socket.writable) {
return this._sendSocket(message, expect, emitter, callback);
}
if (!this.socket || !this.connected) {
if (this._queue) {
if (!this._queue.queue(message, expect, emitter, callback)) {
callback(new Error('request queue full'));
}
// Initiate reconnect if needed
return this._connect();
}
return callback(new ConnectionError('no socket'));
} else {
this._flushQueue();
return this._sendSocket(message, expect, emitter, callback);
}
};
Client.prototype._sendSocket = function _sendSocket(message,
expect,
emitter,
callback) {
var conn = this.socket; var conn = this.socket;
var log = this.log; var log = this.log;
var self = this; var self = this;
var timer = false; var timer = false;
var sentEmitter = false; var sentEmitter = false;
if (!conn)
return callback(new ConnectionError('no socket'));
function _done(event, obj) { function _done(event, obj) {
if (emitter) { if (emitter) {
if (event === 'error') { if (event === 'error') {
@ -861,6 +1158,8 @@ Client.prototype._send = function _send(message, expect, emitter, callback) {
return undefined; return undefined;
} else { } else {
delete conn.ldap.messages[message.messageID]; delete conn.ldap.messages[message.messageID];
// Potentially mark client as idle
self._updateIdle();
if (msg instanceof LDAPResult) { if (msg instanceof LDAPResult) {
if (expect.indexOf(msg.status) === -1) if (expect.indexOf(msg.status) === -1)
@ -890,6 +1189,8 @@ Client.prototype._send = function _send(message, expect, emitter, callback) {
return callback(null); return callback(null);
} else if (expect === 'unbind') { } else if (expect === 'unbind') {
conn.unbindMessageID = message.id; conn.unbindMessageID = message.id;
// Mark client as disconnected once unbind clears the socket
self.connected = false;
conn.end(); conn.end();
} else if (emitter) { } else if (emitter) {
sentEmitter = true; sentEmitter = true;
@ -901,6 +1202,8 @@ Client.prototype._send = function _send(message, expect, emitter, callback) {
// Start actually doing something... // Start actually doing something...
message.messageID = conn.ldap.getNextMessageID(); message.messageID = conn.ldap.getNextMessageID();
conn.ldap.messages[message.messageID] = messageCallback; conn.ldap.messages[message.messageID] = messageCallback;
// Mark client as active
this._updateIdle(true);
if (self.timeout) { if (self.timeout) {
log.trace('Setting timeout to %d', self.timeout); log.trace('Setting timeout to %d', self.timeout);

View File

@ -5,8 +5,6 @@ var assert = require('assert');
var Logger = require('bunyan'); var Logger = require('bunyan');
var Client = require('./client'); var Client = require('./client');
var ClientPool = require('./pool');
///--- Globals ///--- Globals
@ -19,7 +17,6 @@ var DEF_LOG = new Logger({
}); });
///--- Functions ///--- Functions
function xor() { function xor() {
@ -39,7 +36,6 @@ function xor() {
///--- Exports ///--- Exports
module.exports = { module.exports = {
createClient: function createClient(options) { createClient: function createClient(options) {
if (typeof (options) !== 'object') if (typeof (options) !== 'object')
throw new TypeError('options (object) required'); throw new TypeError('options (object) required');
@ -54,10 +50,6 @@ module.exports = {
if (typeof (options.log) !== 'object') if (typeof (options.log) !== 'object')
throw new TypeError('options.log must be an object'); throw new TypeError('options.log must be an object');
if (options.maxConnections > 1)
return new ClientPool(options);
return new Client(options); return new Client(options);
} }
}; };

View File

@ -1,278 +0,0 @@
// Copyright 2012 Mark Cavage, Inc. All rights reserved.
var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var pooling = require('pooling');
var ConnectionError = require('../errors').ConnectionError;
var BindResponse = require('../messages').BindResponse;
var Client = require('./client');
///--- Globals
var STD_OPS = [
'add',
'del',
'modify',
'modifyDN'
];
var RETURN_VAL_OPS = [
'compare',
'exop'
];
///--- Internal Functions
function createPool(options, clientpool) {
assert.ok(options);
return pooling.createPool({
checkInterval: options.checkInterval,
log: options.log,
name: 'ldapjs_' + (options.url || options.socketPath).replace(/[:/]/g, '_'),
max: options.maxConnections,
maxIdleTime: options.maxIdleTime,
create: function createClient(callback) {
var client = new Client(options);
client.on('error', function (err) {
client.removeAllListeners('connect');
client.removeAllListeners('connectTimeout');
if (clientpool.listeners('error').length) {
clientpool.emit('error', err);
}
return callback(err);
});
client.on('connectTimeout', function () {
client.removeAllListeners('connect');
clientpool.emit('connectTimeout');
});
client.once('connect', function onConnect() {
client.removeAllListeners('error');
if (!options.bindDN || !options.bindCredentials)
return callback(null, client);
function bindCallback(err, res) {
if (err)
return callback(err, null);
return callback(null, client);
}
return client.bind(options.bindDN,
options.bindCredentials,
options.bindControls || [],
bindCallback);
});
},
check: function check(client, callback) {
// just do a root dse search
client.search('', '(objectclass=*)', function (err, res) {
if (err)
return callback(err);
res.on('error', function (e) {
res.removeAllListeners('end');
callback(e);
});
res.on('end', function () {
res.removeAllListeners('error');
callback(null);
});
return undefined;
});
},
destroy: function destroy(client) {
client.unbind(function () {});
}
});
}
///--- API
function ClientPool(options) {
assert.ok(options);
EventEmitter.call(this, options);
this.log = options.log.child({clazz: 'ClientPool'}, true);
this.options = {
bindDN: options.bindDN,
bindCredentials: options.bindCredentials,
bindControls: options.bindControls || [],
checkInterval: options.checkInterval,
connectTimeout: (options.connectTimeout || 0),
maxIdleTime: options.maxIdleTime,
maxConnections: options.maxConnections,
log: options.log,
socketPath: options.socketPath,
timeout: (options.timeout || 0),
url: options.url,
tlsOptions: options.tlsOptions
};
this.pool = createPool(options, this);
}
util.inherits(ClientPool, EventEmitter);
module.exports = ClientPool;
STD_OPS.forEach(function (op) {
ClientPool.prototype[op] = function clientProxy() {
var args = Array.prototype.slice.call(arguments);
var cb = args.pop();
if (typeof (cb) !== 'function')
throw new TypeError('callback (Function) required');
var self = this;
return this.pool.acquire(function onAcquire(err, client) {
if (err)
return cb(err);
args.push(function proxyCallback(err, res) {
self.pool.release(client);
return cb(err, res);
});
try {
return Client.prototype[op].apply(client, args);
} catch (e) {
self.pool.release(client);
return cb(e);
}
});
};
});
RETURN_VAL_OPS.forEach(function (op) {
ClientPool.prototype[op] = function clientProxy() {
var args = Array.prototype.slice.call(arguments);
var cb = args.pop();
if (typeof (cb) !== 'function')
throw new TypeError('callback (Function) required');
var self = this;
return this.pool.acquire(function onAcquire(poolErr, client) {
if (poolErr)
return cb(poolErr);
args.push(function proxyCallback(err, val, res) {
self.pool.release(client);
return cb(err, val, res);
});
try {
return Client.prototype[op].apply(client, args);
} catch (e) {
self.pool.release(client);
return cb(e);
}
});
};
});
ClientPool.prototype.search = function search(base, opts, controls, callback) {
if (typeof (controls) === 'function') {
callback = controls;
controls = [];
}
var self = this;
return this.pool.acquire(function onAcquire(err, client) {
if (err)
return callback(err);
// This is largely in existence for search requests
client.timeout = self.timeout || client.timeout;
return client.search(base, opts, controls, function (err, res) {
function cleanup() {
self.pool.release(client);
}
if (err) {
cleanup();
return callback(err, res);
}
res.on('error', cleanup);
res.on('end', cleanup);
return callback(null, res);
});
});
};
ClientPool.prototype.abandon = function abandon(msgid, controls, callback) {
if (typeof (controls) === 'function') {
callback = controls;
controls = [];
}
this.log.error({
messageID: msgid
}, 'Abandon is not supported with connection pooling. Ignoring.');
return callback(null);
};
ClientPool.prototype.bind = function bind(dn, creds, controls, callback) {
if (typeof (controls) === 'function') {
callback = controls;
controls = [];
}
var self = this;
self.options.bindDN = null;
self.options.bindCredentials = null;
self.options.bindControls = null;
return this.pool.shutdown(function () {
self.pool = createPool(self.options, self);
return self.pool.acquire(function onAcquire(err, client) {
if (err)
return callback(err);
return client.bind(dn, creds, controls, function (err, res) {
self.pool.release(client);
if (err)
return callback(err, res);
self.options.bindDN = dn;
self.options.bindCredentials = creds;
self.options.bindControls = controls;
return callback(null, res);
});
});
});
};
ClientPool.prototype.unbind = function unbind(callback) {
return this.pool.shutdown(callback);
};

View File

@ -31,15 +31,16 @@
"assert-plus": "0.1.5", "assert-plus": "0.1.5",
"bunyan": "0.22.1", "bunyan": "0.22.1",
"dashdash": "1.6.0", "dashdash": "1.6.0",
"pooling": "0.4.6" "backoff": "2.4.0",
"once": "1.3.0",
"vasync": "1.5.0"
}, },
"optionalDependencies": { "optionalDependencies": {
"dtrace-provider": "0.2.8" "dtrace-provider": "0.2.8"
}, },
"devDependencies": { "devDependencies": {
"tap": "0.4.1", "tap": "0.4.1",
"node-uuid": "1.4.0", "node-uuid": "1.4.0"
"vasync": "1.4.3"
}, },
"scripts": { "scripts": {
"test": "./node_modules/.bin/tap ./test" "test": "./node_modules/.bin/tap ./test"

View File

@ -77,6 +77,21 @@ test('setup', function (t) {
return next(); return next();
}); });
server.search('dc=slow', function (req, res, next) {
res.send({
dn: 'dc=slow',
attributes: {
'you': 'wish',
'this': 'was',
'faster': '.'
}
});
setTimeout(function () {
res.end();
next();
}, 250);
});
server.search('dc=timeout', function (req, res, next) { server.search('dc=timeout', function (req, res, next) {
// Haha client! // Haha client!
}); });
@ -132,7 +147,6 @@ test('setup', function (t) {
client = ldap.createClient({ client = ldap.createClient({
connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10), connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10),
socketPath: SOCKET, socketPath: SOCKET,
maxConnections: parseInt(process.env.LDAP_MAX_CONNS || 5, 10),
idleTimeoutMillis: 10, idleTimeoutMillis: 10,
log: new Logger({ log: new Logger({
name: 'ldapjs_unit_test', name: 'ldapjs_unit_test',
@ -572,6 +586,38 @@ test('GH-24 attribute selection of *', function (t) {
}); });
test('idle timeout', function (t) {
// FIXME: this must be run before abandon
client.idleTimeout = 250;
function premature() {
t.ifError(true);
}
client.on('idle', premature);
client.search('dc=slow', 'objectclass=*', function (err, res) {
t.ifError(err);
res.on('searchEntry', function (res) {
t.ok(res);
});
res.on('error', function (err) {
t.ifError(err);
});
res.on('end', function () {
var late = setTimeout(function () {
t.ifError(false, 'too late');
}, 500);
// It's ok to go idle now
client.removeListener('idle', premature);
client.on('idle', function () {
clearTimeout(late);
client.removeAllListeners('idle');
client.idleTimeout = 0;
t.end();
});
});
});
});
test('abandon (GH-27)', function (t) { test('abandon (GH-27)', function (t) {
client.abandon(401876543, function (err) { client.abandon(401876543, function (err) {
t.ifError(err); t.ifError(err);