Better handling of disconnect/reconnect over TLS and Plain sockets

This commit is contained in:
Mark Cavage 2011-11-09 14:44:12 -08:00
parent aedd9f222c
commit 8fdff3e06d
1 changed files with 89 additions and 43 deletions

View File

@ -142,6 +142,7 @@ function Client(options) {
this.reconnect = (typeof(options.reconnect) === 'number' ? this.reconnect = (typeof(options.reconnect) === 'number' ?
options.reconnect : 1000); options.reconnect : 1000);
this.connection = null;
this.connect(); this.connect();
} }
util.inherits(Client, EventEmitter); util.inherits(Client, EventEmitter);
@ -152,18 +153,41 @@ module.exports = Client;
* Connects this client, either at construct time, or after an unbind has * Connects this client, either at construct time, or after an unbind has
* been called. Under normal circumstances you don't need to call this method. * been called. Under normal circumstances you don't need to call this method.
* *
* @param {Function} callback invoked when `connect()` is done.
*/ */
Client.prototype.connect = function() { Client.prototype.connect = function(callback) {
if (this.connection) if (this.connection)
return; return;
var self = this; var self = this;
this.connection = this._newConnection(); var connection = this.connection = this._newConnection();
this.connection.on('end', function() {
delete self.connection; function reconnect() {
self.connection = null;
if (self.reconnect)
setTimeout(function() { self.connect(); }, self.reconnect);
}
this.connection.on('close', function(had_err) {
self.emit('close', had_err);
reconnect();
});
this.connection.on('connect', function() {
if (self._bindDN && self._credentials)
return self.bind(self._bindDN, self._credentials, function(err) {
if (err)
c.end();
self.emit('connect')
});
self.emit('connect')
}); });
}; };
/** /**
* Performs a simple authentication against the server. * Performs a simple authentication against the server.
* *
@ -610,7 +634,25 @@ Client.prototype.search = function(base, options, controls, callback) {
return callback(new ConnectionError('no connection')); return callback(new ConnectionError('no connection'));
var res = new EventEmitter(); var res = new EventEmitter();
// This is some whacky logic to account for the connection not being
// reconnected, and having thrown an error like "NotWriteable". Because
// the event emitter logic will never block, we'll end up returning from
// the event.on('error'), rather than "normally".
var self = this;
var done = false;
function errorIfNoConn(err) {
if (done)
return;
done = true;
return callback(err);
}
res.once('error', errorIfNoConn);
this._send(req, [errors.LDAP_SUCCESS], res); this._send(req, [errors.LDAP_SUCCESS], res);
done = true;
res.removeListener('error', errorIfNoConn);
return callback(null, res); return callback(null, res);
}; };
@ -646,16 +688,21 @@ Client.prototype._send = function(message, expect, callback, connection) {
assert.ok(expect); assert.ok(expect);
assert.ok(callback); assert.ok(callback);
var conn = this.connection || connection; var conn = connection || this.connection;
var self = this; var self = this;
if (!conn) { function closedConn() {
conn.destroy();
if (typeof(callback) === 'function') if (typeof(callback) === 'function')
return callback(new ConnectionError('no connection')); return callback(new ConnectionError('no connection'));
return callback.emit('error', new ConnectionError('no connection')); return callback.emit('error', new ConnectionError('no connection'));
} }
if (!conn)
return closedConn();
// Now set up the callback in the messages table // Now set up the callback in the messages table
message.messageID = conn.ldap.nextMessageID; message.messageID = conn.ldap.nextMessageID;
if (expect !== 'abandon') { if (expect !== 'abandon') {
@ -680,7 +727,6 @@ Client.prototype._send = function(message, expect, callback, connection) {
return callback(null, res); return callback(null, res);
callback.emit('end', res); callback.emit('end', res);
} else if (res instanceof SearchEntry) { } else if (res instanceof SearchEntry) {
assert.ok(callback instanceof EventEmitter); assert.ok(callback instanceof EventEmitter);
callback.emit('searchEntry', res); callback.emit('searchEntry', res);
@ -690,8 +736,11 @@ Client.prototype._send = function(message, expect, callback, connection) {
callback.emit('searchReference', res); callback.emit('searchReference', res);
} else if (res instanceof Error) { } else if (res instanceof Error) {
return callback(res); if (typeof(callback) === 'function')
return callback(res);
assert.ok(callback instanceof EventEmitter);
callback.emit('error', res);
} else { } else {
delete conn.ldap.messages[message.messageID]; delete conn.ldap.messages[message.messageID];
@ -723,7 +772,13 @@ Client.prototype._send = function(message, expect, callback, connection) {
} else { } else {
// noop // noop
} }
return conn.write(message.toBer(), _writeCb);
try {
return conn.write(message.toBer(), _writeCb);
} catch (e) {
conn.end();
return closedConn();
}
}; };
@ -734,7 +789,17 @@ Client.prototype._newConnection = function() {
var self = this; var self = this;
if (this.secure) { if (this.secure) {
c = tls.connect(connectOpts.port, connectOpts.host); c = tls.connect(connectOpts.port, connectOpts.host, function() {
if (log.isTraceEnabled())
log.trace('%s connect event', c.ldap.id);
c.ldap.connected = true;
c.ldap.id += ':' + c.fd;
c.emit('connect', true);
});
c.setKeepAlive = function(enable, delay) {
return c.socket.setKeepAlive(enable, delay);
};
} else { } else {
c = net.createConnection(connectOpts.port, connectOpts.host); c = net.createConnection(connectOpts.port, connectOpts.host);
} }
@ -758,33 +823,21 @@ Client.prototype._newConnection = function() {
}); });
c.on('connect', function() { c.on('connect', function() {
if (log.isTraceEnabled())
log.trace('%s connect event', c.ldap.id); c.ldap.connected = true;
c.ldap.id += ':' + c.fd; c.ldap.id += ':' + c.fd;
self.emit('connect', c.ldap.id); self.emit('connect', c.ldap.id);
if (log.isTraceEnabled())
log.trace('%s connect event', c.ldap.id);
if (self._bindDN && self._credentials) { // reconnect case
self.bind(self._bindDN, self._credentials, [], function(err) {
if (err) {
log.trace('%s error rebinding: %s', c.ldap.id, err.stack);
return c.end();
}
self.connection = c;
}, c);
} else {
self.connection = c;
}
}); });
c.on('end', function() { c.on('end', function() {
self.emit('end');
if (log.isTraceEnabled()) if (log.isTraceEnabled())
log.trace('%s end event', c.ldap.id); log.trace('%s end event', c.ldap.id);
c.end();
}); });
c.addListener('close', function(had_err) { c.on('close', function(had_err) {
self.emit('close', had_err);
if (log.isTraceEnabled()) if (log.isTraceEnabled())
log.trace('%s close event had_err=%s', c.ldap.id, had_err ? 'yes' : 'no'); log.trace('%s close event had_err=%s', c.ldap.id, had_err ? 'yes' : 'no');
@ -808,31 +861,23 @@ Client.prototype._newConnection = function() {
c.ldap.messages[msgid].emit('error', err); c.ldap.messages[msgid].emit('error', err);
delete c.ldap.messages[msgid]; delete c.ldap.messages[msgid];
} }
delete c.ldap;
delete c.parser;
}); });
delete c.ldap;
if (self.reconnect) {
self.connection = null;
setTimeout(function() {
delete c.unbindMessageID;
self._newConnection();
}, self.reconnect);
}
}); });
c.on('error', function(err) { c.on('error', function(err) {
if (self.listeners('error').length)
self.emit('error', err);
if (log.isTraceEnabled()) if (log.isTraceEnabled())
log.trace('%s error event=%s', c.ldap.id, err ? err.stack : '?'); log.trace('%s error event=%s', c.ldap.id, err ? err.stack : '?');
if (self.listeners('error').length)
self.emit('error', err);
c.end(); c.end();
}); });
c.on('timeout', function() { c.on('timeout', function() {
self.emit('timeout');
if (log.isTraceEnabled()) if (log.isTraceEnabled())
log.trace('%s timeout event=%s', c.ldap.id); log.trace('%s timeout event=%s', c.ldap.id);
@ -842,14 +887,15 @@ Client.prototype._newConnection = function() {
c.on('data', function(data) { c.on('data', function(data) {
if (log.isTraceEnabled()) if (log.isTraceEnabled())
log.trace('%s data event: %s', c.ldap.id, util.inspect(data)); log.trace('%s data event: %s', c.ldap.id, util.inspect(data));
c.parser.write(data); c.parser.write(data);
}); });
// The "router" // The "router"
c.parser.on('message', function(message) { c.parser.on('message', function(message) {
message.connection = c; message.connection = c;
var callback = c.ldap.messages[message.messageID]; var callback = c.ldap.messages[message.messageID];
if (!callback) { if (!callback) {
log.error('%s: unsolicited message: %j', c.ldap.id, message.json); log.error('%s: unsolicited message: %j', c.ldap.id, message.json);
return; return;