Merge pull request #1300 from strongloop/feature/fix-mem-connector

Fix in-mem connector file operation racing condition
This commit is contained in:
Raymond Feng 2017-04-04 08:41:36 -07:00 committed by GitHub
commit 5981197299
3 changed files with 181 additions and 80 deletions

View File

@ -13,6 +13,7 @@ var geo = require('../geo');
var utils = require('../utils');
var fs = require('fs');
var async = require('async');
var debug = require('debug')('loopback:connector:memory');
/**
* Initialize the Memory connector against the given data source
@ -104,44 +105,91 @@ Memory.prototype.collectionSeq = function(model, val) {
return this.ids[model];
};
Memory.prototype.loadFromFile = function(callback) {
/**
* Create a queue to serialize file read/write operations
* @returns {*} The file operation queue
*/
Memory.prototype.setupFileQueue = function() {
var self = this;
var hasLocalStorage = typeof window !== 'undefined' && window.localStorage;
var localStorage = hasLocalStorage && this.settings.localStorage;
if (self.settings.file) {
fs.readFile(self.settings.file, {encoding: 'utf8', flag: 'r'}, function(err, data) {
if (!this.fileQueue) {
// Create a queue for writes
this.fileQueue = async.queue(function(task, done) {
var callback = task.callback || function() {};
var file = self.settings.file;
if (task.operation === 'write') {
// Flush out the models/ids
var data = JSON.stringify({
ids: self.ids,
models: self.cache,
}, null, ' ');
debug('Writing cache to %s: %s', file, data);
fs.writeFile(file, data, function(err) {
debug('Cache has been written to %s', file);
done(err);
callback(err, task.data);
});
} else if (task.operation === 'read') {
debug('Reading cache from %s: %s', file, data);
fs.readFile(file, {
encoding: 'utf8',
flag: 'r',
}, function(err, data) {
if (err && err.code !== 'ENOENT') {
callback && callback(err);
done(err);
callback(err);
} else {
parseAndLoad(data);
debug('Cache has been read from %s: %s', file, data);
self.parseAndLoad(data, function(err) {
done(err);
callback(err);
});
}
});
} else if (localStorage) {
var data = window.localStorage.getItem(localStorage);
data = data || '{}';
parseAndLoad(data);
} else {
process.nextTick(callback);
var err = new Error('Unknown type of task');
done(err);
callback(err);
}
}, 1);
}
return this.fileQueue;
};
function parseAndLoad(data) {
Memory.prototype.parseAndLoad = function(data, callback) {
if (data) {
try {
data = JSON.parse(data.toString());
} catch (e) {
return callback(e);
return callback && callback(e);
}
self.ids = data.ids || {};
self.cache = data.models || {};
this.ids = data.ids || {};
this.cache = data.models || {};
} else {
if (!self.cache) {
self.ids = {};
self.cache = {};
if (!this.cache) {
this.ids = {};
this.cache = {};
}
}
callback && callback();
};
Memory.prototype.loadFromFile = function(callback) {
var hasLocalStorage = typeof window !== 'undefined' && window.localStorage;
var localStorage = hasLocalStorage && this.settings.localStorage;
if (this.settings.file) {
debug('Queueing read %s', this.settings.file);
this.setupFileQueue().push({
operation: 'read',
callback: callback,
});
} else if (localStorage) {
var data = window.localStorage.getItem(localStorage);
data = data || '{}';
this.parseAndLoad(data, callback);
} else {
process.nextTick(callback);
}
};
@ -150,36 +198,22 @@ Memory.prototype.loadFromFile = function(callback) {
* @param {Function} callback
*/
Memory.prototype.saveToFile = function(result, callback) {
var self = this;
var file = this.settings.file;
var hasLocalStorage = typeof window !== 'undefined' && window.localStorage;
var localStorage = hasLocalStorage && this.settings.localStorage;
if (file) {
if (!self.writeQueue) {
// Create a queue for writes
self.writeQueue = async.queue(function(task, cb) {
// Flush out the models/ids
var data = JSON.stringify({
ids: self.ids,
models: self.cache,
}, null, ' ');
fs.writeFile(self.settings.file, data, function(err) {
cb(err);
task.callback && task.callback(err, task.data);
});
}, 1);
}
debug('Queueing write %s', this.settings.file);
// Enqueue the write
self.writeQueue.push({
this.setupFileQueue().push({
operation: 'write',
data: result,
callback: callback,
});
} else if (localStorage) {
// Flush out the models/ids
var data = JSON.stringify({
ids: self.ids,
models: self.cache,
ids: this.ids,
models: this.cache,
}, null, ' ');
window.localStorage.setItem(localStorage, data);
process.nextTick(function() {

View File

@ -365,36 +365,59 @@ DataSource.prototype.setup = function(name, settings) {
}
}
dataSource.connect = function(cb) {
var dataSource = this;
if (dataSource.connected || dataSource.connecting) {
process.nextTick(function() {
cb && cb();
});
return;
dataSource.connect = function(callback) {
callback = callback || utils.createPromiseCallback();
var self = this;
if (this.connected) {
// The data source is already connected, return immediately
process.nextTick(callback);
return callback.promise;
}
dataSource.connecting = true;
if (dataSource.connector.connect) {
dataSource.connector.connect(function(err, result) {
if (typeof dataSource.connector.connect !== 'function') {
// Connector doesn't have the connect function
// Assume no connect is needed
self.connected = true;
self.connecting = false;
process.nextTick(function() {
self.emit('connected');
callback();
});
return callback.promise;
}
// Queue the callback
this.pendingConnectCallbacks = this.pendingConnectCallbacks || [];
this.pendingConnectCallbacks.push(callback);
// The connect is already in progress
if (this.connecting) return callback.promise;
// Set connecting flag to be true
this.connecting = true;
this.connector.connect(function(err, result) {
self.connecting = false;
if (!err) self.connected = true;
var cbs = self.pendingConnectCallbacks;
self.pendingConnectCallbacks = [];
if (!err) {
dataSource.connected = true;
dataSource.connecting = false;
dataSource.emit('connected');
self.emit('connected');
} else {
dataSource.connected = false;
dataSource.connecting = false;
dataSource.emit('error', err);
self.emit('error', err);
}
cb && cb(err, result);
});
} else {
process.nextTick(function() {
dataSource.connected = true;
dataSource.connecting = false;
dataSource.emit('connected');
cb && cb();
});
// Invoke all pending callbacks
async.each(cbs, function(cb, done) {
try {
cb(err);
} catch (e) {
// Ignore error to make sure all callbacks are invoked
debug('Uncaught error raised by connect callback function: ', e);
} finally {
done();
}
}, function(err) {
if (err) throw err; // It should not happen
});
});
return callback.promise;
};
};

View File

@ -34,8 +34,10 @@ describe('Memory connector', function() {
});
describe('with file', function() {
var ds;
function createUserModel() {
var ds = new DataSource({
ds = new DataSource({
connector: 'memory',
file: file,
});
@ -62,6 +64,13 @@ describe('Memory connector', function() {
User = createUserModel();
});
it('should allow multiple connects', function(done) {
ds.connected = false; // Change the state to force reconnect
async.times(10, function(n, next) {
ds.connect(next);
}, done);
});
it('should persist create', function(done) {
var count = 0;
async.eachSeries(['John1', 'John2', 'John3'], function(item, cb) {
@ -76,6 +85,41 @@ describe('Memory connector', function() {
}, done);
});
/**
* This test depends on the `should persist create`, which creates 3
* records and saves into the `memory.json`. The following test makes
* sure existing records won't be loaded out of sequence to override
* newly created ones.
*/
it('should not have out of sequence read/write', function(done) {
ds.connected = false;
ds.connector.connected = false; // Change the state to force reconnect
ds.connector.cache = {};
async.times(10, function(n, next) {
if (n === 10) {
// Make sure the connect finishes
return ds.connect(next);
}
ds.connect();
next();
}, function(err) {
async.eachSeries(['John4', 'John5'], function(item, cb) {
var count = 0;
User.create({name: item}, function(err, result) {
ids.push(result.id);
cb(err);
});
}, function(err) {
if (err) return done(err);
readModels(function(err, json) {
assert.equal(Object.keys(json.models.User).length, 5);
done();
});
});
});
});
it('should persist delete', function(done) {
// Now try to delete one
User.deleteById(ids[0], function(err) {
@ -86,7 +130,7 @@ describe('Memory connector', function() {
if (err) {
return done(err);
}
assert.equal(Object.keys(json.models.User).length, 2);
assert.equal(Object.keys(json.models.User).length, 4);
done();
});
});
@ -101,7 +145,7 @@ describe('Memory connector', function() {
if (err) {
return done(err);
}
assert.equal(Object.keys(json.models.User).length, 2);
assert.equal(Object.keys(json.models.User).length, 4);
var user = JSON.parse(json.models.User[ids[1]]);
assert.equal(user.name, 'John');
assert(user.id === ids[1]);
@ -120,7 +164,7 @@ describe('Memory connector', function() {
if (err) {
return done(err);
}
assert.equal(Object.keys(json.models.User).length, 2);
assert.equal(Object.keys(json.models.User).length, 4);
var user = JSON.parse(json.models.User[ids[1]]);
assert.equal(user.name, 'John1');
assert(user.id === ids[1]);
@ -133,7 +177,7 @@ describe('Memory connector', function() {
it('should load from the json file', function(done) {
User.find(function(err, users) {
// There should be 2 records
assert.equal(users.length, 2);
assert.equal(users.length, 4);
done(err);
});
});