Move replication implementation to DataModel

This commit is contained in:
Ritchie Martori 2014-05-06 13:31:23 -07:00
parent f8b5fa11ec
commit a3a6828709
7 changed files with 535 additions and 370 deletions

View File

@ -5,6 +5,7 @@
var assert = require('assert');
var remoting = require('strong-remoting');
var compat = require('../compat');
var DataAccessObject = require('loopback-datasource-juggler/lib/dao');
/**
* Export the RemoteConnector class.
@ -26,6 +27,9 @@ function RemoteConnector(settings) {
this.port = settings.port || 3000;
this.remotes = remoting.create();
// TODO(ritch) make sure this name works with Model.getSourceId()
this.name = 'remote-connector';
if(settings.url) {
this.url = settings.url;
} else {

View File

@ -77,7 +77,6 @@ Change.setup = function() {
}
Change.setup();
/**
* Track the recent change of the given modelIds.
*
@ -369,10 +368,12 @@ Change.diff = function(modelName, since, remoteChanges, callback) {
*/
Change.rectifyAll = function(cb) {
var Change = this;
// this should be optimized
this.find(function(err, changes) {
if(err) return cb(err);
changes.forEach(function(change) {
change = new Change(change);
change.rectify();
});
});
@ -393,6 +394,11 @@ Change.getCheckpointModel = function() {
return checkpointModel;
}
Change.handleError = function(err) {
if(!this.settings.ignoreErrors) {
throw err;
}
}
/**
* When two changes conflict a conflict is created.

View File

@ -4,7 +4,8 @@
var Model = require('./model');
var RemoteObjects = require('strong-remoting');
var DataAccess = require('loopback-datasource-juggler/lib/dao');
var assert = require('assert');
var async = require('async');
/**
* Extends Model with basic query and CRUD support.
@ -42,6 +43,15 @@ DataModel.setup = function setupDataModel() {
return val ? new DataModel(val) : val;
});
// enable change tracking (usually for replication)
if(this.settings.trackChanges) {
DataModel._defineChangeModel();
DataModel.once('dataSourceAttached', function() {
DataModel.enableChangeTracking();
});
}
DataModel.setupRemoting();
}
@ -232,9 +242,66 @@ DataModel.count = function (where, cb) {
*/
DataModel.prototype.save = function (options, callback) {
throwNotAttached(this.constructor.modelName, 'save');
var Model = this.constructor;
if (typeof options == 'function') {
callback = options;
options = {};
}
callback = callback || function () {
};
options = options || {};
if (!('validate' in options)) {
options.validate = true;
}
if (!('throws' in options)) {
options.throws = false;
}
var inst = this;
var data = inst.toObject(true);
var id = this.getId();
if (!id) {
return Model.create(this, callback);
}
// validate first
if (!options.validate) {
return save();
}
inst.isValid(function (valid) {
if (valid) {
save();
} else {
var err = new ValidationError(inst);
// throws option is dangerous for async usage
if (options.throws) {
throw err;
}
callback(err, inst);
}
});
// then save
function save() {
inst.trigger('save', function (saveDone) {
inst.trigger('update', function (updateDone) {
Model.upsert(inst, function(err) {
inst._initProperties(data);
updateDone.call(inst, function () {
saveDone.call(inst, function () {
callback(err, inst);
});
});
});
}, data);
}, data);
}
};
DataModel.prototype.save._delegate = true;
/**
* Determine if the data model is new.
@ -354,6 +421,7 @@ DataModel.getIdName = function() {
DataModel.setupRemoting = function() {
var DataModel = this;
var typeName = DataModel.modelName;
var options = DataModel.settings;
setRemoting(DataModel.create, {
description: 'Create a new instance of the model and persist it into the data source',
@ -428,6 +496,411 @@ DataModel.setupRemoting = function() {
returns: {arg: 'data', type: typeName, root: true},
http: {verb: 'put', path: '/'}
});
if(options.trackChanges) {
setRemoting(DataModel.diff, {
description: 'Get a set of deltas and conflicts since the given checkpoint',
accepts: [
{arg: 'since', type: 'number', description: 'Find deltas since this checkpoint'},
{arg: 'remoteChanges', type: 'array', description: 'an array of change objects',
http: {source: 'body'}}
],
returns: {arg: 'deltas', type: 'array', root: true},
http: {verb: 'post', path: '/diff'}
});
setRemoting(DataModel.changes, {
description: 'Get the changes to a model since a given checkpoint.'
+ 'Provide a filter object to reduce the number of results returned.',
accepts: [
{arg: 'since', type: 'number', description: 'Only return changes since this checkpoint'},
{arg: 'filter', type: 'object', description: 'Only include changes that match this filter'}
],
returns: {arg: 'changes', type: 'array', root: true},
http: {verb: 'get', path: '/changes'}
});
setRemoting(DataModel.checkpoint, {
description: 'Create a checkpoint.',
returns: {arg: 'checkpoint', type: 'object', root: true},
http: {verb: 'post', path: '/checkpoint'}
});
setRemoting(DataModel.currentCheckpoint, {
description: 'Get the current checkpoint.',
returns: {arg: 'checkpoint', type: 'object', root: true},
http: {verb: 'get', path: '/checkpoint'}
});
setRemoting(DataModel.bulkUpdate, {
description: 'Run multiple updates at once. Note: this is not atomic.',
accepts: {arg: 'updates', type: 'array'},
http: {verb: 'post', path: '/bulk-update'}
});
}
}
/**
* Get a set of deltas and conflicts since the given checkpoint.
*
* See `Change.diff()` for details.
*
* @param {Number} since Find deltas since this checkpoint
* @param {Array} remoteChanges An array of change objects
* @param {Function} callback
*/
DataModel.diff = function(since, remoteChanges, callback) {
var Change = this.getChangeModel();
Change.diff(this.modelName, since, remoteChanges, callback);
}
/**
* Get the changes to a model since a given checkpoint. Provide a filter object
* to reduce the number of results returned.
* @param {Number} since Only return changes since this checkpoint
* @param {Object} filter Only include changes that match this filter
* (same as `Model.find(filter, ...)`)
* @callback {Function} callback
* @param {Error} err
* @param {Array} changes An array of `Change` objects
* @end
*/
DataModel.changes = function(since, filter, callback) {
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var model = this;
filter = filter || {};
filter.fields = {};
filter.where = filter.where || {};
filter.fields[idName] = true;
// this whole thing could be optimized a bit more
Change.find({
checkpoint: {gt: since},
modelName: this.modelName
}, function(err, changes) {
if(err) return cb(err);
var ids = changes.map(function(change) {
return change.modelId.toString();
});
filter.where[idName] = {inq: ids};
model.find(filter, function(err, models) {
if(err) return cb(err);
var modelIds = models.map(function(m) {
return m[idName].toString();
});
callback(null, changes.filter(function(ch) {
if(ch.type() === Change.DELETE) return true;
return modelIds.indexOf(ch.modelId) > -1;
}));
});
});
}
/**
* Create a checkpoint.
*
* @param {Function} callback
*/
DataModel.checkpoint = function(cb) {
var Checkpoint = this.getChangeModel().getCheckpointModel();
this.getSourceId(function(err, sourceId) {
if(err) return cb(err);
Checkpoint.create({
sourceId: sourceId
}, cb);
});
}
/**
* Get the current checkpoint id.
*
* @callback {Function} callback
* @param {Error} err
* @param {Number} currentCheckpointId
* @end
*/
DataModel.currentCheckpoint = function(cb) {
var Checkpoint = this.getChangeModel().getCheckpointModel();
Checkpoint.current(cb);
}
/**
* Replicate changes since the given checkpoint to the given target model.
*
* @param {Number} [since] Since this checkpoint
* @param {Model} targetModel Target this model class
* @param {Object} [options]
* @param {Object} [options.filter] Replicate models that match this filter
* @callback {Function} [callback]
* @param {Error} err
* @param {Conflict[]} conflicts A list of changes that could not be replicated
* due to conflicts.
*/
DataModel.replicate = function(since, targetModel, options, callback) {
var lastArg = arguments[arguments.length - 1];
if(typeof lastArg === 'function' && arguments.length > 1) {
callback = lastArg;
}
if(typeof since === 'function' && since.modelName) {
targetModel = since;
since = -1;
}
options = options || {};
var sourceModel = this;
var diff;
var updates;
var Change = this.getChangeModel();
var TargetChange = targetModel.getChangeModel();
var changeTrackingEnabled = Change && TargetChange;
assert(
changeTrackingEnabled,
'You must enable change tracking before replicating'
);
callback = callback || function defaultReplicationCallback(err) {
if(err) throw err;
}
var tasks = [
getLocalChanges,
getDiffFromTarget,
createSourceUpdates,
bulkUpdate,
checkpoint
];
async.waterfall(tasks, function(err) {
if(err) return callback(err);
var conflicts = diff.conflicts.map(function(change) {
var sourceChange = new Change({
modelName: sourceModel.modelName,
modelId: change.modelId
});
var targetChange = new TargetChange(change);
return new Change.Conflict(sourceChange, targetChange);
});
callback && callback(null, conflicts);
});
function getLocalChanges(cb) {
sourceModel.changes(since, options.filter, cb);
}
function getDiffFromTarget(sourceChanges, cb) {
targetModel.diff(since, sourceChanges, cb);
}
function createSourceUpdates(_diff, cb) {
diff = _diff;
diff.conflicts = diff.conflicts || [];
if(diff && diff.deltas && diff.deltas.length) {
sourceModel.createUpdates(diff.deltas, cb);
} else {
// done
callback(null, []);
}
}
function bulkUpdate(updates, cb) {
targetModel.bulkUpdate(updates, cb);
}
function checkpoint() {
var cb = arguments[arguments.length - 1];
sourceModel.checkpoint(cb);
}
}
/**
* Create an update list (for `Model.bulkUpdate()`) from a delta list
* (result of `Change.diff()`).
*
* @param {Array} deltas
* @param {Function} callback
*/
DataModel.createUpdates = function(deltas, cb) {
var Change = this.getChangeModel();
var updates = [];
var Model = this;
var tasks = [];
deltas.forEach(function(change) {
var change = new Change(change);
var type = change.type();
var update = {type: type, change: change};
switch(type) {
case Change.CREATE:
case Change.UPDATE:
tasks.push(function(cb) {
Model.findById(change.modelId, function(err, inst) {
if(err) return cb(err);
if(inst.toObject) {
update.data = inst.toObject();
} else {
update.data = inst;
}
updates.push(update);
cb();
});
});
break;
case Change.DELETE:
updates.push(update);
break;
}
});
async.parallel(tasks, function(err) {
if(err) return cb(err);
cb(null, updates);
});
}
/**
* Apply an update list.
*
* **Note: this is not atomic**
*
* @param {Array} updates An updates list (usually from Model.createUpdates())
* @param {Function} callback
*/
DataModel.bulkUpdate = function(updates, callback) {
var tasks = [];
var Model = this;
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
updates.forEach(function(update) {
switch(update.type) {
case Change.UPDATE:
case Change.CREATE:
// var model = new Model(update.data);
// tasks.push(model.save.bind(model));
tasks.push(function(cb) {
var model = new Model(update.data);
model.save(cb);
});
break;
case Change.DELETE:
var data = {};
data[idName] = update.change.modelId;
var model = new Model(data);
tasks.push(model.destroy.bind(model));
break;
}
});
async.parallel(tasks, callback);
}
/**
* Get the `Change` model.
*
* @throws {Error} Throws an error if the change model is not correctly setup.
* @return {Change}
*/
DataModel.getChangeModel = function() {
var changeModel = this.Change;
var isSetup = changeModel && changeModel.dataSource;
assert(isSetup, 'Cannot get a setup Change model');
return changeModel;
}
/**
* Get the source identifier for this model / dataSource.
*
* @callback {Function} callback
* @param {Error} err
* @param {String} sourceId
*/
DataModel.getSourceId = function(cb) {
var dataSource = this.dataSource;
if(!dataSource) {
this.once('dataSourceAttached', this.getSourceId.bind(this, cb));
}
assert(
dataSource.connector.name,
'Model.getSourceId: cannot get id without dataSource.connector.name'
);
var id = [dataSource.connector.name, this.modelName].join('-');
cb(null, id);
}
/**
* Enable the tracking of changes made to the model. Usually for replication.
*/
DataModel.enableChangeTracking = function() {
// console.log('THIS SHOULD NOT RUN ON A MODEL CONNECTED TO A REMOTE DATASOURCE');
var Model = this;
var Change = this.Change || this._defineChangeModel();
var cleanupInterval = Model.settings.changeCleanupInterval || 30000;
assert(this.dataSource, 'Cannot enableChangeTracking(): ' + this.modelName
+ ' is not attached to a dataSource');
Change.attachTo(this.dataSource);
Change.getCheckpointModel().attachTo(this.dataSource);
Model.on('changed', function(obj) {
Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
});
Model.on('deleted', function(obj) {
Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
});
Model.on('deletedAll', cleanup);
// initial cleanup
cleanup();
// cleanup
setInterval(cleanup, cleanupInterval);
function cleanup() {
Change.rectifyAll(function(err) {
if(err) {
console.error(Model.modelName + ' Change Cleanup Error:');
console.error(err);
}
});
}
}
DataModel._defineChangeModel = function() {
var BaseChangeModel = require('./change');
return this.Change = BaseChangeModel.extend(this.modelName + '-change');
}
DataModel.setup();

View File

@ -92,10 +92,6 @@ Model.setup = function () {
var ModelCtor = this;
var options = this.settings;
if(options.trackChanges) {
this._defineChangeModel();
}
ModelCtor.sharedCtor = function (data, id, fn) {
if(typeof data === 'function') {
fn = data;
@ -183,13 +179,6 @@ Model.setup = function () {
ModelCtor.sharedCtor.returns = {root: true};
// enable change tracking (usually for replication)
if(options.trackChanges) {
ModelCtor.once('dataSourceAttached', function() {
ModelCtor.enableChangeTracking();
});
}
return ModelCtor;
};
@ -303,355 +292,3 @@ Model.getApp = function(callback) {
// setup the initial model
Model.setup();
/**
* Get a set of deltas and conflicts since the given checkpoint.
*
* See `Change.diff()` for details.
*
* @param {Number} since Find changes since this checkpoint
* @param {Array} remoteChanges An array of change objects
* @param {Function} callback
*/
Model.diff = function(since, remoteChanges, callback) {
var Change = this.getChangeModel();
Change.diff(this.modelName, since, remoteChanges, callback);
}
/**
* Get the changes to a model since a given checkpoing. Provide a filter object
* to reduce the number of results returned.
* @param {Number} since Only return changes since this checkpoint
* @param {Object} filter Only include changes that match this filter
* (same as `Model.find(filter, ...)`)
* @callback {Function} callback
* @param {Error} err
* @param {Array} changes An array of `Change` objects
* @end
*/
Model.changes = function(since, filter, callback) {
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var model = this;
filter = filter || {};
filter.fields = {};
filter.where = filter.where || {};
filter.fields[idName] = true;
// this whole thing could be optimized a bit more
Change.find({
checkpoint: {gt: since},
modelName: this.modelName
}, function(err, changes) {
if(err) return cb(err);
var ids = changes.map(function(change) {
return change.modelId.toString();
});
filter.where[idName] = {inq: ids};
model.find(filter, function(err, models) {
if(err) return cb(err);
var modelIds = models.map(function(m) {
return m[idName].toString();
});
callback(null, changes.filter(function(ch) {
if(ch.type() === Change.DELETE) return true;
return modelIds.indexOf(ch.modelId) > -1;
}));
});
});
}
/**
* Create a checkpoint.
*
* @param {Function} callback
*/
Model.checkpoint = function(cb) {
var Checkpoint = this.getChangeModel().getCheckpointModel();
this.getSourceId(function(err, sourceId) {
if(err) return cb(err);
Checkpoint.create({
sourceId: sourceId
}, cb);
});
}
/**
* Get the current checkpoint id.
*
* @callback {Function} callback
* @param {Error} err
* @param {Number} currentCheckpointId
* @end
*/
Model.currentCheckpoint = function(cb) {
var Checkpoint = this.getChangeModel().getCheckpointModel();
Checkpoint.current(cb);
}
/**
* Replicate changes since the given checkpoint to the given target model.
*
* @param {Number} [since] Since this checkpoint
* @param {Model} targetModel Target this model class
* @param {Object} [options]
* @param {Object} [options.filter] Replicate models that match this filter
* @callback {Function} [callback]
* @param {Error} err
* @param {Conflict[]} conflicts A list of changes that could not be replicated
* due to conflicts.
*/
Model.replicate = function(since, targetModel, options, callback) {
var lastArg = arguments[arguments.length - 1];
if(typeof lastArg === 'function' && arguments.length > 1) {
callback = lastArg;
}
if(typeof since === 'function' && since.modelName) {
targetModel = since;
since = -1;
}
options = options || {};
var sourceModel = this;
var diff;
var updates;
var Change = this.getChangeModel();
var TargetChange = targetModel.getChangeModel();
var changeTrackingEnabled = Change && TargetChange;
assert(
changeTrackingEnabled,
'You must enable change tracking before replicating'
);
var tasks = [
getLocalChanges,
getDiffFromTarget,
createSourceUpdates,
bulkUpdate,
checkpoint
];
async.waterfall(tasks, function(err) {
if(err) return callback(err);
var conflicts = diff.conflicts.map(function(change) {
var sourceChange = new Change({
modelName: sourceModel.modelName,
modelId: change.modelId
});
var targetChange = new TargetChange(change);
return new Change.Conflict(sourceChange, targetChange);
});
callback && callback(null, conflicts);
});
function getLocalChanges(cb) {
sourceModel.changes(since, options.filter, cb);
}
function getDiffFromTarget(sourceChanges, cb) {
targetModel.diff(since, sourceChanges, cb);
}
function createSourceUpdates(_diff, cb) {
diff = _diff;
diff.conflicts = diff.conflicts || [];
sourceModel.createUpdates(diff.deltas, cb);
}
function bulkUpdate(updates, cb) {
targetModel.bulkUpdate(updates, cb);
}
function checkpoint() {
var cb = arguments[arguments.length - 1];
sourceModel.checkpoint(cb);
}
}
/**
* Create an update list (for `Model.bulkUpdate()`) from a delta list
* (result of `Change.diff()`).
*
* @param {Array} deltas
* @param {Function} callback
*/
Model.createUpdates = function(deltas, cb) {
var Change = this.getChangeModel();
var updates = [];
var Model = this;
var tasks = [];
deltas.forEach(function(change) {
var change = new Change(change);
var type = change.type();
var update = {type: type, change: change};
switch(type) {
case Change.CREATE:
case Change.UPDATE:
tasks.push(function(cb) {
Model.findById(change.modelId, function(err, inst) {
if(err) return cb(err);
if(inst.toObject) {
update.data = inst.toObject();
} else {
update.data = inst;
}
updates.push(update);
cb();
});
});
break;
case Change.DELETE:
updates.push(update);
break;
}
});
async.parallel(tasks, function(err) {
if(err) return cb(err);
cb(null, updates);
});
}
/**
* Apply an update list.
*
* **Note: this is not atomic**
*
* @param {Array} updates An updates list (usually from Model.createUpdates())
* @param {Function} callback
*/
Model.bulkUpdate = function(updates, callback) {
var tasks = [];
var Model = this;
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
updates.forEach(function(update) {
switch(update.type) {
case Change.UPDATE:
case Change.CREATE:
// var model = new Model(update.data);
// tasks.push(model.save.bind(model));
tasks.push(function(cb) {
var model = new Model(update.data);
model.save(cb);
});
break;
case Change.DELETE:
var data = {};
data[idName] = update.change.modelId;
var model = new Model(data);
tasks.push(model.destroy.bind(model));
break;
}
});
async.parallel(tasks, callback);
}
/**
* Get the `Change` model.
*
* @throws {Error} Throws an error if the change model is not correctly setup.
* @return {Change}
*/
Model.getChangeModel = function() {
var changeModel = this.Change;
var isSetup = changeModel && changeModel.dataSource;
assert(isSetup, 'Cannot get a setup Change model');
return changeModel;
}
/**
* Get the source identifier for this model / dataSource.
*
* @callback {Function} callback
* @param {Error} err
* @param {String} sourceId
*/
Model.getSourceId = function(cb) {
var dataSource = this.dataSource;
if(!dataSource) {
this.once('dataSourceAttached', this.getSourceId.bind(this, cb));
}
assert(
dataSource.connector.name,
'Model.getSourceId: cannot get id without dataSource.connector.name'
);
var id = [dataSource.connector.name, this.modelName].join('-');
cb(null, id);
}
/**
* Enable the tracking of changes made to the model. Usually for replication.
*/
Model.enableChangeTracking = function() {
var Model = this;
var Change = this.Change || this._defineChangeModel();
var cleanupInterval = Model.settings.changeCleanupInterval || 30000;
assert(this.dataSource, 'Cannot enableChangeTracking(): ' + this.modelName
+ ' is not attached to a dataSource');
Change.attachTo(this.dataSource);
Change.getCheckpointModel().attachTo(this.dataSource);
Model.on('changed', function(obj) {
Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
});
Model.on('deleted', function(obj) {
Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
});
Model.on('deletedAll', cleanup);
// initial cleanup
cleanup();
// cleanup
setInterval(cleanup, cleanupInterval);
function cleanup() {
Change.rectifyAll(function(err) {
if(err) {
console.error(Model.modelName + ' Change Cleanup Error:');
console.error(err);
}
});
}
}
Model._defineChangeModel = function() {
var BaseChangeModel = require('./change');
return this.Change = BaseChangeModel.extend(this.modelName + '-change');
}

View File

@ -30,7 +30,7 @@ describe('RemoteConnector', function() {
});
m.save(function(err, data) {
if(err) return done(err);
assert(m.id);
assert(data.foo === 'bar');
done();
});
});

View File

@ -512,12 +512,15 @@ describe.onServer('Remote Methods', function(){
beforeEach(function(done) {
var test = this;
this.dataSource = dataSource;
var SourceModel = this.SourceModel = this.dataSource.createModel('SourceModel', {}, {
var SourceModel = this.SourceModel = DataModel.extend('SourceModel', {}, {
trackChanges: true
});
var TargetModel = this.TargetModel = this.dataSource.createModel('TargetModel', {}, {
SourceModel.attachTo(dataSource);
var TargetModel = this.TargetModel = DataModel.extend('TargetModel', {}, {
trackChanges: true
});
TargetModel.attachTo(dataSource);
var createOne = SourceModel.create.bind(SourceModel, {
name: 'baz'

View File

@ -27,4 +27,46 @@ describe('RemoteConnector', function() {
remoteApp.model(RemoteModel);
}
});
beforeEach(function(done) {
var test = this;
remoteApp = this.remoteApp = loopback();
remoteApp.use(loopback.rest());
var ServerModel = this.ServerModel = loopback.DataModel.extend('TestModel');
remoteApp.model(ServerModel);
remoteApp.listen(0, function() {
test.remote = loopback.createDataSource({
host: remoteApp.get('host'),
port: remoteApp.get('port'),
connector: loopback.Remote
});
done();
});
});
it('should support the save method', function (done) {
var calledServerCreate = false;
var RemoteModel = loopback.DataModel.extend('TestModel');
RemoteModel.attachTo(this.remote);
var ServerModel = this.ServerModel;
ServerModel.create = function(data, cb) {
calledServerCreate = true;
data.id = 1;
cb(null, data);
}
ServerModel.setupRemoting();
var m = new RemoteModel({foo: 'bar'});
console.log(m.save.toString());
m.save(function(err, inst) {
assert(inst instanceof RemoteModel);
assert(calledServerCreate);
done();
});
});
});