diff --git a/lib/connectors/memory.js b/lib/connectors/memory.js index ae8b260d..e3eea27d 100644 --- a/lib/connectors/memory.js +++ b/lib/connectors/memory.js @@ -13,6 +13,7 @@ var geo = require('../geo'); var utils = require('../utils'); var fs = require('fs'); var async = require('async'); +var debug = require('debug')('loopback:connector:memory'); /** * Initialize the Memory connector against the given data source @@ -104,45 +105,92 @@ Memory.prototype.collectionSeq = function(model, val) { return this.ids[model]; }; -Memory.prototype.loadFromFile = function(callback) { +/** + * Create a queue to serialize file read/write operations + * @returns {*} The file operation queue + */ +Memory.prototype.setupFileQueue = function() { var self = this; + if (!this.fileQueue) { + // Create a queue for writes + this.fileQueue = async.queue(function(task, done) { + var callback = task.callback || function() {}; + var file = self.settings.file; + if (task.operation === 'write') { + // Flush out the models/ids + var data = JSON.stringify({ + ids: self.ids, + models: self.cache, + }, null, ' '); + debug('Writing cache to %s: %s', file, data); + fs.writeFile(file, data, function(err) { + debug('Cache has been written to %s', file); + done(err); + callback(err, task.data); + }); + } else if (task.operation === 'read') { + debug('Reading cache from %s: %s', file, data); + fs.readFile(file, { + encoding: 'utf8', + flag: 'r', + }, function(err, data) { + if (err && err.code !== 'ENOENT') { + done(err); + callback(err); + } else { + debug('Cache has been read from %s: %s', file, data); + self.parseAndLoad(data, function(err) { + done(err); + callback(err); + }); + } + }); + } else { + var err = new Error('Unknown type of task'); + done(err); + callback(err); + } + }, 1); + } + return this.fileQueue; +}; + +Memory.prototype.parseAndLoad = function(data, callback) { + if (data) { + try { + data = JSON.parse(data.toString()); + } catch (e) { + return callback && callback(e); + } + + this.ids = data.ids || {}; + this.cache = data.models || {}; + } else { + if (!this.cache) { + this.ids = {}; + this.cache = {}; + } + } + callback && callback(); +}; + +Memory.prototype.loadFromFile = function(callback) { var hasLocalStorage = typeof window !== 'undefined' && window.localStorage; var localStorage = hasLocalStorage && this.settings.localStorage; - if (self.settings.file) { - fs.readFile(self.settings.file, {encoding: 'utf8', flag: 'r'}, function(err, data) { - if (err && err.code !== 'ENOENT') { - callback && callback(err); - } else { - parseAndLoad(data); - } + if (this.settings.file) { + debug('Queueing read %s', this.settings.file); + this.setupFileQueue().push({ + operation: 'read', + callback: callback, }); } else if (localStorage) { var data = window.localStorage.getItem(localStorage); data = data || '{}'; - parseAndLoad(data); + this.parseAndLoad(data, callback); } else { process.nextTick(callback); } - - function parseAndLoad(data) { - if (data) { - try { - data = JSON.parse(data.toString()); - } catch (e) { - return callback(e); - } - - self.ids = data.ids || {}; - self.cache = data.models || {}; - } else { - if (!self.cache) { - self.ids = {}; - self.cache = {}; - } - } - callback && callback(); - } }; /*! @@ -150,36 +198,22 @@ Memory.prototype.loadFromFile = function(callback) { * @param {Function} callback */ Memory.prototype.saveToFile = function(result, callback) { - var self = this; var file = this.settings.file; var hasLocalStorage = typeof window !== 'undefined' && window.localStorage; var localStorage = hasLocalStorage && this.settings.localStorage; if (file) { - if (!self.writeQueue) { - // Create a queue for writes - self.writeQueue = async.queue(function(task, cb) { - // Flush out the models/ids - var data = JSON.stringify({ - ids: self.ids, - models: self.cache, - }, null, ' '); - - fs.writeFile(self.settings.file, data, function(err) { - cb(err); - task.callback && task.callback(err, task.data); - }); - }, 1); - } + debug('Queueing write %s', this.settings.file); // Enqueue the write - self.writeQueue.push({ + this.setupFileQueue().push({ + operation: 'write', data: result, callback: callback, }); } else if (localStorage) { // Flush out the models/ids var data = JSON.stringify({ - ids: self.ids, - models: self.cache, + ids: this.ids, + models: this.cache, }, null, ' '); window.localStorage.setItem(localStorage, data); process.nextTick(function() { @@ -506,7 +540,7 @@ function applyFilter(filter) { // Support referencesMany and other embedded relations // Also support array types. Mongo, possibly PostgreSQL if (Array.isArray(value)) { - var matcher = where[key]; + var matcher = where[key]; // The following condition is for the case where we are querying with // a neq filter, and when the value is an empty array ([]). if (matcher.neq !== undefined && value.length <= 0) { diff --git a/lib/datasource.js b/lib/datasource.js index 67786f98..51e92615 100644 --- a/lib/datasource.js +++ b/lib/datasource.js @@ -365,36 +365,59 @@ DataSource.prototype.setup = function(name, settings) { } } - dataSource.connect = function(cb) { - var dataSource = this; - if (dataSource.connected || dataSource.connecting) { - process.nextTick(function() { - cb && cb(); - }); - return; + dataSource.connect = function(callback) { + callback = callback || utils.createPromiseCallback(); + var self = this; + if (this.connected) { + // The data source is already connected, return immediately + process.nextTick(callback); + return callback.promise; } - dataSource.connecting = true; - if (dataSource.connector.connect) { - dataSource.connector.connect(function(err, result) { - if (!err) { - dataSource.connected = true; - dataSource.connecting = false; - dataSource.emit('connected'); - } else { - dataSource.connected = false; - dataSource.connecting = false; - dataSource.emit('error', err); + if (typeof dataSource.connector.connect !== 'function') { + // Connector doesn't have the connect function + // Assume no connect is needed + self.connected = true; + self.connecting = false; + process.nextTick(function() { + self.emit('connected'); + callback(); + }); + return callback.promise; + } + + // Queue the callback + this.pendingConnectCallbacks = this.pendingConnectCallbacks || []; + this.pendingConnectCallbacks.push(callback); + + // The connect is already in progress + if (this.connecting) return callback.promise; + // Set connecting flag to be true + this.connecting = true; + this.connector.connect(function(err, result) { + self.connecting = false; + if (!err) self.connected = true; + var cbs = self.pendingConnectCallbacks; + self.pendingConnectCallbacks = []; + if (!err) { + self.emit('connected'); + } else { + self.emit('error', err); + } + // Invoke all pending callbacks + async.each(cbs, function(cb, done) { + try { + cb(err); + } catch (e) { + // Ignore error to make sure all callbacks are invoked + debug('Uncaught error raised by connect callback function: ', e); + } finally { + done(); } - cb && cb(err, result); + }, function(err) { + if (err) throw err; // It should not happen }); - } else { - process.nextTick(function() { - dataSource.connected = true; - dataSource.connecting = false; - dataSource.emit('connected'); - cb && cb(); - }); - } + }); + return callback.promise; }; }; diff --git a/test/memory.test.js b/test/memory.test.js index df4cefed..b6a4fd88 100644 --- a/test/memory.test.js +++ b/test/memory.test.js @@ -34,8 +34,10 @@ describe('Memory connector', function() { }); describe('with file', function() { + var ds; + function createUserModel() { - var ds = new DataSource({ + ds = new DataSource({ connector: 'memory', file: file, }); @@ -62,6 +64,13 @@ describe('Memory connector', function() { User = createUserModel(); }); + it('should allow multiple connects', function(done) { + ds.connected = false; // Change the state to force reconnect + async.times(10, function(n, next) { + ds.connect(next); + }, done); + }); + it('should persist create', function(done) { var count = 0; async.eachSeries(['John1', 'John2', 'John3'], function(item, cb) { @@ -76,6 +85,41 @@ describe('Memory connector', function() { }, done); }); + /** + * This test depends on the `should persist create`, which creates 3 + * records and saves into the `memory.json`. The following test makes + * sure existing records won't be loaded out of sequence to override + * newly created ones. + */ + it('should not have out of sequence read/write', function(done) { + ds.connected = false; + ds.connector.connected = false; // Change the state to force reconnect + ds.connector.cache = {}; + + async.times(10, function(n, next) { + if (n === 10) { + // Make sure the connect finishes + return ds.connect(next); + } + ds.connect(); + next(); + }, function(err) { + async.eachSeries(['John4', 'John5'], function(item, cb) { + var count = 0; + User.create({name: item}, function(err, result) { + ids.push(result.id); + cb(err); + }); + }, function(err) { + if (err) return done(err); + readModels(function(err, json) { + assert.equal(Object.keys(json.models.User).length, 5); + done(); + }); + }); + }); + }); + it('should persist delete', function(done) { // Now try to delete one User.deleteById(ids[0], function(err) { @@ -86,7 +130,7 @@ describe('Memory connector', function() { if (err) { return done(err); } - assert.equal(Object.keys(json.models.User).length, 2); + assert.equal(Object.keys(json.models.User).length, 4); done(); }); }); @@ -101,7 +145,7 @@ describe('Memory connector', function() { if (err) { return done(err); } - assert.equal(Object.keys(json.models.User).length, 2); + assert.equal(Object.keys(json.models.User).length, 4); var user = JSON.parse(json.models.User[ids[1]]); assert.equal(user.name, 'John'); assert(user.id === ids[1]); @@ -120,7 +164,7 @@ describe('Memory connector', function() { if (err) { return done(err); } - assert.equal(Object.keys(json.models.User).length, 2); + assert.equal(Object.keys(json.models.User).length, 4); var user = JSON.parse(json.models.User[ids[1]]); assert.equal(user.name, 'John1'); assert(user.id === ids[1]); @@ -133,7 +177,7 @@ describe('Memory connector', function() { it('should load from the json file', function(done) { User.find(function(err, users) { // There should be 2 records - assert.equal(users.length, 2); + assert.equal(users.length, 4); done(err); }); });