Improve client request state tracking
- Handle abandoned requests properly - Re-enable client abandon test - Report client idleness correctly (even after abandons) Fix mcavage/node-ldapjs#211
This commit is contained in:
parent
6607d83b86
commit
2939fa1fad
|
@ -141,7 +141,7 @@ RequestQueue.prototype.flush = function flush(cb) {
|
||||||
*/
|
*/
|
||||||
RequestQueue.prototype.purge = function purge() {
|
RequestQueue.prototype.purge = function purge() {
|
||||||
this.flush(function (msg, expect, emitter, cb) {
|
this.flush(function (msg, expect, emitter, cb) {
|
||||||
cb(new Error('RequestQueue timeout'));
|
cb(new errors.TimeoutError('request queue timeout'));
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -160,6 +160,118 @@ RequestQueue.prototype.thaw = function thaw() {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Track message callback by messageID.
|
||||||
|
*/
|
||||||
|
function MessageTracker(opts) {
|
||||||
|
assert.object(opts);
|
||||||
|
assert.string(opts.id);
|
||||||
|
assert.object(opts.parser);
|
||||||
|
|
||||||
|
this.id = opts.id;
|
||||||
|
this._msgid = 0;
|
||||||
|
this._messages = {};
|
||||||
|
this._abandoned = {};
|
||||||
|
this.parser = opts.parser;
|
||||||
|
|
||||||
|
var self = this;
|
||||||
|
this.__defineGetter__('pending', function () {
|
||||||
|
return Object.keys(self._messages);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a messageID and callback.
|
||||||
|
*/
|
||||||
|
MessageTracker.prototype.track = function track(message, callback) {
|
||||||
|
var msgid = this._nextID();
|
||||||
|
message.messageID = msgid;
|
||||||
|
this._messages[msgid] = callback;
|
||||||
|
return msgid;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch callback based on messageID.
|
||||||
|
*/
|
||||||
|
MessageTracker.prototype.fetch = function fetch(msgid) {
|
||||||
|
var msg = this._messages[msgid];
|
||||||
|
if (msg) {
|
||||||
|
this._purgeAbandoned(msgid);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
// It's possible that the server has not received the abandon request yet.
|
||||||
|
// While waiting for evidence that the abandon has been received, incoming
|
||||||
|
// messages that match the abandoned msgid will be handled as normal.
|
||||||
|
msg = this._abandoned[msgid];
|
||||||
|
if (msg) {
|
||||||
|
return msg.cb;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cease tracking for a given messageID.
|
||||||
|
*/
|
||||||
|
MessageTracker.prototype.remove = function remove(msgid) {
|
||||||
|
if (this._messages[msgid]) {
|
||||||
|
delete this._messages[msgid];
|
||||||
|
} else if (this._abandoned[msgid]) {
|
||||||
|
delete this._abandoned[msgid];
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark a messageID as abandoned.
|
||||||
|
*/
|
||||||
|
MessageTracker.prototype.abandon = function abandonMsg(msgid) {
|
||||||
|
if (this._messages[msgid]) {
|
||||||
|
// Keep track of "when" the message was abandoned
|
||||||
|
this._abandoned[msgid] = {
|
||||||
|
age: this._msgid,
|
||||||
|
cb: this._messages[msgid]
|
||||||
|
};
|
||||||
|
delete this._messages[msgid];
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Purge old items from abandoned list.
|
||||||
|
*/
|
||||||
|
MessageTracker.prototype._purgeAbandoned = function _purgeAbandoned(msgid) {
|
||||||
|
var self = this;
|
||||||
|
// Is (comp >= ref) according to sliding window
|
||||||
|
function geWindow(ref, comp) {
|
||||||
|
var max = ref + (MAX_MSGID/2);
|
||||||
|
var min = ref;
|
||||||
|
if (max >= MAX_MSGID) {
|
||||||
|
// Handle roll-over
|
||||||
|
max = max - MAX_MSGID - 1;
|
||||||
|
return ((comp <= max) || (comp >= min));
|
||||||
|
} else {
|
||||||
|
return ((comp <= max) && (comp >= min));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Object.keys(this._abandoned).forEach(function (id) {
|
||||||
|
// Abandoned messageIDs can be forgotten if a received messageID is "newer"
|
||||||
|
if (geWindow(self._abandoned[id].age, msgid)) {
|
||||||
|
self._abandoned[id].cb(new errors.AbandonedError(
|
||||||
|
'client request abandoned'));
|
||||||
|
delete self._abandoned[id];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate the next messageID according to a sliding window.
|
||||||
|
*/
|
||||||
|
MessageTracker.prototype._nextID = function _nextID() {
|
||||||
|
if (++this._msgid >= MAX_MSGID)
|
||||||
|
this._msgid = 1;
|
||||||
|
|
||||||
|
return this._msgid;
|
||||||
|
};
|
||||||
|
|
||||||
///--- API
|
///--- API
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -806,20 +918,10 @@ Client.prototype._connect = function _connect() {
|
||||||
|
|
||||||
// Initialize socket events and LDAP parser.
|
// Initialize socket events and LDAP parser.
|
||||||
function initSocket() {
|
function initSocket() {
|
||||||
socket.ldap = {
|
socket.ldap = new MessageTracker({
|
||||||
id: self.url ? self.url.href : self.socketPath,
|
id: self.url ? self.url.href : self.socketPath,
|
||||||
messageID: 0,
|
parser: new Parser({log: log})
|
||||||
messages: {},
|
});
|
||||||
getNextMessageID: function getNextMessageID() {
|
|
||||||
if (++socket.ldap.messageID >= MAX_MSGID)
|
|
||||||
socket.ldap.messageID = 1;
|
|
||||||
|
|
||||||
return socket.ldap.messageID;
|
|
||||||
},
|
|
||||||
parser: new Parser({
|
|
||||||
log: log
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
// This won't be set on TLS. So. Very. Annoying.
|
// This won't be set on TLS. So. Very. Annoying.
|
||||||
if (typeof (socket.setKeepAlive) !== 'function') {
|
if (typeof (socket.setKeepAlive) !== 'function') {
|
||||||
|
@ -839,7 +941,7 @@ Client.prototype._connect = function _connect() {
|
||||||
// The "router"
|
// The "router"
|
||||||
socket.ldap.parser.on('message', function onMessage(message) {
|
socket.ldap.parser.on('message', function onMessage(message) {
|
||||||
message.connection = socket;
|
message.connection = socket;
|
||||||
var callback = socket.ldap.messages[message.messageID];
|
var callback = socket.ldap.fetch(message.messageID);
|
||||||
|
|
||||||
if (!callback) {
|
if (!callback) {
|
||||||
log.error({message: message.json}, 'unsolicited message');
|
log.error({message: message.json}, 'unsolicited message');
|
||||||
|
@ -1020,29 +1122,20 @@ Client.prototype._onClose = function _onClose(had_err) {
|
||||||
this.emit('close', had_err);
|
this.emit('close', had_err);
|
||||||
// On close we have to walk the outstanding messages and go invoke their
|
// On close we have to walk the outstanding messages and go invoke their
|
||||||
// callback with an error.
|
// callback with an error.
|
||||||
Object.keys(socket.ldap.messages).forEach(function (msgid) {
|
socket.ldap.pending.forEach(function (msgid) {
|
||||||
var err;
|
var cb = socket.ldap.fetch(msgid);
|
||||||
|
socket.ldap.remove(msgid);
|
||||||
|
|
||||||
if (socket.unbindMessageID !== parseInt(msgid, 10)) {
|
if (socket.unbindMessageID !== parseInt(msgid, 10)) {
|
||||||
err = new ConnectionError(socket.ldap.id + ' closed');
|
return cb(new ConnectionError(socket.ldap.id + ' closed'));
|
||||||
} else {
|
} else {
|
||||||
// Unbinds will be communicated as a success since we're closed
|
// Unbinds will be communicated as a success since we're closed
|
||||||
err = new UnbindResponse({
|
var unbind = new UnbindResponse({messageID: msgid});
|
||||||
messageID: msgid
|
unbind.status = 'unbind';
|
||||||
});
|
return cb(unbind);
|
||||||
err.status = 'unbind';
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeof (socket.ldap.messages[msgid]) === 'function') {
|
|
||||||
var callback = socket.ldap.messages[msgid];
|
|
||||||
delete socket.ldap.messages[msgid];
|
|
||||||
return callback(err);
|
|
||||||
} else if (socket.ldap.messages[msgid]) {
|
|
||||||
if (err instanceof Error)
|
|
||||||
socket.ldap.messages[msgid].emit('error', err);
|
|
||||||
delete socket.ldap.messages[msgid];
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
delete socket.ldap.parser;
|
delete socket.ldap.parser;
|
||||||
delete socket.ldap;
|
delete socket.ldap;
|
||||||
|
|
||||||
|
@ -1070,10 +1163,9 @@ Client.prototype._updateIdle = function _updateIdle(override) {
|
||||||
// Client must be connected but not waiting on any request data
|
// Client must be connected but not waiting on any request data
|
||||||
var self = this;
|
var self = this;
|
||||||
function isIdle(disable) {
|
function isIdle(disable) {
|
||||||
// FIXME: this breaks with abandons
|
|
||||||
return ((disable !== true) &&
|
return ((disable !== true) &&
|
||||||
(self.socket && self.connected) &&
|
(self.socket && self.connected) &&
|
||||||
(Object.keys(self.socket.ldap.messages).length === 0));
|
(self.socket.ldap.pending.length === 0));
|
||||||
}
|
}
|
||||||
if (isIdle(override)) {
|
if (isIdle(override)) {
|
||||||
if (!this._idleTimer) {
|
if (!this._idleTimer) {
|
||||||
|
@ -1224,7 +1316,7 @@ Client.prototype._sendSocket = function _sendSocket(message,
|
||||||
// page search continued, just return for now
|
// page search continued, just return for now
|
||||||
return undefined;
|
return undefined;
|
||||||
} else {
|
} else {
|
||||||
delete conn.ldap.messages[message.messageID];
|
conn.ldap.remove(message.messageID);
|
||||||
// Potentially mark client as idle
|
// Potentially mark client as idle
|
||||||
self._updateIdle();
|
self._updateIdle();
|
||||||
|
|
||||||
|
@ -1243,16 +1335,19 @@ Client.prototype._sendSocket = function _sendSocket(message,
|
||||||
|
|
||||||
function onRequestTimeout() {
|
function onRequestTimeout() {
|
||||||
self.emit('timeout', message);
|
self.emit('timeout', message);
|
||||||
if (conn.ldap.messages[message.messageID]) {
|
var cb = conn.ldap.fetch(message.messageID);
|
||||||
conn.ldap.messages[message.messageID](new LDAPResult({
|
if (cb) {
|
||||||
status: 80, // LDAP_OTHER
|
//FIXME: the timed-out request should be abandoned
|
||||||
errorMessage: 'request timeout (client interrupt)'
|
cb(new errors.TimeoutError('request timeout (client interrupt)'));
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
} // end function onRequestTimeout()
|
} // end function onRequestTimeout()
|
||||||
|
|
||||||
function writeCallback() {
|
function writeCallback() {
|
||||||
if (expect === 'abandon') {
|
if (expect === 'abandon') {
|
||||||
|
// Mark the messageID specified as abandoned
|
||||||
|
conn.ldap.abandon(message.abandonID);
|
||||||
|
// No need to track the abandon request itself
|
||||||
|
conn.ldap.remove(message.id);
|
||||||
return callback(null);
|
return callback(null);
|
||||||
} else if (expect === 'unbind') {
|
} else if (expect === 'unbind') {
|
||||||
conn.unbindMessageID = message.id;
|
conn.unbindMessageID = message.id;
|
||||||
|
@ -1267,8 +1362,7 @@ Client.prototype._sendSocket = function _sendSocket(message,
|
||||||
} // end writeCallback()
|
} // end writeCallback()
|
||||||
|
|
||||||
// Start actually doing something...
|
// Start actually doing something...
|
||||||
message.messageID = conn.ldap.getNextMessageID();
|
conn.ldap.track(message, messageCallback);
|
||||||
conn.ldap.messages[message.messageID] = messageCallback;
|
|
||||||
// Mark client as active
|
// Mark client as active
|
||||||
this._updateIdle(true);
|
this._updateIdle(true);
|
||||||
|
|
||||||
|
|
|
@ -100,14 +100,37 @@ module.exports.getMessage = function (code) {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
///--- Custom application errors
|
||||||
|
|
||||||
function ConnectionError(message) {
|
function ConnectionError(message) {
|
||||||
LDAPError.call(this,
|
LDAPError.call(this,
|
||||||
'ConnectionError',
|
'ConnectionError',
|
||||||
0x80, // LDAP_OTHER,
|
CODES.LDAP_OTHER,
|
||||||
message,
|
message,
|
||||||
null,
|
null,
|
||||||
ConnectionError);
|
ConnectionError);
|
||||||
}
|
}
|
||||||
util.inherits(ConnectionError, LDAPError);
|
util.inherits(ConnectionError, LDAPError);
|
||||||
module.exports.ConnectionError = ConnectionError;
|
module.exports.ConnectionError = ConnectionError;
|
||||||
|
|
||||||
|
function AbandonedError(message) {
|
||||||
|
LDAPError.call(this,
|
||||||
|
'AbandonedError',
|
||||||
|
CODES.LDAP_OTHER,
|
||||||
|
message,
|
||||||
|
null,
|
||||||
|
AbandonedError);
|
||||||
|
}
|
||||||
|
util.inherits(AbandonedError, LDAPError);
|
||||||
|
module.exports.AbandonedError = AbandonedError;
|
||||||
|
|
||||||
|
function TimeoutError(message) {
|
||||||
|
LDAPError.call(this,
|
||||||
|
'TimeoutError',
|
||||||
|
CODES.LDAP_OTHER,
|
||||||
|
message,
|
||||||
|
null,
|
||||||
|
TimeoutError);
|
||||||
|
}
|
||||||
|
util.inherits(TimeoutError, LDAPError);
|
||||||
|
module.exports.TimeoutError = TimeoutError;
|
||||||
|
|
|
@ -619,7 +619,6 @@ test('GH-24 attribute selection of *', function (t) {
|
||||||
|
|
||||||
|
|
||||||
test('idle timeout', function (t) {
|
test('idle timeout', function (t) {
|
||||||
// FIXME: this must be run before abandon
|
|
||||||
client.idleTimeout = 250;
|
client.idleTimeout = 250;
|
||||||
function premature() {
|
function premature() {
|
||||||
t.ifError(true);
|
t.ifError(true);
|
||||||
|
@ -856,14 +855,13 @@ test('no auto-reconnect on unbind', function (t) {
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
// Abandon handling is improper on the server and not robust in the client
|
test('abandon (GH-27)', function (t) {
|
||||||
// FIXME: Disable test until one/both are fixed.
|
// FIXME: test abandoning a real request
|
||||||
//test('abandon (GH-27)', function (t) {
|
client.abandon(401876543, function (err) {
|
||||||
// client.abandon(401876543, function (err) {
|
t.ifError(err);
|
||||||
// t.ifError(err);
|
t.end();
|
||||||
// t.end();
|
});
|
||||||
// });
|
});
|
||||||
//});
|
|
||||||
|
|
||||||
|
|
||||||
test('search timeout (GH-51)', function (t) {
|
test('search timeout (GH-51)', function (t) {
|
||||||
|
|
Loading…
Reference in New Issue