Refine client reconnection logic

- Emit setupError for errors during client setup
- Client accepts more generic options.reconnect
- Fix unbind hang in client.destroy
- Add tests for client reconnect/setup scenarios
This commit is contained in:
Patrick Mooney 2014-06-26 10:11:07 -05:00
parent 9e2bbe1072
commit 28d3ed86e1
2 changed files with 212 additions and 39 deletions

View File

@ -195,10 +195,13 @@ function Client(options) {
this.connectTimeout = parseInt((options.connectTimeout || 0), 10);
this.idleTimeout = parseInt((options.idleTimeout || 0), 10);
if (options.reconnect) {
// Fall back to defaults if options.reconnect === true
var rOpts = (typeof (options.reconnect) === 'object') ?
options.reconnect : {};
this.reconnect = {
initialDelay: parseInt(options.reconnect.initialDelay || 100, 10),
maxDelay: parseInt(options.reconnect.maxDelay || 10000, 10),
failAfter: parseInt(options.reconnect.failAfter || 0, 10)
initialDelay: parseInt(rOpts.initialDelay || 100, 10),
maxDelay: parseInt(rOpts.maxDelay || 10000, 10),
failAfter: parseInt(rOpts.failAfter, 10) || Infinity
};
}
@ -704,7 +707,11 @@ Client.prototype.destroy = function destroy() {
cb(new Error('client destroyed'));
}
});
this.unbind();
if (this.connected) {
this.unbind();
} else if (this.socket) {
this.socket.destroy();
}
this.emit('destroy');
};
@ -723,7 +730,7 @@ Client.prototype._connect = function _connect() {
var socket;
// Establish basic socket connection
function connectSocket(_, cb) {
function connectSocket(cb) {
cb = once(cb);
function onResult(err, res) {
@ -867,6 +874,10 @@ Client.prototype._connect = function _connect() {
},
inputs: setupSteps
}, function (err, result) {
if (err) {
// Users may wish to take specific actions if setup steps fail.
self.emit('setupError', err);
}
cb(err, socket);
});
} else {
@ -907,44 +918,55 @@ Client.prototype._connect = function _connect() {
});
}
var retry = backoff.call(connectSocket, {}, function (err, res) {
self.connecting = false;
if (!err) {
postSetup();
self.connected = true;
self.emit('connect', socket);
self.log.debug('connected after %d attempts', retry.getNumRetries());
// Flush any queued requests
self._flushQueue();
} else {
self.log.debug('failed to connect after %d attempts',
retry.getNumRetries());
// Communicate the last-encountered error
if (err instanceof ConnectionError) {
self.emit('connectTimeout');
} else {
self.emit('error', err);
}
}
});
var retry;
var failAfter;
if (this.reconnect) {
retry.setStrategy(new backoff.ExponentialStrategy({
initialDelay: this.reconnect.minDelay,
retry = backoff.exponential({
initialDelay: this.reconnect.initialDelay,
maxDelay: this.reconnect.maxDelay
}));
retry.failAfter(this.reconnect.failAfter || Infinity);
});
failAfter = this.reconnect.failAfter;
} else {
// Only attempt the connection once for non-reconnection clients
retry.failAfter(1);
retry = backoff.exponential({
initialDelay: 1,
maxDelay: 2
});
failAfter = 1;
}
this.connecting = true;
retry.start();
// Abort reconnection attempts if client is destroyed
this.on('destroy', function () {
if (retry.isRunning()) {
retry.abort();
retry.failAfter(failAfter);
retry.on('ready', function (num, delay) {
if (self.destroyed) {
return;
}
connectSocket(function (err) {
self.connecting = false;
if (!err) {
postSetup();
self.connected = true;
self.emit('connect', socket);
self.log.debug('connected after %d attempt(s)', num+1);
// Flush any queued requests
self._flushQueue();
self._connectRetry = null;
} else {
retry.backoff(err);
}
});
});
retry.on('fail', function (err) {
self.log.debug('failed to connect after %d attempts', failAfter);
// Communicate the last-encountered error
if (err instanceof ConnectionError) {
self.emit('connectTimeout');
} else {
self.emit('error', err);
}
});
this._connectRetry = retry;
this.connecting = true;
retry.backoff();
};
/**

View File

@ -4,6 +4,7 @@ var Logger = require('bunyan');
var test = require('tap').test;
var uuid = require('node-uuid');
var vasync = require('vasync');
///--- Globals
@ -147,7 +148,6 @@ test('setup', function (t) {
client = ldap.createClient({
connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10),
socketPath: SOCKET,
idleTimeoutMillis: 10,
log: new Logger({
name: 'ldapjs_unit_test',
stream: process.stderr,
@ -618,11 +618,131 @@ test('idle timeout', function (t) {
});
test('setup action', function (t) {
var setupClient = ldap.createClient({
connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10),
socketPath: SOCKET,
log: new Logger({
name: 'ldapjs_unit_test',
stream: process.stderr,
level: (process.env.LOG_LEVEL || 'info'),
serializers: Logger.stdSerializers,
src: true
})
});
setupClient.on('setup', function (clt, cb) {
clt.bind(BIND_DN, BIND_PW, function (err, res) {
t.ifError(err);
cb(err);
});
});
setupClient.search(SUFFIX, {scope: 'base'}, function (err, res) {
t.ifError(err);
t.ok(res);
res.on('end', function () {
setupClient.destroy();
t.end();
});
});
});
test('setup reconnect', function (t) {
var rClient = ldap.createClient({
connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10),
socketPath: SOCKET,
reconnect: true,
log: new Logger({
name: 'ldapjs_unit_test',
stream: process.stderr,
level: (process.env.LOG_LEVEL || 'info'),
serializers: Logger.stdSerializers,
src: true
})
});
rClient.on('setup', function (clt, cb) {
clt.bind(BIND_DN, BIND_PW, function (err, res) {
t.ifError(err);
cb(err);
});
});
function doSearch(_, cb) {
rClient.search(SUFFIX, {scope: 'base'}, function (err, res) {
t.ifError(err);
res.on('end', function () {
cb();
});
});
}
vasync.pipeline({
funcs: [
doSearch,
function cleanDisconnect(_, cb) {
t.ok(rClient.connected);
rClient.once('close', function (had_err) {
t.ifError(had_err);
t.equal(rClient.connected, false);
cb();
});
rClient.unbind();
},
doSearch,
function simulateError(_, cb) {
var msg = 'fake socket error';
rClient.once('error', function (err) {
t.equal(err.message, msg);
t.ok(err);
});
rClient.once('close', function (had_err) {
// can't test had_err because the socket error is being faked
cb();
});
rClient.socket.emit('error', new Error(msg));
},
doSearch
]
}, function (err, res) {
t.ifError(err);
rClient.destroy();
t.end();
});
});
test('setup abort', function (t) {
var setupClient = ldap.createClient({
connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10),
socketPath: SOCKET,
reconnect: true,
log: new Logger({
name: 'ldapjs_unit_test',
stream: process.stderr,
level: (process.env.LOG_LEVEL || 'info'),
serializers: Logger.stdSerializers,
src: true
})
});
var message = 'It\'s a trap!';
setupClient.on('setup', function (clt, cb) {
// simulate failure
t.ok(clt);
cb(new Error(message));
});
setupClient.on('setupError', function (err) {
t.ok(true);
t.equal(err.message, message);
setupClient.destroy();
t.end();
});
});
test('abort reconnect', function (t) {
var abortClient = ldap.createClient({
connectTimeout: parseInt(process.env.LDAP_CONNECT_TIMEOUT || 0, 10),
socketPath: '/dev/null',
reconnect: {},
reconnect: true,
log: new Logger({
name: 'ldapjs_unit_test',
stream: process.stderr,
@ -646,6 +766,37 @@ test('abort reconnect', function (t) {
});
test('reconnect max retries', function (t) {
var RETRIES = 5;
var rClient = ldap.createClient({
connectTimeout: 100,
socketPath: '/dev/null',
reconnect: {
failAfter: RETRIES,
// Keep the test duration low
initialDelay: 10,
maxDelay: 100
},
log: new Logger({
name: 'ldapjs_unit_test',
stream: process.stderr,
level: (process.env.LOG_LEVEL || 'info'),
serializers: Logger.stdSerializers,
src: true
})
});
var count = 0;
rClient.on('connectError', function () {
count++;
});
rClient.on('error', function (err) {
t.equal(count, RETRIES);
rClient.destroy();
t.end();
});
});
test('abandon (GH-27)', function (t) {
client.abandon(401876543, function (err) {
t.ifError(err);