From 72bfb9b0f749832b2310c5f5a6824c458dcda760 Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Thu, 19 Jun 2014 14:24:05 -0500 Subject: [PATCH] Major overhaul of client connection logic - Remove PooledClient - Add reconnect functionality to client - Add 'idle' client event and options --- docs/client.md | 26 +- lib/client/client.js | 679 +++++++++++++++++++++++++++++++------------ lib/client/index.js | 8 - lib/client/pool.js | 278 ------------------ package.json | 7 +- test/client.test.js | 48 ++- 6 files changed, 543 insertions(+), 503 deletions(-) delete mode 100644 lib/client/pool.js diff --git a/docs/client.md b/docs/client.md index 581bb9e..6bd2419 100644 --- a/docs/client.md +++ b/docs/client.md @@ -30,32 +30,14 @@ client is: ||log|| You can optionally pass in a bunyan instance the client will use to acquire a logger. The client logs all messages at the `trace` level.|| ||timeout||How long the client should let operations live for before timing out. Default is Infinity.|| ||connectTimeout||How long the client should wait before timing out on TCP connections. Default is up to the OS.|| -||maxConnections||Whether or not to enable connection pooling, and if so, how many to maintain.|| ||tlsOptions||Additional [options](http://nodejs.org/api/tls.html#tls_tls_connect_port_host_options_callback) passed to the TLS connection layer when connecting via `ldaps://`|| -If using connection pooling, you can additionally pass in: - -||bindDN||The DN all connections should be bound as.|| -||bindCredentials||The credentials to use with bindDN.|| -||checkInterval||How often to schedule health checks.|| -||maxIdleTime||How long a client can sit idle before initiating a health check (subject to the frequency set by checkInterval).|| ## Connection management As LDAP is a stateful protocol (as opposed to HTTP), having connections torn -down from underneath you is difficult to deal with. That said, the "raw" -client, which is what you get when maxConnections is either unset or <= 1, does -not do anything for you here; you can handle that however you want. +down from underneath you is difficult to deal with. -More commonly, you probably want to use connection pooling, which performs -health checks, and while you will see occasional errors from a client, those -will be highly transient, as the pooling logic will purge them and create new -ones for you. - -It is highly recommended you just provide bindCredentials initially, as all -clients used will be authenticated, but you can call `bind` at any given time. -This is expensive though, as the pool must first drain, be destroyed, and then -recreated. So try not to do that. ## Common patterns @@ -76,9 +58,6 @@ The bind API only allows LDAP 'simple' binds (equivalent to HTTP Basic Authentication) for now. Note that all client APIs can optionally take an array of `Control` objects. You probably don't need them though... -If you have more than 1 connection in the connection pool, you will be called -back after *all* of the connections are bound, not just the first one. - Example: client.bind('cn=root', 'secret', function(err) { @@ -288,9 +267,6 @@ find almost anything you're looking for. Performs an unbind operation against the LDAP server. -The unbind operation takes no parameters other than a callback, and will unbind -(and disconnect) *all* of the connections in the pool. - Example: client.unbind(function(err) { diff --git a/lib/client/client.js b/lib/client/client.js index 7b7c3f9..bf0e32e 100644 --- a/lib/client/client.js +++ b/lib/client/client.js @@ -4,6 +4,9 @@ var EventEmitter = require('events').EventEmitter; var net = require('net'); var tls = require('tls'); var util = require('util'); +var once = require('once'); +var backoff = require('backoff'); +var vasync = require('vasync'); var assert = require('assert-plus'); @@ -77,139 +80,65 @@ function validateControls(controls) { return controls; } - -function setupSocket(socket, opts) { - var log = opts.log; - - socket.ldap = { - id: opts.url ? opts.url.href : opts.socketPath, - messageID: 0, - messages: {}, - getNextMessageID: function getNextMessageID() { - if (++socket.ldap.messageID >= MAX_MSGID) - socket.ldap.messageID = 1; - - return socket.ldap.messageID; - }, - parser: new Parser({ - log: log - }) - }; - - // This won't be set on TLS. So. Very. Annoying. - if (typeof (socket.setKeepAlive) !== 'function') { - socket.setKeepAlive = function setKeepAlive(enable, delay) { - return socket.socket ? socket.socket.setKeepAlive(enable, delay) : false; - }; +/** + * Queue to contain LDAP requests. + * + * @param {Object} opts queue options + * + * Accepted Options: + * - size: Maximum queue size + * - timeout: Set timeout between first queue insertion and queue flush. + */ +function RequestQueue(opts) { + if (!opts || typeof (opts) !== 'object') { + opts = {}; } - - // Since tls.socket doesn't emit 'close' events, we must register to receive - // them on net.socket instead - var closeSocket = (opts.secure ? socket.socket : socket); - // On close we have to walk the outstanding messages and go invoke their - // callback with an error. - closeSocket.on('close', function onClose(had_err) { - socket.removeAllListeners('connect'); - closeSocket.removeAllListeners('close'); - socket.removeAllListeners('data'); - socket.removeAllListeners('drain'); - socket.removeAllListeners('end'); - socket.removeAllListeners('error'); - socket.removeAllListeners('timeout'); - - if (log.trace()) - log.trace('close event had_err=%s', had_err ? 'yes' : 'no'); - - opts.emit('close', had_err); - Object.keys(socket.ldap.messages).forEach(function (msgid) { - var err; - if (socket.unbindMessageID !== parseInt(msgid, 10)) { - err = new ConnectionError(socket.ldap.id + ' closed'); - } else { - err = new UnbindResponse({ - messageID: msgid - }); - err.status = 'unbind'; - } - - if (typeof (socket.ldap.messages[msgid]) === 'function') { - var callback = socket.ldap.messages[msgid]; - delete socket.ldap.messages[msgid]; - return callback(err); - } else if (socket.ldap.messages[msgid]) { - if (err instanceof Error) - socket.ldap.messages[msgid].emit('error', err); - delete socket.ldap.messages[msgid]; - } - - delete socket.ldap.parser; - delete socket.ldap; - return false; - }); - }); - - socket.on('data', function onData(data) { - if (log.trace()) - log.trace('data event: %s', util.inspect(data)); - - socket.ldap.parser.write(data); - }); - - socket.on('end', function onEnd() { - if (log.trace()) - log.trace('end event'); - - opts.emit('end'); - socket.end(); - }); - - socket.on('error', function onError(err) { - if (log.trace()) - log.trace({err: err}, 'error event: %s', new Error().stack); - - if (opts.connectTimer) { - clearTimeout(opts.connectTimer); - opts.connectTimer = false; - } - - if (opts.listeners('error').length) - opts.emit('error', err); - - socket.end(); - }); - - socket.on('timeout', function onTimeout() { - if (log.trace()) - log.trace('timeout event'); - - opts.emit('socketTimeout'); - socket.end(); - }); - - // The "router" - socket.ldap.parser.on('message', function onMessage(message) { - message.connection = socket; - var callback = socket.ldap.messages[message.messageID]; - - if (!callback) { - log.error({message: message.json}, 'unsolicited message'); - return false; - } - - return callback(message); - }); - - socket.ldap.parser.on('error', function onParseError(err) { - log.trace({err: err}, 'parser error event'); - - if (opts.listeners('error').length) - opts.emit('error', err); - - socket.end(); - }); + this.size = (opts.size > 0) ? opts.size : Infinity; + this.timeout = (opts.timeout > 0) ? opts.timeout : 0; + this._queue = []; + this._timer = null; } +/** + * Insert request into queue. + * + */ +RequestQueue.prototype.queue = function queue(message, expect, emitter, cb) { + if (this._queue.length >= this.maxLength) { + return false; + } + this._queue.push([message, expect, emitter, cb]); + if (this.maxAge > 0) { + if (this._timer !== null) { + this._timer = setTimeout(this.purge.bind(this), this.maxAge); + } + } + return true; +}; +/** + * Process all queued requests with callback. + */ +RequestQueue.prototype.flush = function flush(cb) { + if (this._timer) { + clearTimeout(this._timer); + this._timer = null; + } + var items = this._queue; + this._queue = []; + items.forEach(function (req) { + cb(req[0], req[1], req[2], req[3]); + }); +}; + +/** + * Purge all queued requests with an error. + */ +RequestQueue.prototype.purge = function purge() { + this.flush(function (msg, expect, emitter, cb) { + cb(new Error('RequestQueue timeout')); + }); +}; ///--- API @@ -233,18 +162,37 @@ function Client(options) { var _url; if (options.url) _url = url.parse(options.url); - - this.connectTimeout = parseInt((options.connectTimeout || 0), 10); this.host = _url ? _url.hostname : undefined; - this.log = options.log.child({clazz: 'Client'}, true); this.port = _url ? _url.port : false; this.secure = _url ? _url.secure : false; + this.url = _url; this.tlsOptions = options.tlsOptions; this.socketPath = options.socketPath || false; - this.timeout = parseInt((options.timeout || 0), 10); - this.url = _url; - this.socket = this._connect(); + this.log = options.log.child({clazz: 'Client'}, true); + + this.timeout = parseInt((options.timeout || 0), 10); + this.connectTimeout = parseInt((options.connectTimeout || 0), 10); + this.idleTimeout = parseInt((options.idleTimeout || 0), 10); + if (options.reconnect) { + this.reconnect = { + initialDelay: parseInt(options.reconnect.initialDelay || 100, 10), + maxDelay: parseInt(options.reconnect.maxDelay || 10000, 10), + failAfter: parseInt(options.reconnect.failAfter || 0, 10) + }; + } + + this.queuing = (options.queuing !== undefined) ? options.queuing : true; + if (this.queuing) { + this._queue = new RequestQueue({ + size: parseInt((options.queueSize || 0), 10), + timeout: parseInt((options.queueTimeout || 0), 10) + }); + } + + this.socket = null; + this.connected = false; + this._connect(); } util.inherits(Client, EventEmitter); module.exports = Client; @@ -345,7 +293,11 @@ Client.prototype.add = function add(name, entry, controls, callback) { * @param {Function} callback of the form f(err, res). * @throws {TypeError} on invalid input. */ -Client.prototype.bind = function bind(name, credentials, controls, callback) { +Client.prototype.bind = function bind(name, + credentials, + controls, + callback, + _bypass) { if (typeof (name) !== 'string' && !(name instanceof dn.DN)) throw new TypeError('name (string) required'); assert.string(credentials, 'credentials'); @@ -364,7 +316,7 @@ Client.prototype.bind = function bind(name, credentials, controls, callback) { controls: controls }); - return this._send(req, [errors.LDAP_SUCCESS], null, callback); + return this._send(req, [errors.LDAP_SUCCESS], null, callback, _bypass); }; @@ -619,7 +571,11 @@ Client.prototype.modifyDN = function modifyDN(name, * @param {Function} callback of the form f(err, res). * @throws {TypeError} on invalid input. */ -Client.prototype.search = function search(base, options, controls, callback) { +Client.prototype.search = function search(base, + options, + controls, + callback, + _bypass) { if (typeof (base) !== 'string' && !(base instanceof dn.DN)) throw new TypeError('base (string) required'); if (Array.isArray(options) || (options instanceof Control)) { @@ -675,7 +631,11 @@ Client.prototype.search = function search(base, options, controls, callback) { controls: controls }); - return this._send(req, [errors.LDAP_SUCCESS], new EventEmitter(), callback); + return this._send(req, + [errors.LDAP_SUCCESS], + new EventEmitter(), + callback, + _bypass); }; @@ -706,70 +666,407 @@ Client.prototype.unbind = function unbind(callback) { }; - -///--- Private API - -Client.prototype._connect = function _connect() { - var log = this.log; - var proto = this.secure ? tls : net; - var self = this; - var socket = null; - this.connectTimer = false; - - function onConnect() { - if (self.connectTimer) - clearTimeout(self.connectTimer); - - socket.removeListener('connect', onConnect); - socket.removeListener('secureConnect', onConnect); - assert.ok(socket.ldap); - - socket.ldap.id = nextClientId() + '__' + socket.ldap.id; - self.log = self.log.child({ldap_id: socket.ldap.id}, true); - - log.trace('connect event'); - - self.socket = socket; - self.emit('connect', socket); +/** + * Disconnect from the LDAP server and do not allow reconnection. + * + * If the client is instantiated with proper reconnection options, it's + * possible to initiate new requests after a call to unbind since the client + * will attempt to reconnect in order to fulfill the request. + * + * Calling destroy will prevent any further reconnections from occuring. + */ +Client.prototype.destroy = function destroy() { + this.destroyed = true; + if (this._queue) { + // Purge any queued requests which are now meaningless + this._queue.flush(function (msg, expect, emitter, cb) { + if (typeof (cb) === 'function') { + cb(new Error('client destroyed')); + } + }); } - - socket = proto.connect((this.port || this.socketPath), - this.host, - this.secure ? this.tlsOptions : null); - - socket.once('connect', onConnect); - socket.once('secureConnect', onConnect); - setupSocket(socket, this); - - if (this.connectTimeout) { - this.connectTimer = setTimeout(function onConnectTimeout() { - if (!socket || !socket.readable || !socket.writeable) { - socket.destroy(); - - self.emit('connectTimeout'); - } - }, this.connectTimeout); - } - - return socket; + this.unbind(); }; -Client.prototype._send = function _send(message, expect, emitter, callback) { +///--- Private API + +/** + * Initiate LDAP connection. + */ +Client.prototype._connect = function _connect() { + if (this.connecting) { + return; + } + var self = this; + var log = this.log; + var socket; + + // Establish basic socket connection + function connectSocket(_, cb) { + cb = once(cb); + + function onResult(err, res) { + if (err) { + if (self.connectTimer) { + clearTimeout(self.connectTimer); + self.connectTimer = null; + } + self.emit('connectError', err); + } + cb(err, res); + } + function onConnect() { + if (self.connectTimer) { + clearTimeout(self.connectTimer); + self.connectTimer = null; + } + socket.removeAllListeners('error') + .removeAllListeners('connect') + .removeAllListeners('secureConnect'); + + socket.ldap.id = nextClientId() + '__' + socket.ldap.id; + self.log = self.log.child({ldap_id: socket.ldap.id}, true); + + // Move on to client setup + setupClient(cb); + } + + var port = (self.port || self.socketPath); + if (self.secure) { + socket = tls.connect(port, self.host, self.tlsOptions); + socket.once('secureConnect', onConnect); + } else { + socket = net.connect(port, self.host); + socket.once('connect', onConnect); + } + socket.once('error', onResult); + initSocket(); + + // Setup connection timeout handling, if desired + if (self.connectTimeout) { + self.connectTimer = setTimeout(function onConnectTimeout() { + if (!socket || !socket.readable || !socket.writeable) { + socket.destroy(); + self.socket = null; + onResult(new ConnectionError('connection timeout')); + } + }, self.connectTimeout); + } + } + + // Initialize socket events and LDAP parser. + function initSocket() { + socket.ldap = { + id: self.url ? self.url.href : self.socketPath, + messageID: 0, + messages: {}, + getNextMessageID: function getNextMessageID() { + if (++socket.ldap.messageID >= MAX_MSGID) + socket.ldap.messageID = 1; + + return socket.ldap.messageID; + }, + parser: new Parser({ + log: log + }) + }; + + // This won't be set on TLS. So. Very. Annoying. + if (typeof (socket.setKeepAlive) !== 'function') { + socket.setKeepAlive = function setKeepAlive(enable, delay) { + return socket.socket ? + socket.socket.setKeepAlive(enable, delay) : false; + }; + } + + socket.on('data', function onData(data) { + if (log.trace()) + log.trace('data event: %s', util.inspect(data)); + + socket.ldap.parser.write(data); + }); + + // The "router" + socket.ldap.parser.on('message', function onMessage(message) { + message.connection = socket; + var callback = socket.ldap.messages[message.messageID]; + + if (!callback) { + log.error({message: message.json}, 'unsolicited message'); + return false; + } + + return callback(message); + }); + + socket.ldap.parser.on('error', function onParseError(err) { + log.trace({err: err}, 'parser error event'); + self.emit('error', err); + self.connected = false; + socket.end(); + }); + } + + // After connect, register socket event handlers and run any setup actions + function setupClient(cb) { + cb = once(cb); + + // Indicate failure if anything goes awry during setup + function bail(err) { + socket.destroy(); + cb(err || new Error('client error during setup')); + } + // Work around lack of close event on tls.socket + ((self.secure) ? socket.socket : socket).once('close', bail); + socket.once('error', bail); + socket.once('end', bail); + socket.once('timeout', bail); + + self.socket = socket; + + // Run any requested setup (such as automatically performing a bind) on + // socket before signalling successful connection. + // This setup needs to bypass the request queue since all other activity is + // blocked until the connection is considered fully established post-setup. + // Only allow bind/search for now. + var basicClient = { + bind: function bindBypass(name, credentials, controls, callback) { + return self.bind(name, credentials, controls, callback, true); + }, + search: function searchBypass(base, options, controls, callback) { + return self.search(base, options, controls, callback, true); + }, + unbind: self.unbind.bind(self) + }; + var setupSteps = self.listeners('setup'); + if (setupSteps.length) { + vasync.forEachPipeline({ + func: function (f, callback) { + f(basicClient, callback); + }, + inputs: setupSteps + }, function (err, result) { + cb(err, socket); + }); + } else { + cb(null, socket); + } + } + + // Wire up "official" event handlers after successful connect/setup + function postSetup() { + socket.removeAllListeners('error') + .removeAllListeners('close') + .removeAllListeners('end') + .removeAllListeners('timeout'); + + // Work around lack of close event on tls.socket + ((self.secure) ? socket.socket : socket).once('close', + self._onClose.bind(self)); + socket.on('end', function onEnd() { + if (log.trace()) + log.trace('end event'); + + self.emit('end'); + socket.end(); + }); + socket.on('error', function onSocketError(err) { + if (log.trace()) + log.trace({err: err}, 'error event: %s', new Error().stack); + + self.emit('error', err); + socket.destroy(); + }); + socket.on('timeout', function onTimeout() { + if (log.trace()) + log.trace('timeout event'); + + self.emit('socketTimeout'); + socket.end(); + }); + } + + var retry = backoff.call(connectSocket, {}, function (err, res) { + self.connecting = false; + if (!err) { + postSetup(); + self.connected = true; + self.emit('connect', socket); + self.log.debug('connected after %d attempts', retry.getNumRetries()); + // Flush any queued requests + self._flushQueue(); + } else { + self.log.debug('failed to connect after %d attempts', + retry.getNumRetries()); + // Communicate the last-encountered error + if (err instanceof ConnectionError) { + self.emit('connectTimeout'); + } else { + self.emit('error', err); + } + } + }); + if (this.reconnect) { + retry.setStrategy(new backoff.ExponentialStrategy({ + initialDelay: this.reconnect.minDelay, + maxDelay: this.reconnect.maxDelay + })); + retry.failAfter(this.reconnect.failAfter || Infinity); + } else { + // Only attempt the connection once for non-reconnection clients + retry.failAfter(1); + } + this.connecting = true; + retry.start(); +}; + +/** + * Flush queued requests out to the socket. + */ +Client.prototype._flushQueue = function _flushQueue() { + if (this._queue) { + // Pull items we're about to process out of the queue. + this._queue.flush(this._send.bind(this)); + } +}; + +/** + * Clean up socket/parser resources after socket close. + */ +Client.prototype._onClose = function _onClose(had_err) { + var socket = this.socket; + socket.removeAllListeners('connect') + .removeAllListeners('data') + .removeAllListeners('drain') + .removeAllListeners('end') + .removeAllListeners('error') + .removeAllListeners('timeout'); + this.socket = null; + this.connected = false; + + if (!this.secure) { + socket.removeAllListeners('close'); + } else { + socket.socket.removeAllListeners('close'); + } + + if (this.log.trace()) + this.log.trace('close event had_err=%s', had_err ? 'yes' : 'no'); + + this.emit('close', had_err); + // On close we have to walk the outstanding messages and go invoke their + // callback with an error. + Object.keys(socket.ldap.messages).forEach(function (msgid) { + var err; + if (socket.unbindMessageID !== parseInt(msgid, 10)) { + err = new ConnectionError(socket.ldap.id + ' closed'); + } else { + // Unbinds will be communicated as a success since we're closed + err = new UnbindResponse({ + messageID: msgid + }); + err.status = 'unbind'; + } + + if (typeof (socket.ldap.messages[msgid]) === 'function') { + var callback = socket.ldap.messages[msgid]; + delete socket.ldap.messages[msgid]; + return callback(err); + } else if (socket.ldap.messages[msgid]) { + if (err instanceof Error) + socket.ldap.messages[msgid].emit('error', err); + delete socket.ldap.messages[msgid]; + } + return false; + }); + delete socket.ldap.parser; + delete socket.ldap; + if (had_err && this.reconnect) { + this._connect(); + } + return false; +}; + +/** + * Maintain idle timer for client. + * + * Will start timer to fire 'idle' event if conditions are satisfied. If + * conditions are not met and a timer is running, it will be cleared. + * + * @param {Boolean} override explicitly disable timer. + */ +Client.prototype._updateIdle = function _updateIdle(override) { + if (this.idleTimeout === 0) { + return; + } + // Client must be connected but not waiting on any request data + var self = this; + function isIdle(disable) { + // FIXME: this breaks with abandons + return ((disable !== true) && + (self.socket && self.connected) && + (Object.keys(self.socket.ldap.messages).length === 0)); + } + if (isIdle(override)) { + if (!this._idleTimer) { + this._idleTimer = setTimeout(function () { + // Double-check idleness in case socket was torn down + if (isIdle()) { + self.emit('idle'); + } + }, this.idleTimeout); + } + } else { + if (this._idleTimer) { + clearTimeout(this._idleTimer); + this._idleTimer = null; + } + } +}; + +/** + * Attempt to send an LDAP request. + */ +Client.prototype._send = function _send(message, + expect, + emitter, + callback, + _bypass) { assert.ok(message); assert.ok(expect); assert.ok(typeof (emitter) !== undefined); assert.ok(callback); + // Allow connect setup traffic to bypass checks + if (_bypass && this.socket && this.socket.writable) { + return this._sendSocket(message, expect, emitter, callback); + } + if (!this.socket || !this.connected) { + if (this._queue) { + if (!this._queue.queue(message, expect, emitter, callback)) { + callback(new Error('request queue full')); + } + // Initiate reconnect if needed + return this._connect(); + } + return callback(new ConnectionError('no socket')); + } else { + this._flushQueue(); + return this._sendSocket(message, expect, emitter, callback); + } +}; + +Client.prototype._sendSocket = function _sendSocket(message, + expect, + emitter, + callback) { var conn = this.socket; var log = this.log; var self = this; var timer = false; var sentEmitter = false; - if (!conn) - return callback(new ConnectionError('no socket')); - function _done(event, obj) { if (emitter) { if (event === 'error') { @@ -861,6 +1158,8 @@ Client.prototype._send = function _send(message, expect, emitter, callback) { return undefined; } else { delete conn.ldap.messages[message.messageID]; + // Potentially mark client as idle + self._updateIdle(); if (msg instanceof LDAPResult) { if (expect.indexOf(msg.status) === -1) @@ -890,6 +1189,8 @@ Client.prototype._send = function _send(message, expect, emitter, callback) { return callback(null); } else if (expect === 'unbind') { conn.unbindMessageID = message.id; + // Mark client as disconnected once unbind clears the socket + self.connected = false; conn.end(); } else if (emitter) { sentEmitter = true; @@ -901,6 +1202,8 @@ Client.prototype._send = function _send(message, expect, emitter, callback) { // Start actually doing something... message.messageID = conn.ldap.getNextMessageID(); conn.ldap.messages[message.messageID] = messageCallback; + // Mark client as active + this._updateIdle(true); if (self.timeout) { log.trace('Setting timeout to %d', self.timeout); diff --git a/lib/client/index.js b/lib/client/index.js index 5f8fae3..6186bd9 100644 --- a/lib/client/index.js +++ b/lib/client/index.js @@ -5,8 +5,6 @@ var assert = require('assert'); var Logger = require('bunyan'); var Client = require('./client'); -var ClientPool = require('./pool'); - ///--- Globals @@ -19,7 +17,6 @@ var DEF_LOG = new Logger({ }); - ///--- Functions function xor() { @@ -39,7 +36,6 @@ function xor() { ///--- Exports module.exports = { - createClient: function createClient(options) { if (typeof (options) !== 'object') throw new TypeError('options (object) required'); @@ -54,10 +50,6 @@ module.exports = { if (typeof (options.log) !== 'object') throw new TypeError('options.log must be an object'); - if (options.maxConnections > 1) - return new ClientPool(options); - return new Client(options); } - }; diff --git a/lib/client/pool.js b/lib/client/pool.js deleted file mode 100644 index 8b03dde..0000000 --- a/lib/client/pool.js +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2012 Mark Cavage, Inc. All rights reserved. - -var assert = require('assert'); -var EventEmitter = require('events').EventEmitter; -var util = require('util'); - -var pooling = require('pooling'); - -var ConnectionError = require('../errors').ConnectionError; -var BindResponse = require('../messages').BindResponse; - -var Client = require('./client'); - - - -///--- Globals - -var STD_OPS = [ - 'add', - 'del', - 'modify', - 'modifyDN' -]; - -var RETURN_VAL_OPS = [ - 'compare', - 'exop' -]; - - - -///--- Internal Functions - -function createPool(options, clientpool) { - assert.ok(options); - - return pooling.createPool({ - checkInterval: options.checkInterval, - log: options.log, - name: 'ldapjs_' + (options.url || options.socketPath).replace(/[:/]/g, '_'), - max: options.maxConnections, - maxIdleTime: options.maxIdleTime, - - create: function createClient(callback) { - var client = new Client(options); - - client.on('error', function (err) { - client.removeAllListeners('connect'); - client.removeAllListeners('connectTimeout'); - if (clientpool.listeners('error').length) { - clientpool.emit('error', err); - } - return callback(err); - }); - - client.on('connectTimeout', function () { - client.removeAllListeners('connect'); - clientpool.emit('connectTimeout'); - }); - - client.once('connect', function onConnect() { - client.removeAllListeners('error'); - - if (!options.bindDN || !options.bindCredentials) - return callback(null, client); - - function bindCallback(err, res) { - if (err) - return callback(err, null); - - return callback(null, client); - } - - return client.bind(options.bindDN, - options.bindCredentials, - options.bindControls || [], - bindCallback); - }); - }, - - check: function check(client, callback) { - // just do a root dse search - client.search('', '(objectclass=*)', function (err, res) { - if (err) - return callback(err); - - res.on('error', function (e) { - res.removeAllListeners('end'); - callback(e); - }); - - res.on('end', function () { - res.removeAllListeners('error'); - callback(null); - }); - - return undefined; - }); - }, - - destroy: function destroy(client) { - client.unbind(function () {}); - } - }); -} - - - -///--- API - -function ClientPool(options) { - assert.ok(options); - EventEmitter.call(this, options); - - this.log = options.log.child({clazz: 'ClientPool'}, true); - this.options = { - bindDN: options.bindDN, - bindCredentials: options.bindCredentials, - bindControls: options.bindControls || [], - checkInterval: options.checkInterval, - connectTimeout: (options.connectTimeout || 0), - maxIdleTime: options.maxIdleTime, - maxConnections: options.maxConnections, - log: options.log, - socketPath: options.socketPath, - timeout: (options.timeout || 0), - url: options.url, - tlsOptions: options.tlsOptions - }; - this.pool = createPool(options, this); -} -util.inherits(ClientPool, EventEmitter); -module.exports = ClientPool; - - - -STD_OPS.forEach(function (op) { - ClientPool.prototype[op] = function clientProxy() { - var args = Array.prototype.slice.call(arguments); - var cb = args.pop(); - if (typeof (cb) !== 'function') - throw new TypeError('callback (Function) required'); - var self = this; - - return this.pool.acquire(function onAcquire(err, client) { - if (err) - return cb(err); - - args.push(function proxyCallback(err, res) { - self.pool.release(client); - return cb(err, res); - }); - - try { - return Client.prototype[op].apply(client, args); - } catch (e) { - self.pool.release(client); - return cb(e); - } - }); - }; -}); - - -RETURN_VAL_OPS.forEach(function (op) { - ClientPool.prototype[op] = function clientProxy() { - var args = Array.prototype.slice.call(arguments); - var cb = args.pop(); - if (typeof (cb) !== 'function') - throw new TypeError('callback (Function) required'); - var self = this; - - return this.pool.acquire(function onAcquire(poolErr, client) { - if (poolErr) - return cb(poolErr); - - args.push(function proxyCallback(err, val, res) { - self.pool.release(client); - return cb(err, val, res); - }); - - try { - return Client.prototype[op].apply(client, args); - } catch (e) { - self.pool.release(client); - return cb(e); - } - }); - }; -}); - - -ClientPool.prototype.search = function search(base, opts, controls, callback) { - if (typeof (controls) === 'function') { - callback = controls; - controls = []; - } - - var self = this; - - return this.pool.acquire(function onAcquire(err, client) { - if (err) - return callback(err); - - // This is largely in existence for search requests - client.timeout = self.timeout || client.timeout; - - - return client.search(base, opts, controls, function (err, res) { - function cleanup() { - self.pool.release(client); - } - - if (err) { - cleanup(); - return callback(err, res); - } - res.on('error', cleanup); - res.on('end', cleanup); - - return callback(null, res); - }); - }); -}; - - -ClientPool.prototype.abandon = function abandon(msgid, controls, callback) { - if (typeof (controls) === 'function') { - callback = controls; - controls = []; - } - - this.log.error({ - messageID: msgid - }, 'Abandon is not supported with connection pooling. Ignoring.'); - return callback(null); -}; - - -ClientPool.prototype.bind = function bind(dn, creds, controls, callback) { - if (typeof (controls) === 'function') { - callback = controls; - controls = []; - } - - var self = this; - - self.options.bindDN = null; - self.options.bindCredentials = null; - self.options.bindControls = null; - - return this.pool.shutdown(function () { - self.pool = createPool(self.options, self); - - return self.pool.acquire(function onAcquire(err, client) { - if (err) - return callback(err); - - return client.bind(dn, creds, controls, function (err, res) { - self.pool.release(client); - - if (err) - return callback(err, res); - - self.options.bindDN = dn; - self.options.bindCredentials = creds; - self.options.bindControls = controls; - - return callback(null, res); - }); - }); - }); -}; - - -ClientPool.prototype.unbind = function unbind(callback) { - return this.pool.shutdown(callback); -}; diff --git a/package.json b/package.json index 7c4e869..8b3cf80 100644 --- a/package.json +++ b/package.json @@ -31,15 +31,16 @@ "assert-plus": "0.1.5", "bunyan": "0.22.1", "dashdash": "1.6.0", - "pooling": "0.4.6" + "backoff": "2.4.0", + "once": "1.3.0", + "vasync": "1.5.0" }, "optionalDependencies": { "dtrace-provider": "0.2.8" }, "devDependencies": { "tap": "0.4.1", - "node-uuid": "1.4.0", - "vasync": "1.4.3" + "node-uuid": "1.4.0" }, "scripts": { "test": "./node_modules/.bin/tap ./test" diff --git a/test/client.test.js b/test/client.test.js index 72dc77c..e049614 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -77,6 +77,21 @@ test('setup', function (t) { return next(); }); + server.search('dc=slow', function (req, res, next) { + res.send({ + dn: 'dc=slow', + attributes: { + 'you': 'wish', + 'this': 'was', + 'faster': '.' + } + }); + setTimeout(function () { + res.end(); + next(); + }, 250); + }); + server.search('dc=timeout', function (req, res, next) { // Haha client! }); @@ -132,7 +147,6 @@ test('setup', function (t) { client = ldap.createClient({ connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10), socketPath: SOCKET, - maxConnections: parseInt(process.env.LDAP_MAX_CONNS || 5, 10), idleTimeoutMillis: 10, log: new Logger({ name: 'ldapjs_unit_test', @@ -572,6 +586,38 @@ test('GH-24 attribute selection of *', function (t) { }); +test('idle timeout', function (t) { + // FIXME: this must be run before abandon + client.idleTimeout = 250; + function premature() { + t.ifError(true); + } + client.on('idle', premature); + client.search('dc=slow', 'objectclass=*', function (err, res) { + t.ifError(err); + res.on('searchEntry', function (res) { + t.ok(res); + }); + res.on('error', function (err) { + t.ifError(err); + }); + res.on('end', function () { + var late = setTimeout(function () { + t.ifError(false, 'too late'); + }, 500); + // It's ok to go idle now + client.removeListener('idle', premature); + client.on('idle', function () { + clearTimeout(late); + client.removeAllListeners('idle'); + client.idleTimeout = 0; + t.end(); + }); + }); + }); +}); + + test('abandon (GH-27)', function (t) { client.abandon(401876543, function (err) { t.ifError(err);