Fix in-mem connector file operation racing condition

This commit is contained in:
Raymond Feng 2017-04-04 08:25:15 -07:00
parent 966d5daed7
commit 0a408476ec
3 changed files with 181 additions and 80 deletions

View File

@ -13,6 +13,7 @@ var geo = require('../geo');
var utils = require('../utils'); var utils = require('../utils');
var fs = require('fs'); var fs = require('fs');
var async = require('async'); var async = require('async');
var debug = require('debug')('loopback:connector:memory');
/** /**
* Initialize the Memory connector against the given data source * Initialize the Memory connector against the given data source
@ -104,45 +105,92 @@ Memory.prototype.collectionSeq = function(model, val) {
return this.ids[model]; return this.ids[model];
}; };
Memory.prototype.loadFromFile = function(callback) { /**
* Create a queue to serialize file read/write operations
* @returns {*} The file operation queue
*/
Memory.prototype.setupFileQueue = function() {
var self = this; var self = this;
if (!this.fileQueue) {
// Create a queue for writes
this.fileQueue = async.queue(function(task, done) {
var callback = task.callback || function() {};
var file = self.settings.file;
if (task.operation === 'write') {
// Flush out the models/ids
var data = JSON.stringify({
ids: self.ids,
models: self.cache,
}, null, ' ');
debug('Writing cache to %s: %s', file, data);
fs.writeFile(file, data, function(err) {
debug('Cache has been written to %s', file);
done(err);
callback(err, task.data);
});
} else if (task.operation === 'read') {
debug('Reading cache from %s: %s', file, data);
fs.readFile(file, {
encoding: 'utf8',
flag: 'r',
}, function(err, data) {
if (err && err.code !== 'ENOENT') {
done(err);
callback(err);
} else {
debug('Cache has been read from %s: %s', file, data);
self.parseAndLoad(data, function(err) {
done(err);
callback(err);
});
}
});
} else {
var err = new Error('Unknown type of task');
done(err);
callback(err);
}
}, 1);
}
return this.fileQueue;
};
Memory.prototype.parseAndLoad = function(data, callback) {
if (data) {
try {
data = JSON.parse(data.toString());
} catch (e) {
return callback && callback(e);
}
this.ids = data.ids || {};
this.cache = data.models || {};
} else {
if (!this.cache) {
this.ids = {};
this.cache = {};
}
}
callback && callback();
};
Memory.prototype.loadFromFile = function(callback) {
var hasLocalStorage = typeof window !== 'undefined' && window.localStorage; var hasLocalStorage = typeof window !== 'undefined' && window.localStorage;
var localStorage = hasLocalStorage && this.settings.localStorage; var localStorage = hasLocalStorage && this.settings.localStorage;
if (self.settings.file) { if (this.settings.file) {
fs.readFile(self.settings.file, {encoding: 'utf8', flag: 'r'}, function(err, data) { debug('Queueing read %s', this.settings.file);
if (err && err.code !== 'ENOENT') { this.setupFileQueue().push({
callback && callback(err); operation: 'read',
} else { callback: callback,
parseAndLoad(data);
}
}); });
} else if (localStorage) { } else if (localStorage) {
var data = window.localStorage.getItem(localStorage); var data = window.localStorage.getItem(localStorage);
data = data || '{}'; data = data || '{}';
parseAndLoad(data); this.parseAndLoad(data, callback);
} else { } else {
process.nextTick(callback); process.nextTick(callback);
} }
function parseAndLoad(data) {
if (data) {
try {
data = JSON.parse(data.toString());
} catch (e) {
return callback(e);
}
self.ids = data.ids || {};
self.cache = data.models || {};
} else {
if (!self.cache) {
self.ids = {};
self.cache = {};
}
}
callback && callback();
}
}; };
/*! /*!
@ -150,36 +198,22 @@ Memory.prototype.loadFromFile = function(callback) {
* @param {Function} callback * @param {Function} callback
*/ */
Memory.prototype.saveToFile = function(result, callback) { Memory.prototype.saveToFile = function(result, callback) {
var self = this;
var file = this.settings.file; var file = this.settings.file;
var hasLocalStorage = typeof window !== 'undefined' && window.localStorage; var hasLocalStorage = typeof window !== 'undefined' && window.localStorage;
var localStorage = hasLocalStorage && this.settings.localStorage; var localStorage = hasLocalStorage && this.settings.localStorage;
if (file) { if (file) {
if (!self.writeQueue) { debug('Queueing write %s', this.settings.file);
// Create a queue for writes
self.writeQueue = async.queue(function(task, cb) {
// Flush out the models/ids
var data = JSON.stringify({
ids: self.ids,
models: self.cache,
}, null, ' ');
fs.writeFile(self.settings.file, data, function(err) {
cb(err);
task.callback && task.callback(err, task.data);
});
}, 1);
}
// Enqueue the write // Enqueue the write
self.writeQueue.push({ this.setupFileQueue().push({
operation: 'write',
data: result, data: result,
callback: callback, callback: callback,
}); });
} else if (localStorage) { } else if (localStorage) {
// Flush out the models/ids // Flush out the models/ids
var data = JSON.stringify({ var data = JSON.stringify({
ids: self.ids, ids: this.ids,
models: self.cache, models: this.cache,
}, null, ' '); }, null, ' ');
window.localStorage.setItem(localStorage, data); window.localStorage.setItem(localStorage, data);
process.nextTick(function() { process.nextTick(function() {
@ -506,7 +540,7 @@ function applyFilter(filter) {
// Support referencesMany and other embedded relations // Support referencesMany and other embedded relations
// Also support array types. Mongo, possibly PostgreSQL // Also support array types. Mongo, possibly PostgreSQL
if (Array.isArray(value)) { if (Array.isArray(value)) {
var matcher = where[key]; var matcher = where[key];
// The following condition is for the case where we are querying with // The following condition is for the case where we are querying with
// a neq filter, and when the value is an empty array ([]). // a neq filter, and when the value is an empty array ([]).
if (matcher.neq !== undefined && value.length <= 0) { if (matcher.neq !== undefined && value.length <= 0) {

View File

@ -365,36 +365,59 @@ DataSource.prototype.setup = function(name, settings) {
} }
} }
dataSource.connect = function(cb) { dataSource.connect = function(callback) {
var dataSource = this; callback = callback || utils.createPromiseCallback();
if (dataSource.connected || dataSource.connecting) { var self = this;
process.nextTick(function() { if (this.connected) {
cb && cb(); // The data source is already connected, return immediately
}); process.nextTick(callback);
return; return callback.promise;
} }
dataSource.connecting = true; if (typeof dataSource.connector.connect !== 'function') {
if (dataSource.connector.connect) { // Connector doesn't have the connect function
dataSource.connector.connect(function(err, result) { // Assume no connect is needed
if (!err) { self.connected = true;
dataSource.connected = true; self.connecting = false;
dataSource.connecting = false; process.nextTick(function() {
dataSource.emit('connected'); self.emit('connected');
} else { callback();
dataSource.connected = false; });
dataSource.connecting = false; return callback.promise;
dataSource.emit('error', err); }
// Queue the callback
this.pendingConnectCallbacks = this.pendingConnectCallbacks || [];
this.pendingConnectCallbacks.push(callback);
// The connect is already in progress
if (this.connecting) return callback.promise;
// Set connecting flag to be true
this.connecting = true;
this.connector.connect(function(err, result) {
self.connecting = false;
if (!err) self.connected = true;
var cbs = self.pendingConnectCallbacks;
self.pendingConnectCallbacks = [];
if (!err) {
self.emit('connected');
} else {
self.emit('error', err);
}
// Invoke all pending callbacks
async.each(cbs, function(cb, done) {
try {
cb(err);
} catch (e) {
// Ignore error to make sure all callbacks are invoked
debug('Uncaught error raised by connect callback function: ', e);
} finally {
done();
} }
cb && cb(err, result); }, function(err) {
if (err) throw err; // It should not happen
}); });
} else { });
process.nextTick(function() { return callback.promise;
dataSource.connected = true;
dataSource.connecting = false;
dataSource.emit('connected');
cb && cb();
});
}
}; };
}; };

View File

@ -34,8 +34,10 @@ describe('Memory connector', function() {
}); });
describe('with file', function() { describe('with file', function() {
var ds;
function createUserModel() { function createUserModel() {
var ds = new DataSource({ ds = new DataSource({
connector: 'memory', connector: 'memory',
file: file, file: file,
}); });
@ -62,6 +64,13 @@ describe('Memory connector', function() {
User = createUserModel(); User = createUserModel();
}); });
it('should allow multiple connects', function(done) {
ds.connected = false; // Change the state to force reconnect
async.times(10, function(n, next) {
ds.connect(next);
}, done);
});
it('should persist create', function(done) { it('should persist create', function(done) {
var count = 0; var count = 0;
async.eachSeries(['John1', 'John2', 'John3'], function(item, cb) { async.eachSeries(['John1', 'John2', 'John3'], function(item, cb) {
@ -76,6 +85,41 @@ describe('Memory connector', function() {
}, done); }, done);
}); });
/**
* This test depends on the `should persist create`, which creates 3
* records and saves into the `memory.json`. The following test makes
* sure existing records won't be loaded out of sequence to override
* newly created ones.
*/
it('should not have out of sequence read/write', function(done) {
ds.connected = false;
ds.connector.connected = false; // Change the state to force reconnect
ds.connector.cache = {};
async.times(10, function(n, next) {
if (n === 10) {
// Make sure the connect finishes
return ds.connect(next);
}
ds.connect();
next();
}, function(err) {
async.eachSeries(['John4', 'John5'], function(item, cb) {
var count = 0;
User.create({name: item}, function(err, result) {
ids.push(result.id);
cb(err);
});
}, function(err) {
if (err) return done(err);
readModels(function(err, json) {
assert.equal(Object.keys(json.models.User).length, 5);
done();
});
});
});
});
it('should persist delete', function(done) { it('should persist delete', function(done) {
// Now try to delete one // Now try to delete one
User.deleteById(ids[0], function(err) { User.deleteById(ids[0], function(err) {
@ -86,7 +130,7 @@ describe('Memory connector', function() {
if (err) { if (err) {
return done(err); return done(err);
} }
assert.equal(Object.keys(json.models.User).length, 2); assert.equal(Object.keys(json.models.User).length, 4);
done(); done();
}); });
}); });
@ -101,7 +145,7 @@ describe('Memory connector', function() {
if (err) { if (err) {
return done(err); return done(err);
} }
assert.equal(Object.keys(json.models.User).length, 2); assert.equal(Object.keys(json.models.User).length, 4);
var user = JSON.parse(json.models.User[ids[1]]); var user = JSON.parse(json.models.User[ids[1]]);
assert.equal(user.name, 'John'); assert.equal(user.name, 'John');
assert(user.id === ids[1]); assert(user.id === ids[1]);
@ -120,7 +164,7 @@ describe('Memory connector', function() {
if (err) { if (err) {
return done(err); return done(err);
} }
assert.equal(Object.keys(json.models.User).length, 2); assert.equal(Object.keys(json.models.User).length, 4);
var user = JSON.parse(json.models.User[ids[1]]); var user = JSON.parse(json.models.User[ids[1]]);
assert.equal(user.name, 'John1'); assert.equal(user.name, 'John1');
assert(user.id === ids[1]); assert(user.id === ids[1]);
@ -133,7 +177,7 @@ describe('Memory connector', function() {
it('should load from the json file', function(done) { it('should load from the json file', function(done) {
User.find(function(err, users) { User.find(function(err, users) {
// There should be 2 records // There should be 2 records
assert.equal(users.length, 2); assert.equal(users.length, 4);
done(err); done(err);
}); });
}); });