Merge pull request #1307 from strongloop/feature/mem-connector-connect
Use dataSource.connect to avoid duplicate connects
This commit is contained in:
commit
322fa48267
|
@ -23,7 +23,8 @@ var debug = require('debug')('loopback:connector:memory');
|
||||||
*/
|
*/
|
||||||
exports.initialize = function initializeDataSource(dataSource, callback) {
|
exports.initialize = function initializeDataSource(dataSource, callback) {
|
||||||
dataSource.connector = new Memory(null, dataSource.settings);
|
dataSource.connector = new Memory(null, dataSource.settings);
|
||||||
dataSource.connector.connect(callback);
|
// Use dataSource.connect to avoid duplicate file reads from cache
|
||||||
|
dataSource.connect(callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
exports.Memory = Memory;
|
exports.Memory = Memory;
|
||||||
|
|
|
@ -263,6 +263,65 @@ DataSource._resolveConnector = function(name, loader) {
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to the data source
|
||||||
|
* @param callback
|
||||||
|
*/
|
||||||
|
DataSource.prototype.connect = function(callback) {
|
||||||
|
callback = callback || utils.createPromiseCallback();
|
||||||
|
var self = this;
|
||||||
|
if (this.connected) {
|
||||||
|
// The data source is already connected, return immediately
|
||||||
|
process.nextTick(callback);
|
||||||
|
return callback.promise;
|
||||||
|
}
|
||||||
|
if (typeof this.connector.connect !== 'function') {
|
||||||
|
// Connector doesn't have the connect function
|
||||||
|
// Assume no connect is needed
|
||||||
|
self.connected = true;
|
||||||
|
self.connecting = false;
|
||||||
|
process.nextTick(function() {
|
||||||
|
self.emit('connected');
|
||||||
|
callback();
|
||||||
|
});
|
||||||
|
return callback.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue the callback
|
||||||
|
this.pendingConnectCallbacks = this.pendingConnectCallbacks || [];
|
||||||
|
this.pendingConnectCallbacks.push(callback);
|
||||||
|
|
||||||
|
// The connect is already in progress
|
||||||
|
if (this.connecting) return callback.promise;
|
||||||
|
// Set connecting flag to be true
|
||||||
|
this.connecting = true;
|
||||||
|
this.connector.connect(function(err, result) {
|
||||||
|
self.connecting = false;
|
||||||
|
if (!err) self.connected = true;
|
||||||
|
var cbs = self.pendingConnectCallbacks;
|
||||||
|
self.pendingConnectCallbacks = [];
|
||||||
|
if (!err) {
|
||||||
|
self.emit('connected');
|
||||||
|
} else {
|
||||||
|
self.emit('error', err);
|
||||||
|
}
|
||||||
|
// Invoke all pending callbacks
|
||||||
|
async.each(cbs, function(cb, done) {
|
||||||
|
try {
|
||||||
|
cb(err);
|
||||||
|
} catch (e) {
|
||||||
|
// Ignore error to make sure all callbacks are invoked
|
||||||
|
debug('Uncaught error raised by connect callback function: ', e);
|
||||||
|
} finally {
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
}, function(err) {
|
||||||
|
if (err) throw err; // It should not happen
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return callback.promise;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up the data source
|
* Set up the data source
|
||||||
* @param {String} name The name
|
* @param {String} name The name
|
||||||
|
@ -364,61 +423,6 @@ DataSource.prototype.setup = function(name, settings) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dataSource.connect = function(callback) {
|
|
||||||
callback = callback || utils.createPromiseCallback();
|
|
||||||
var self = this;
|
|
||||||
if (this.connected) {
|
|
||||||
// The data source is already connected, return immediately
|
|
||||||
process.nextTick(callback);
|
|
||||||
return callback.promise;
|
|
||||||
}
|
|
||||||
if (typeof dataSource.connector.connect !== 'function') {
|
|
||||||
// Connector doesn't have the connect function
|
|
||||||
// Assume no connect is needed
|
|
||||||
self.connected = true;
|
|
||||||
self.connecting = false;
|
|
||||||
process.nextTick(function() {
|
|
||||||
self.emit('connected');
|
|
||||||
callback();
|
|
||||||
});
|
|
||||||
return callback.promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue the callback
|
|
||||||
this.pendingConnectCallbacks = this.pendingConnectCallbacks || [];
|
|
||||||
this.pendingConnectCallbacks.push(callback);
|
|
||||||
|
|
||||||
// The connect is already in progress
|
|
||||||
if (this.connecting) return callback.promise;
|
|
||||||
// Set connecting flag to be true
|
|
||||||
this.connecting = true;
|
|
||||||
this.connector.connect(function(err, result) {
|
|
||||||
self.connecting = false;
|
|
||||||
if (!err) self.connected = true;
|
|
||||||
var cbs = self.pendingConnectCallbacks;
|
|
||||||
self.pendingConnectCallbacks = [];
|
|
||||||
if (!err) {
|
|
||||||
self.emit('connected');
|
|
||||||
} else {
|
|
||||||
self.emit('error', err);
|
|
||||||
}
|
|
||||||
// Invoke all pending callbacks
|
|
||||||
async.each(cbs, function(cb, done) {
|
|
||||||
try {
|
|
||||||
cb(err);
|
|
||||||
} catch (e) {
|
|
||||||
// Ignore error to make sure all callbacks are invoked
|
|
||||||
debug('Uncaught error raised by connect callback function: ', e);
|
|
||||||
} finally {
|
|
||||||
done();
|
|
||||||
}
|
|
||||||
}, function(err) {
|
|
||||||
if (err) throw err; // It should not happen
|
|
||||||
});
|
|
||||||
});
|
|
||||||
return callback.promise;
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
function isModelClass(cls) {
|
function isModelClass(cls) {
|
||||||
|
@ -1924,6 +1928,7 @@ DataSource.prototype.disconnect = function disconnect(cb) {
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
process.nextTick(function() {
|
process.nextTick(function() {
|
||||||
|
self.connected = false;
|
||||||
cb && cb();
|
cb && cb();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ describe('Memory connector', function() {
|
||||||
var ds;
|
var ds;
|
||||||
|
|
||||||
function createUserModel() {
|
function createUserModel() {
|
||||||
ds = new DataSource({
|
var ds = new DataSource({
|
||||||
connector: 'memory',
|
connector: 'memory',
|
||||||
file: file,
|
file: file,
|
||||||
});
|
});
|
||||||
|
@ -62,6 +62,7 @@ describe('Memory connector', function() {
|
||||||
|
|
||||||
before(function() {
|
before(function() {
|
||||||
User = createUserModel();
|
User = createUserModel();
|
||||||
|
ds = User.dataSource;
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should allow multiple connects', function(done) {
|
it('should allow multiple connects', function(done) {
|
||||||
|
@ -92,9 +93,10 @@ describe('Memory connector', function() {
|
||||||
* newly created ones.
|
* newly created ones.
|
||||||
*/
|
*/
|
||||||
it('should not have out of sequence read/write', function(done) {
|
it('should not have out of sequence read/write', function(done) {
|
||||||
ds.connected = false;
|
// Create the new data source with the same file to simulate
|
||||||
ds.connector.connected = false; // Change the state to force reconnect
|
// existing records
|
||||||
ds.connector.cache = {};
|
var User = createUserModel();
|
||||||
|
var ds = User.dataSource;
|
||||||
|
|
||||||
async.times(10, function(n, next) {
|
async.times(10, function(n, next) {
|
||||||
if (n === 10) {
|
if (n === 10) {
|
||||||
|
@ -121,6 +123,9 @@ describe('Memory connector', function() {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should persist delete', function(done) {
|
it('should persist delete', function(done) {
|
||||||
|
// Force the data source to reconnect so that the updated records
|
||||||
|
// are reloaded
|
||||||
|
ds.disconnect(function() {
|
||||||
// Now try to delete one
|
// Now try to delete one
|
||||||
User.deleteById(ids[0], function(err) {
|
User.deleteById(ids[0], function(err) {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -135,6 +140,7 @@ describe('Memory connector', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should persist upsert', function(done) {
|
it('should persist upsert', function(done) {
|
||||||
User.upsert({id: ids[1], name: 'John'}, function(err, result) {
|
User.upsert({id: ids[1], name: 'John'}, function(err, result) {
|
||||||
|
|
Loading…
Reference in New Issue