Use dataSource.connect to avoid duplicate connects

This commit is contained in:
Raymond Feng 2017-04-04 10:19:58 -07:00
parent 957058e2d4
commit 1f995ec674
3 changed files with 80 additions and 68 deletions

View File

@ -23,7 +23,8 @@ var debug = require('debug')('loopback:connector:memory');
*/
exports.initialize = function initializeDataSource(dataSource, callback) {
dataSource.connector = new Memory(null, dataSource.settings);
dataSource.connector.connect(callback);
// Use dataSource.connect to avoid duplicate file reads from cache
dataSource.connect(callback);
};
exports.Memory = Memory;

View File

@ -263,6 +263,65 @@ DataSource._resolveConnector = function(name, loader) {
};
};
/**
* Connect to the data source
* @param callback
*/
DataSource.prototype.connect = function(callback) {
callback = callback || utils.createPromiseCallback();
var self = this;
if (this.connected) {
// The data source is already connected, return immediately
process.nextTick(callback);
return callback.promise;
}
if (typeof this.connector.connect !== 'function') {
// Connector doesn't have the connect function
// Assume no connect is needed
self.connected = true;
self.connecting = false;
process.nextTick(function() {
self.emit('connected');
callback();
});
return callback.promise;
}
// Queue the callback
this.pendingConnectCallbacks = this.pendingConnectCallbacks || [];
this.pendingConnectCallbacks.push(callback);
// The connect is already in progress
if (this.connecting) return callback.promise;
// Set connecting flag to be true
this.connecting = true;
this.connector.connect(function(err, result) {
self.connecting = false;
if (!err) self.connected = true;
var cbs = self.pendingConnectCallbacks;
self.pendingConnectCallbacks = [];
if (!err) {
self.emit('connected');
} else {
self.emit('error', err);
}
// Invoke all pending callbacks
async.each(cbs, function(cb, done) {
try {
cb(err);
} catch (e) {
// Ignore error to make sure all callbacks are invoked
debug('Uncaught error raised by connect callback function: ', e);
} finally {
done();
}
}, function(err) {
if (err) throw err; // It should not happen
});
});
return callback.promise;
};
/**
* Set up the data source
* @param {String} name The name
@ -364,61 +423,6 @@ DataSource.prototype.setup = function(name, settings) {
throw err;
}
}
dataSource.connect = function(callback) {
callback = callback || utils.createPromiseCallback();
var self = this;
if (this.connected) {
// The data source is already connected, return immediately
process.nextTick(callback);
return callback.promise;
}
if (typeof dataSource.connector.connect !== 'function') {
// Connector doesn't have the connect function
// Assume no connect is needed
self.connected = true;
self.connecting = false;
process.nextTick(function() {
self.emit('connected');
callback();
});
return callback.promise;
}
// Queue the callback
this.pendingConnectCallbacks = this.pendingConnectCallbacks || [];
this.pendingConnectCallbacks.push(callback);
// The connect is already in progress
if (this.connecting) return callback.promise;
// Set connecting flag to be true
this.connecting = true;
this.connector.connect(function(err, result) {
self.connecting = false;
if (!err) self.connected = true;
var cbs = self.pendingConnectCallbacks;
self.pendingConnectCallbacks = [];
if (!err) {
self.emit('connected');
} else {
self.emit('error', err);
}
// Invoke all pending callbacks
async.each(cbs, function(cb, done) {
try {
cb(err);
} catch (e) {
// Ignore error to make sure all callbacks are invoked
debug('Uncaught error raised by connect callback function: ', e);
} finally {
done();
}
}, function(err) {
if (err) throw err; // It should not happen
});
});
return callback.promise;
};
};
function isModelClass(cls) {
@ -1924,6 +1928,7 @@ DataSource.prototype.disconnect = function disconnect(cb) {
});
} else {
process.nextTick(function() {
self.connected = false;
cb && cb();
});
}

View File

@ -37,7 +37,7 @@ describe('Memory connector', function() {
var ds;
function createUserModel() {
ds = new DataSource({
var ds = new DataSource({
connector: 'memory',
file: file,
});
@ -62,6 +62,7 @@ describe('Memory connector', function() {
before(function() {
User = createUserModel();
ds = User.dataSource;
});
it('should allow multiple connects', function(done) {
@ -92,9 +93,10 @@ describe('Memory connector', function() {
* newly created ones.
*/
it('should not have out of sequence read/write', function(done) {
ds.connected = false;
ds.connector.connected = false; // Change the state to force reconnect
ds.connector.cache = {};
// Create the new data source with the same file to simulate
// existing records
var User = createUserModel();
var ds = User.dataSource;
async.times(10, function(n, next) {
if (n === 10) {
@ -121,17 +123,21 @@ describe('Memory connector', function() {
});
it('should persist delete', function(done) {
// Now try to delete one
User.deleteById(ids[0], function(err) {
if (err) {
return done(err);
}
readModels(function(err, json) {
// Force the data source to reconnect so that the updated records
// are reloaded
ds.disconnect(function() {
// Now try to delete one
User.deleteById(ids[0], function(err) {
if (err) {
return done(err);
}
assert.equal(Object.keys(json.models.User).length, 4);
done();
readModels(function(err, json) {
if (err) {
return done(err);
}
assert.equal(Object.keys(json.models.User).length, 4);
done();
});
});
});
});