Client reconnect logic

This commit is contained in:
Mark Cavage 2011-09-26 16:45:49 -07:00
parent e2cac51a29
commit 50a0c59d1f
4 changed files with 178 additions and 98 deletions

View File

@ -28,22 +28,21 @@ client is:
||socketPath|| If you're running an LDAP server over a Unix Domain Socket, use this.||
||log4js|| You can optionally pass in a log4js instance the client will use to acquire a logger. The client logs all messages at the `Trace` level.||
||numConnections||The size of the connection pool. Default is 1.||
||reconnect||Whether or not to automatically reconnect (and rebind) on socket errors. Takes amount of time in millliseconds. Default is 1000. 0/false will disable altogether.||
## Connection management
If you'll recall, the LDAP protocol is connection-oriented, and completely
asynchronous on a connection (meaning you can send as many requests as you want
without waiting for responses). However, our friend `bind` is a little
different in that you generally want to wait for binds to be completed since
subsequent operations assume that level of privilege.
As LDAP is a stateful protocol (as opposed to HTTP), having connections torn
down from underneath you is difficult to deal with. As such, the ldapjs client
will automatically reconnect when the underlying socket has errors. You can
disable this behavior by passing `reconnect=false` in the options at construct
time, or just setting the reconnect property to false at any time.
The ldapjs client deals with this by maintaing a connection pool, and splaying
requests across that connection pool, with the exception of `bind` and `unbind`,
which it will apply to all connections in the pool. By default, a client will
have one connection in the pool (since it's async already, you don't always need
the complexity of a pool). And after that, the operations in the client are
pretty much a mapping of the LDAP C API, but made higher-level, so they make
sense in JS.
On reconnect, the client will additionally automatically rebind (assuming you
ever successfully called bind). Only after the rebind succeeds will other
operations be allowed back through; in the meantime all callbacks will receive
a `DisconnectedError`. If you never called `bind`, the client will allow
operations when the socket is connected.
## Common patterns

View File

