Client refactoring. Cut reconnect logic and force users to listen for connect event.

This commit is contained in:
Mark Cavage 2012-02-18 14:58:40 -08:00
parent 15c6e32801
commit 9819353042
2 changed files with 242 additions and 350 deletions

View File

@ -42,7 +42,7 @@ var Parser = messages.Parser;
var Filter = filters.Filter;
var PresenceFilter = filters.PresenceFilter;
var CMP_EXPECT = [errors.LDAP_COMPARE_TRUE, errors.LDAP_COMPARE_FALSE];
var MAX_MSGID = Math.pow(2, 31) - 1;
@ -115,143 +115,32 @@ function Client(options) {
throw new TypeError('options.log must be an object');
if (!xor(options.url, options.socketPath))
throw new TypeError('options.url ^ options.socketPath required');
throw new TypeError('options.url ^ options.socketPath (String) required');
EventEmitter.call(this, options);
var self = this;
this.secure = false;
if (options.url) {
this.url = url.parse(options.url);
this.secure = this.url.secure;
}
if (options.url)
options.url = url.parse(options.url);
this.connection = null;
this.connectTimeout = options.connectTimeout || false;
this.connectOptions = {
port: self.url ? self.url.port : options.socketPath,
host: self.url ? self.url.hostname : undefined,
port: options.url ? options.url.port : options.socketPath,
host: options.url ? options.url.hostname : undefined,
socketPath: options.socketPath || undefined
};
this.log = options.log;
this.reconnect = (typeof (options.reconnect) === 'number' ?
options.reconnect : 1000);
this.shutdown = false;
this.secure = options.url ? options.url.secure : false;
this.timeout = options.timeout || false;
this.url = options.url || false;
return this.connect(function () {});
// We'll emit a connect event when this is done
this.connect();
}
util.inherits(Client, EventEmitter);
module.exports = Client;
/**
* Connects this client, either at construct time, or after an unbind has
* been called. Under normal circumstances you don't need to call this method.
*
* @param {Function} callback invoked when `connect()` is done.
*/
Client.prototype.connect = function (callback) {
if (this.connection)
return callback();
var self = this;
var timer = false;
if (this.connectTimeout) {
timer = setTimeout(function () {
if (self.connection)
self.connection.destroy();
var err = new ConnectionError('timeout');
self.emit('connectTimeout');
return callback(err);
}, this.connectTimeout);
}
this.connection = this._newConnection();
function reconnect() {
self.connection = null;
if (self.reconnect) {
setTimeout(function () {
self.connect(function () {});
}, self.reconnect);
}
}
self.connection.on('close', function (had_err) {
self.emit('close', had_err);
reconnect();
});
self.connection.on('connect', function () {
if (timer)
clearTimeout(timer);
if (self._bindDN && self._credentials)
return self.bind(self._bindDN, self._credentials, function (err) {
if (err) {
self.log.error('Unable to bind(on(\'connect\')): %s', err.stack);
self.connection.end();
}
return callback();
});
return callback();
});
return false;
};
/**
* Performs a simple authentication against the server.
*
* @param {String} name the DN to bind as.
* @param {String} credentials the userPassword associated with name.
* @param {Control} controls (optional) either a Control or [Control].
* @param {Function} callback of the form f(err, res).
* @param {Socket} conn don't use this. Internal only (reconnects).
* @throws {TypeError} on invalid input.
*/
Client.prototype.bind = function (name, credentials, controls, callback, conn) {
if (typeof (name) !== 'string' && !(name instanceof dn.DN))
throw new TypeError('name (string) required');
if (typeof (credentials) !== 'string')
throw new TypeError('credentials (string) required');
if (typeof (controls) === 'function') {
callback = controls;
controls = [];
} else {
controls = validateControls(controls);
}
if (typeof (callback) !== 'function')
throw new TypeError('callback (function) required');
var self = this;
this.connect(function () {
var req = new BindRequest({
name: name || '',
authentication: 'Simple',
credentials: credentials || '',
controls: controls
});
return self._send(req, [errors.LDAP_SUCCESS], function (err, res) {
if (!err) { // In case we need to reconnect later
self._bindDN = name;
self._credentials = credentials;
}
return callback(err, res);
}, conn);
});
};
/**
* Sends an abandon request to the LDAP server.
*
@ -263,7 +152,7 @@ Client.prototype.bind = function (name, credentials, controls, callback, conn) {
* @param {Function} callback of the form f(err).
* @throws {TypeError} on invalid input.
*/
Client.prototype.abandon = function (messageID, controls, callback) {
Client.prototype.abandon = function abandon(messageID, controls, callback) {
if (typeof (messageID) !== 'number')
throw new TypeError('messageID (number) required');
if (typeof (controls) === 'function') {
@ -280,7 +169,7 @@ Client.prototype.abandon = function (messageID, controls, callback) {
controls: controls
});
return this._send(req, 'abandon', callback);
return this._send(req, 'abandon', null, callback);
};
@ -297,7 +186,7 @@ Client.prototype.abandon = function (messageID, controls, callback) {
* @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.add = function (name, entry, controls, callback) {
Client.prototype.add = function add(name, entry, controls, callback) {
if (typeof (name) !== 'string')
throw new TypeError('name (string) required');
if (typeof (entry) !== 'object')
@ -339,7 +228,41 @@ Client.prototype.add = function (name, entry, controls, callback) {
controls: controls
});
return this._send(req, [errors.LDAP_SUCCESS], callback);
return this._send(req, [errors.LDAP_SUCCESS], null, callback);
};
/**
* Performs a simple authentication against the server.
*
* @param {String} name the DN to bind as.
* @param {String} credentials the userPassword associated with name.
* @param {Control} controls (optional) either a Control or [Control].
* @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.bind = function bind(name, credentials, controls, callback) {
if (typeof (name) !== 'string' && !(name instanceof dn.DN))
throw new TypeError('name (string) required');
if (typeof (credentials) !== 'string')
throw new TypeError('credentials (string) required');
if (typeof (controls) === 'function') {
callback = controls;
controls = [];
} else {
controls = validateControls(controls);
}
if (typeof (callback) !== 'function')
throw new TypeError('callback (function) required');
var req = new BindRequest({
name: name || '',
authentication: 'Simple',
credentials: credentials || '',
controls: controls
});
return this._send(req, [errors.LDAP_SUCCESS], null, callback);
};
@ -353,7 +276,11 @@ Client.prototype.add = function (name, entry, controls, callback) {
* @param {Function} callback of the form f(err, boolean, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.compare = function (name, attr, value, controls, callback) {
Client.prototype.compare = function compare(name,
attr,
value,
controls,
callback) {
if (typeof (name) !== 'string')
throw new TypeError('name (string) required');
if (typeof (attr) !== 'string')
@ -376,16 +303,12 @@ Client.prototype.compare = function (name, attr, value, controls, callback) {
controls: controls
});
function _callback(err, res) {
return this._send(req, CMP_EXPECT, null, function (err, res) {
if (err)
return callback(err);
return callback(null, (res.status === errors.LDAP_COMPARE_TRUE), res);
}
return this._send(req,
[errors.LDAP_COMPARE_TRUE, errors.LDAP_COMPARE_FALSE],
_callback);
});
};
@ -397,7 +320,7 @@ Client.prototype.compare = function (name, attr, value, controls, callback) {
* @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.del = function (name, controls, callback) {
Client.prototype.del = function del(name, controls, callback) {
if (typeof (name) !== 'string')
throw new TypeError('name (string) required');
if (typeof (controls) === 'function') {
@ -414,7 +337,7 @@ Client.prototype.del = function (name, controls, callback) {
controls: controls
});
return this._send(req, [errors.LDAP_SUCCESS], callback);
return this._send(req, [errors.LDAP_SUCCESS], null, callback);
};
@ -431,7 +354,7 @@ Client.prototype.del = function (name, controls, callback) {
* @param {Function} callback of the form f(err, value, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.exop = function (name, value, controls, callback) {
Client.prototype.exop = function exop(name, value, controls, callback) {
if (typeof (name) !== 'string')
throw new TypeError('name (string) required');
if (typeof (value) === 'function') {
@ -456,14 +379,12 @@ Client.prototype.exop = function (name, value, controls, callback) {
controls: controls
});
function _callback(err, res) {
return this._send(req, [errors.LDAP_SUCCESS], null, function (err, res) {
if (err)
return callback(err);
return callback(null, res.responseValue || '', res);
}
return this._send(req, [errors.LDAP_SUCCESS], _callback);
});
};
@ -476,7 +397,7 @@ Client.prototype.exop = function (name, value, controls, callback) {
* @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.modify = function (name, change, controls, callback) {
Client.prototype.modify = function modify(name, change, controls, callback) {
if (typeof (name) !== 'string')
throw new TypeError('name (string) required');
if (typeof (change) !== 'object')
@ -529,7 +450,7 @@ Client.prototype.modify = function (name, change, controls, callback) {
controls: controls
});
return this._send(req, [errors.LDAP_SUCCESS], callback);
return this._send(req, [errors.LDAP_SUCCESS], null, callback);
};
@ -547,7 +468,10 @@ Client.prototype.modify = function (name, change, controls, callback) {
* @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.modifyDN = function (name, newName, controls, callback) {
Client.prototype.modifyDN = function modifyDN(name,
newName,
controls,
callback) {
if (typeof (name) !== 'string')
throw new TypeError('name (string) required');
if (typeof (newName) !== 'string')
@ -577,7 +501,7 @@ Client.prototype.modifyDN = function (name, newName, controls, callback) {
req.newRdn = newDN;
}
return this._send(req, [errors.LDAP_SUCCESS], callback);
return this._send(req, [errors.LDAP_SUCCESS], null, callback);
};
@ -604,7 +528,7 @@ Client.prototype.modifyDN = function (name, newName, controls, callback) {
* @param {Function} callback of the form f(err, res).
* @throws {TypeError} on invalid input.
*/
Client.prototype.search = function (base, options, controls, callback) {
Client.prototype.search = function search(base, options, controls, callback) {
if (typeof (base) !== 'string' && !(base instanceof dn.DN))
throw new TypeError('base (string) required');
if (Array.isArray(options) || (options instanceof Control)) {
@ -647,6 +571,7 @@ Client.prototype.search = function (base, options, controls, callback) {
}
}
}
var req = new SearchRequest({
baseObject: typeof (base) === 'string' ? dn.parse(base) : base,
scope: options.scope || 'base',
@ -659,31 +584,7 @@ Client.prototype.search = function (base, options, controls, callback) {
controls: controls
});
if (!this.connection)
return callback(new ConnectionError('no connection'));
var res = new EventEmitter();
// This is some whacky logic to account for the connection not being
// reconnected, and having thrown an error like "NotWriteable". Because
// the event emitter logic will never block, we'll end up returning from
// the event.on('error'), rather than "normally".
var done = false;
function errorIfNoConn(err) {
if (done)
return false;
done = true;
return callback(err);
}
res.once('error', errorIfNoConn);
this._send(req, [errors.LDAP_SUCCESS], res);
done = true;
res.removeListener('error', errorIfNoConn);
return callback(null, res);
return this._send(req, [errors.LDAP_SUCCESS], new EventEmitter(), callback);
};
@ -696,194 +597,81 @@ Client.prototype.search = function (base, options, controls, callback) {
* @param {Function} callback of the form f(err).
* @throws {TypeError} if you pass in callback as not a function.
*/
Client.prototype.unbind = function (callback) {
if (callback && typeof (callback) !== 'function')
throw new TypeError('callback must be a function');
Client.prototype.unbind = function unbind(callback) {
if (!callback)
callback = function () { self.log.trace('disconnected'); };
callback = function () {};
var self = this;
this.reconnect = false;
this._bindDN = null;
this._credentials = null;
if (typeof (callback) !== 'function')
throw new TypeError('callback must be a function');
if (!this.connection)
return callback();
var req = new UnbindRequest();
return self._send(req, 'unbind', callback);
return this._send(req, 'unbind', null, callback);
};
Client.prototype._send = function (message, expect, callback, connection) {
assert.ok(message);
assert.ok(expect);
assert.ok(callback);
var conn = connection || this.connection;
/**
* Connects this client, either at construct time, or after an unbind has
* been called. Under normal circumstances you don't need to call this method.
*
* @param {Function} (optional) callback invoked when `connect` is emitted.
*/
Client.prototype.connect = function connect(callback) {
var c = null;
var log = this.log;
var opts = this.connectOptions;
var proto = this.secure ? tls : net;
var self = this;
var timer;
var timer = false;
function closeConn(err) {
c = proto.connect(opts.port, opts.host);
if (this.connectTimeout) {
timer = setTimeout(function () {
c.destroy();
self.emit('connectTimeout', new ConnectionError('timeout'));
}, this.connectTimeout);
}
if (typeof (c.setKeepAlive) !== 'function') {
c.setKeepAlive = function setKeepAlive(enable, delay) {
return c.socket ? c.socket.setKeepAlive(enable, delay) : false;
};
}
c.ldap = {
id: self.url ? self.url.href : opts.socketPath,
messageID: 0,
messages: {},
get nextMessageID() {
if (++c.ldap.messageID >= MAX_MSGID)
c.ldap.messageID = 1;
return c.ldap.messageID;
},
parser: new Parser({
log: self.log
})
};
c.on('connect', function () {
if (timer)
clearTimeout(timer);
err = err || new ConnectionError('no connection');
assert.ok(c.ldap);
if (typeof (callback) === 'function') {
callback(err);
} else {
callback.emit('error', err);
}
c.ldap.id += c.fd ? (':' + c.fd) : '';
if (conn)
conn.destroy();
}
if (!conn)
return closeConn();
// Now set up the callback in the messages table
message.messageID = conn.ldap.nextMessageID;
if (expect !== 'abandon') {
conn.ldap.messages[message.messageID] = function (res) {
if (timer)
clearTimeout(timer);
if (self.log.debug())
self.log.debug({res: res.json}, '%s: response received', conn.ldap.id);
var err = null;
if (res instanceof LDAPResult) {
delete conn.ldap.messages[message.messageID];
if (expect.indexOf(res.status) === -1) {
err = errors.getError(res);
if (typeof (callback) === 'function')
return callback(err);
return callback.emit('error', err);
}
if (typeof (callback) === 'function')
return callback(null, res);
callback.emit('end', res);
} else if (res instanceof SearchEntry) {
assert.ok(callback instanceof EventEmitter);
callback.emit('searchEntry', res);
} else if (res instanceof SearchReference) {
assert.ok(callback instanceof EventEmitter);
callback.emit('searchReference', res);
} else if (res instanceof Error) {
if (typeof (callback) === 'function')
return callback(res);
assert.ok(callback instanceof EventEmitter);
callback.emit('error', res);
} else {
delete conn.ldap.messages[message.messageID];
err = new errors.ProtocolError(res.type);
if (typeof (callback) === 'function')
return callback(err);
callback.emit('error', err);
}
return false;
};
}
// If there's a user specified timeout, pick that up
if (this.timeout) {
timer = setTimeout(function () {
self.emit('timeout', message);
if (conn.ldap.messages[message.messageID])
conn.ldap.messages[message.messageID](new LDAPResult({
status: 80, // LDAP_OTHER
errorMessage: 'request timeout (client interrupt)'
}));
}, this.timeout);
}
try {
// Note if this was an unbind, we just go ahead and end, since there
// will never be a response
var _writeCb = null;
if (expect === 'abandon') {
_writeCb = function () {
return callback();
};
} else if (expect === 'unbind') {
_writeCb = function () {
conn.unbindMessageID = message.id;
conn.end();
};
}
// Finally send some data
if (this.log.debug())
this.log.debug({msg: message.json}, '%s: sending request', conn.ldap.id);
return conn.write(message.toBer(), _writeCb);
} catch (e) {
return closeConn(e);
}
};
Client.prototype._newConnection = function () {
var c;
var connectOpts = this.connectOptions;
var log = this.log;
var self = this;
if (this.secure) {
c = tls.connect(connectOpts.port, connectOpts.host, function () {
if (log.trace())
log.trace('%s connect event', c.ldap.id);
c.ldap.connected = true;
c.ldap.id += c.fd ? (':' + c.fd) : '';
c.emit('connect', c.ldap.id);
});
c.setKeepAlive = function (enable, delay) {
return c.socket.setKeepAlive(enable, delay);
};
} else {
c = net.createConnection(connectOpts.port, connectOpts.host);
}
assert.ok(c);
c.parser = new Parser({
log: self.log
});
// Wrap the events
c.ldap = {
id: self.url ? self.url.hostname : connectOpts.socketPath,
messageID: 0,
messages: {}
};
c.ldap.__defineGetter__('nextMessageID', function () {
if (++c.ldap.messageID >= MAX_MSGID)
c.ldap.messageID = 1;
return c.ldap.messageID;
});
c.on('connect', function () {
if (log.trace())
log.trace('%s connect event', c.ldap.id);
c.ldap.connected = true;
c.ldap.id += c.fd ? (':' + c.fd) : '';
self.emit('connect', c.ldap.id);
self.connection = c;
self.emit('connect', c);
return (typeof (callback) === 'function' ? callback(null, c) : false);
});
c.on('end', function () {
@ -893,6 +681,8 @@ Client.prototype._newConnection = function () {
c.end();
});
// On close we have to walk the outstanding messages and go invoke their
// callback with an error
c.on('close', function (had_err) {
if (log.trace())
log.trace('%s close event had_err=%s', c.ldap.id, had_err ? 'yes' : 'no');
@ -918,8 +708,8 @@ Client.prototype._newConnection = function () {
delete c.ldap.messages[msgid];
}
delete c.ldap.parser;
delete c.ldap;
delete c.parser;
return false;
});
});
@ -946,25 +736,24 @@ Client.prototype._newConnection = function () {
if (log.trace())
log.trace('%s data event: %s', c.ldap.id, util.inspect(data));
c.parser.write(data);
c.ldap.parser.write(data);
});
// The "router"
c.parser.on('message', function (message) {
c.ldap.parser.on('message', function (message) {
message.connection = c;
var callback = c.ldap.messages[message.messageID];
if (!callback) {
log.error('%s: unsolicited message: %j', c.ldap.id, message.json);
log.error({message: message.json}, '%s: unsolicited message', c.ldap.id);
return false;
}
return callback(message);
});
c.parser.on('error', function (err) {
if (log.trace())
log.trace({err: err}, '%s error event', c.ldap.id);
c.ldap.parser.on('error', function (err) {
log.debug({err: err}, '%s parser error event', c.ldap.id);
if (self.listeners('error').length)
self.emit('error', err);
@ -974,3 +763,106 @@ Client.prototype._newConnection = function () {
return c;
};
Client.prototype._send = function _send(message, expect, emitter, callback) {
assert.ok(message);
assert.ok(expect);
assert.ok(typeof (emitter) !== undefined);
assert.ok(callback);
var conn = this.connection;
var self = this;
var timer = false;
if (!conn)
return callback(new ConnectionError('no socket'));
message.messageID = conn.ldap.nextMessageID;
conn.ldap.messages[message.messageID] = function messageCallback(res) {
if (timer)
clearTimeout(timer);
if (expect === 'abandon')
return callback(null);
if (self.log.debug())
self.log.debug({res: res.json}, '%s: response received', conn.ldap.id);
var err = null;
if (res instanceof LDAPResult) {
delete conn.ldap.messages[message.messageID];
if (expect.indexOf(res.status) === -1) {
err = errors.getError(res);
if (emitter)
return emitter.emit('error', err);
return callback(err);
}
if (emitter)
return emitter.emit('end', res);
return callback(null, res);
} else if (res instanceof SearchEntry || res instanceof SearchReference) {
assert.ok(emitter);
var event = res.constructor.name;
event = event[0].toLowerCase() + event.slice(1);
return emitter.emit(event, res);
} else if (res instanceof Error) {
if (emitter)
return emitter.emit('error', res);
return callback(res);
}
delete conn.ldap.messages[message.messageID];
err = new errors.ProtocolError(res.type);
if (emitter)
return emitter.emit('error', err);
return callback(err);
};
// If there's a user specified timeout, pick that up
if (this.timeout) {
timer = setTimeout(function () {
self.emit('timeout', message);
if (conn.ldap.messages[message.messageID]) {
conn.ldap.messages[message.messageID](new LDAPResult({
status: 80, // LDAP_OTHER
errorMessage: 'request timeout (client interrupt)'
}));
}
}, this.timeout);
}
try {
// Finally send some data
if (this.log.debug())
this.log.debug({msg: message.json}, '%s: sending request', conn.ldap.id);
return conn.write(message.toBer(), function writeCallback() {
if (expect === 'abandon') {
return callback(null);
} else if (expect === 'unbind') {
conn.unbindMessageID = message.id;
conn.end();
} else if (emitter) {
return callback(null, emitter);
}
return false;
});
} catch (e) {
if (timer)
clearTimeout(timer);
conn.destroy();
delete self.connection;
return callback(e);
}
};

View File

@ -119,12 +119,12 @@ test('GH-55 Client emits connect multiple times', function (t) {
c.on('connect', function (socket) {
t.ok(socket);
count++;
});
c.bind('cn=root', 'secret', function (err, res) {
t.ifError(err);
c.unbind(function () {
t.equal(count, 1);
t.end();
c.bind('cn=root', 'secret', function (err, res) {
t.ifError(err);
c.unbind(function () {
t.equal(count, 1);
t.end();
});
});
});
});