@ -73,13 +73,17 @@ function validateControls(controls) {
}
function DisconnectedError(message) {
Error.call(this, message);
if (Error.captureStackTrace)
Error.captureStackTrace(this, DisconnectedError);
function ConnectionError(message) {
errors.LDAPError.call(this,
'ConnectionError',
0x80, // LDAP_OTHER,
message,
null,
ConnectionError);
}
util.inherits(DisconnectedError, Error);
util.inherits(ConnectionError, errors.LDAPError);
///--- API
@ -120,7 +124,8 @@ function Client(options) {
this.log4js = options.log4js || logStub;
this.connectOptions = {
port: self.url ? self.url.port : options.socketPath,
host: self.url ? self.url.hostname : undefined
host: self.url ? self.url.hostname : undefined,
socketPath: options.socketPath || undefined
};
this.shutdown = false;
@ -131,73 +136,9 @@ function Client(options) {
return self._log;
});
// Build the connection pool
function newConnection() {
var c;
if (self.secure) {
c = tls.connect(self.connectOptions.port, self.connectOptions.host);
} else {
c = net.createConnection(self.connectOptions.port,
self.connectOptions.host);
}
assert.ok(c);
c.parser = new Parser({
log4js: self.log4js
});
// Wrap the events
c.ldap = {
id: options.socketPath || self.url.hostname,
connected: true, // lie, but node queues for us
messageID: 0,
messages: {}
};
c.ldap.__defineGetter__('nextMessageID', function() {
if (++c.ldap.messageID >= MAX_MSGID)
c.ldap.messageID = 1;
return c.ldap.messageID;
});
c.on('connect', function() {
c.ldap.connected = true;
c.ldap.id += ':' + c.fd;
self.emit('connect', c.ldap.id);
});
c.on('end', function() {
self.emit('end');
});
c.addListener('close', function(had_err) {
self.emit('close', had_err);
});
c.on('error', function(err) {
self.emit('error', err);
});
c.on('timeout', function() {
self.emit('timeout');
});
c.on('data', function(data) {
if (self.log.isTraceEnabled())
self.log.trace('data on %s: %s', c.ldap.id, util.inspect(data));
c.parser.write(data);
});
// The "router"
c.parser.on('message', function(message) {
message.connection = c;
var callback = c.ldap.messages[message.messageID];
if (!callback) {
self.log.error('%s: received unsolicited message: %j', c.ldap.id,
message.json);
return;
}
return callback(message);
});
return c;
}
self.connection = newConnection();
this.connection = this._newConnection();
this.reconnect = (typeof(options.reconnect) === 'number' ?
options.reconnect : 1000);
}
util.inherits(Client, EventEmitter);
module.exports = Client;
@ -210,9 +151,10 @@ module.exports = Client;
* @param {String} credentials the userPassword associated with name.
* @param {Control} controls (optional) either a Control or [Control].
* @param {Function} callback of the form f(err, res).
* @param {Socket} conn don't use this. Internal only (reconnects).
* @throws {TypeError} on invalid input.
*/
Client.prototype.bind = function(name, credentials, controls, callback) {
Client.prototype.bind = function(name, credentials, controls, callback, conn) {
if (typeof(name) !== 'string')
throw new TypeError('name (string) required');
if (typeof(credentials) !== 'string')
@ -235,7 +177,14 @@ Client.prototype.bind = function(name, credentials, controls, callback) {
controls: controls
});
return self._send(req, [errors.LDAP_SUCCESS], callback);
return self._send(req, [errors.LDAP_SUCCESS], function(err, res) {
if (!err) { // In case we need to reconnect later
self._bindDN = name;
self._credentials = credentials;
}
return callback(err, res);
}, conn);
};
@ -581,6 +530,9 @@ Client.prototype.search = function(base, options, controls, callback) {
controls: controls
});
if (!this.connection)
return callback(new ConnectionError('no connection'));
var res = new EventEmitter();
this._send(req, [errors.LDAP_SUCCESS], res);
return callback(null, res);
@ -599,26 +551,34 @@ Client.prototype.search = function(base, options, controls, callback) {
Client.prototype.unbind = function(callback) {
if (callback && typeof(callback) !== 'function')
throw new TypeError('callback must be a function');
var self = this;
if (!callback)
callback = function defUnbindCb() { self.log.trace('disconnected'); };
var self = this;
this.reconnect = false;
this._bindDN = null;
this._credentials = null;
var req = new UnbindRequest();
return self._send(req, 'unbind', callback);
};
Client.prototype._send = function(message, expect, callback) {
Client.prototype._send = function(message, expect, callback, connection) {
assert.ok(message);
assert.ok(expect);
assert.ok(callback);
var conn = this.connection || connection;
var self = this;
var conn = self.connection;
if (!conn) {
if (typeof(callback) === 'function')
return callback(new ConnectionError('no connection'));
return callback.emit('error', new ConnectionError('no connection'));
}
// Now set up the callback in the messages table
message.messageID = conn.ldap.nextMessageID;
@ -645,6 +605,8 @@ Client.prototype._send = function(message, expect, callback) {
} else if (res instanceof SearchEntry) {
assert.ok(callback instanceof EventEmitter);
callback.emit('searchEntry', res);
} else if (res instanceof Error) {
return callback(res);
} else {
delete conn.ldap.messages[message.messageID];
@ -663,10 +625,127 @@ Client.prototype._send = function(message, expect, callback) {
// Note if this was an unbind, we just go ahead and end, since there
// will never be a response
return conn.write(message.toBer(), (expect === 'unbind' ? function() {
conn.on('end', function() {
return callback();
});
conn.end();
} : null));
};
Client.prototype._newConnection = function() {
var c;
var connectOpts = this.connectOptions;
var log = this.log;
var self = this;
if (this.secure) {
c = tls.connect(connectOpts.port, connectOpts.host);
} else {
c = net.createConnection(connectOpts.port, connectOpts.host);
}
assert.ok(c);
c.parser = new Parser({
log4js: self.log4js
});
// Wrap the events
c.ldap = {
id: connectOpts.socketPath || self.url.hostname,
messageID: 0,
messages: {}
};
c.ldap.__defineGetter__('nextMessageID', function() {
if (++c.ldap.messageID >= MAX_MSGID)
c.ldap.messageID = 1;
return c.ldap.messageID;
});
c.on('connect', function() {
c.ldap.id += ':' + c.fd;
self.emit('connect', c.ldap.id);
if (log.isTraceEnabled())
log.trace('%s connect event', c.ldap.id);
if (self._bindDN && self._credentials) { // reconnect case
self.bind(self._bindDN, self._credentials, [], function(err) {
if(err) {
log.trace('%s error rebinding: %s', c.ldap.id, err.stack);
return c.end();
}
self.connection = c;
}, c);
} else {
self.connection = c;
}
});
c.on('end', function() {
self.emit('end');
if (log.isTraceEnabled())
log.trace('%s end event', c.ldap.id);
});
c.addListener('close', function(had_err) {
self.emit('close', had_err);
if (log.isTraceEnabled())
log.trace('%s close event had_err=%s', c.ldap.id, had_err ? 'yes' : 'no');
Object.keys(c.ldap.messages).forEach(function(msgid) {
if (typeof(c.ldap.messages[msgid]) === 'function') {
var _cb = c.ldap.messages[msgid];
delete c.ldap.messages[msgid];
return _cb(new ConnectionError(c.ldap.id + ' closed'));
} else if (c.ldap.messages[msgid]) {
c.ldap.messages[msgid].emit('error', new ConnectionError(c.ldap.id +
' closed'));
}
delete c.ldap.messages[msgid];
});
delete c.ldap;
if (self.reconnect) {
self.connection = null;
setTimeout(function() { self._newConnection() }, self.reconnect);
}
});
c.on('error', function(err) {
if (self.listeners('error').length)
self.emit('error', err);
if (log.isTraceEnabled())
log.trace('%s error event=%s', c.ldap.id, err ? err.stack : '?');
c.end();
});
c.on('timeout', function() {
self.emit('timeout');
if (log.isTraceEnabled())
log.trace('%s timeout event=%s', c.ldap.id);
c.end();
});
c.on('data', function(data) {
if (log.isTraceEnabled())
log.trace('%s data event: %s', c.ldap.id, util.inspect(data));
c.parser.write(data);
});
// The "router"
c.parser.on('message', function(message) {
message.connection = c;
var callback = c.ldap.messages[message.messageID];
if (!callback) {
log.error('%s: unsolicited message: %j', c.ldap.id, message.json);
return;
}
return callback(message);
});
return c;
}

View File

@ -80,6 +80,7 @@ util.inherits(LDAPError, Error);
// Some whacky games here to make sure all the codes are exported
module.exports = {};
module.exports.LDAPError = LDAPError;
Object.keys(CODES).forEach(function(code) {
module.exports[code] = CODES[code];

View File

@ -96,10 +96,11 @@ test('setup', function(t) {
server.listen(SOCKET, function() {
client = ldap.createClient({
socketPath: SOCKET
socketPath: SOCKET,
reconnect: false // turn this off for unit testing
});
t.ok(client);
// client.log4js.setLevel('Debug');
client.log4js.setLevel('Trace');
t.end();
});