Add replication example

This commit is contained in:
Ritchie Martori 2014-01-28 12:54:41 -08:00
parent 1a13a8d95e
commit 2582c3fc65
4 changed files with 425 additions and 22 deletions

138
example/replication/app.js Normal file
View File

@ -0,0 +1,138 @@
var loopback = require('../../');
var app = loopback();
var db = app.dataSource('db', {connector: loopback.Memory});
var Color = app.model('color', {dataSource: 'db', options: {trackChanges: true}});
var Color2 = app.model('color2', {dataSource: 'db', options: {trackChanges: true}});
var target = Color2;
var source = Color;
var SPEED = process.env.SPEED || 100;
var conflicts;
var steps = [
createSomeInitialSourceData,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data'),
list.bind(this, target, 'current TARGET data'),
updateSomeTargetData,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data '),
list.bind(this, target, 'current TARGET data (includes conflicting update)'),
updateSomeSourceDataCausingAConflict,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data (now has a conflict)'),
list.bind(this, target, 'current TARGET data (includes conflicting update)'),
resolveAllConflicts,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data (conflict resolved)'),
list.bind(this, target, 'current TARGET data (conflict resolved)'),
createMoreSourceData,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data'),
list.bind(this, target, 'current TARGET data'),
createEvenMoreSourceData,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data'),
list.bind(this, target, 'current TARGET data'),
deleteAllSourceData,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data (empty)'),
list.bind(this, target, 'current TARGET data (empty)'),
createSomeNewSourceData,
replicateSourceToTarget,
list.bind(this, source, 'current SOURCE data'),
list.bind(this, target, 'current TARGET data')
];
run(steps);
function createSomeInitialSourceData() {
Color.create([
{name: 'red'},
{name: 'blue'},
{name: 'green'}
]);
}
function replicateSourceToTarget() {
Color.replicate(0, Color2, {}, function(err, replicationConflicts) {
conflicts = replicationConflicts;
});
}
function resolveAllConflicts() {
if(conflicts.length) {
conflicts.forEach(function(conflict) {
conflict.resolve();
});
}
}
function updateSomeTargetData() {
Color2.findById(1, function(err, color) {
color.name = 'conflict';
color.save();
});
}
function createMoreSourceData() {
Color.create({name: 'orange'});
}
function createEvenMoreSourceData() {
Color.create({name: 'black'});
}
function updateSomeSourceDataCausingAConflict() {
Color.findById(1, function(err, color) {
color.name = 'red!!!!';
color.save();
});
}
function deleteAllSourceData() {
Color.destroyAll();
}
function createSomeNewSourceData() {
Color.create([
{name: 'violet'},
{name: 'amber'},
{name: 'olive'}
]);
}
function list(model, msg) {
console.log(msg);
model.find(function(err, items) {
items.forEach(function(item) {
console.log(' -', item.name);
});
console.log();
});
}
function run(steps) {
setInterval(function() {
var step = steps.shift();
if(step) {
console.log(step.name);
step();
}
}, SPEED);
}

View File

@ -27,7 +27,7 @@ var properties = {
*/
var options = {
trackChanges: false
};
/**
@ -55,6 +55,12 @@ Change.CREATE = 'create';
Change.DELETE = 'delete';
Change.UNKNOWN = 'unknown';
/*!
* Conflict Class
*/
Change.Conflict = Conflict;
/*!
* Setup the extended model.
*/
@ -149,13 +155,34 @@ Change.findOrCreate = function(modelName, modelId, callback) {
Change.prototype.rectify = function(cb) {
var change = this;
this.prev = this.rev;
// get the current revision
this.currentRevision(function(err, rev) {
if(err) return Change.handleError(err, cb);
change.rev = rev;
var tasks = [
updateRevision,
updateCheckpoint
];
if(this.rev) this.prev = this.rev;
async.parallel(tasks, function(err) {
if(err) return cb(err);
change.save(cb);
});
function updateRevision(cb) {
// get the current revision
change.currentRevision(function(err, rev) {
if(err) return Change.handleError(err, cb);
change.rev = rev;
cb();
});
}
function updateCheckpoint(cb) {
change.constructor.getCheckpointModel().current(function(err, checkpoint) {
if(err) return Change.handleError(err);
change.checkpoint = ++checkpoint;
cb();
});
}
}
/**
@ -233,7 +260,7 @@ Change.prototype.type = function() {
Change.prototype.getModelCtor = function() {
// todo - not sure if this works with multiple data sources
return this.constructor.modelBuilder.models[this.modelName];
return loopback.getModel(this.modelName);
}
/**
@ -306,7 +333,10 @@ Change.diff = function(modelName, since, remoteChanges, callback) {
if(err) return callback(err);
var deltas = [];
var conflicts = [];
var localModelIds = [];
localChanges.forEach(function(localChange) {
localModelIds.push(localChange.modelId);
var remoteChange = remoteChangeIndex[localChange.modelId];
if(!localChange.equals(remoteChange)) {
if(remoteChange.isBasedOn(localChange)) {
@ -317,9 +347,91 @@ Change.diff = function(modelName, since, remoteChanges, callback) {
}
});
modelIds.forEach(function(id) {
if(localModelIds.indexOf(id) === -1) {
deltas.push(remoteChangeIndex[id]);
}
});
callback(null, {
deltas: deltas,
conflicts: conflicts
});
});
}
/**
* Correct all change list entries.
* @param {Function} callback
*/
Change.rectifyAll = function(cb) {
// this should be optimized
this.find(function(err, changes) {
if(err) return cb(err);
changes.forEach(function(change) {
change.rectify();
});
});
}
/**
* Get the checkpoint model.
* @return {Checkpoint}
*/
Change.getCheckpointModel = function() {
var checkpointModel = this.Checkpoint;
if(checkpointModel) return checkpointModel;
this.checkpoint = checkpointModel = require('./checkpoint').extend('checkpoint');
checkpointModel.attachTo(this.dataSource);
return checkpointModel;
}
/**
* When two changes conflict a conflict is created.
*
* **Note: call `conflict.fetch()` to get the `target` and `source` models.
*
* @param {Change} sourceChange The change object for the source model
* @param {Change} targetChange The conflicting model's change object
* @property {Model} source The source model instance
* @property {Model} target The target model instance
*/
function Conflict(sourceChange, targetChange) {
this.sourceChange = sourceChange;
this.targetChange = targetChange;
}
Conflict.prototype.fetch = function(cb) {
var conflict = this;
var tasks = [
getSourceModel,
getTargetModel
];
async.parallel(tasks, cb);
function getSourceModel(change, cb) {
conflict.sourceModel.getModel(function(err, model) {
if(err) return cb(err);
conflict.source = model;
cb();
});
}
function getTargetModel(cb) {
conflict.targetModel.getModel(function(err, model) {
if(err) return cb(err);
conflict.target = model;
cb();
});
}
}
Conflict.prototype.resolve = function(cb) {
this.sourceChange.prev = this.targetChange.rev;
this.sourceChange.save(cb);
}

View File

@ -48,8 +48,9 @@ Checkpoint.current = function(cb) {
this.find({
limit: 1,
sort: 'id DESC'
}, function(err, checkpoint) {
}, function(err, checkpoints) {
if(err) return cb(err);
var checkpoint = checkpoints[0] || {id: 0};
cb(null, checkpoint.id);
});
}

View File

@ -4,13 +4,76 @@
var loopback = require('../loopback');
var ModelBuilder = require('loopback-datasource-juggler').ModelBuilder;
var modeler = new ModelBuilder();
var async = require('async');
var assert = require('assert');
/**
* The built in loopback.Model.
* The base class for **all models**.
*
* **Inheriting from `Model`**
*
* ```js
* var properties = {...};
* var options = {...};
* var MyModel = loopback.Model.extend('MyModel', properties, options);
* ```
*
* **Options**
*
* - `trackChanges` - If true, changes to the model will be tracked. **Required
* for replication.**
*
* **Events**
*
* #### Event: `changed`
*
* Emitted after a model has been successfully created, saved, or updated.
*
* ```js
* MyModel.on('changed', function(inst) {
* console.log('model with id %s has been changed', inst.id);
* // => model with id 1 has been changed
* });
* ```
*
* #### Event: `deleted`
*
* Emitted after an individual model has been deleted.
*
* ```js
* MyModel.on('deleted', function(inst) {
* console.log('model with id %s has been deleted', inst.id);
* // => model with id 1 has been deleted
* });
* ```
*
* #### Event: `deletedAll`
*
* Emitted after an individual model has been deleted.
*
* ```js
* MyModel.on('deletedAll', function(where) {
* if(where) {
* console.log('all models where', where, 'have been deleted');
* // => all models where
* // => {price: {gt: 100}}
* // => have been deleted
* }
* });
* ```
*
* #### Event: `attached`
*
* Emitted after a `Model` has been attached to an `app`.
*
* #### Event: `dataSourceAttached`
*
* Emitted after a `Model` has been attached to a `DataSource`.
*
* @class
* @param {Object} data
* @property {String} modelName The name of the model
* @property {DataSource} dataSource
*/
var Model = module.exports = modeler.define('Model');
@ -23,6 +86,7 @@ Model.shared = true;
Model.setup = function () {
var ModelCtor = this;
var options = this.settings;
ModelCtor.sharedCtor = function (data, id, fn) {
if(typeof data === 'function') {
@ -108,6 +172,13 @@ Model.setup = function () {
ModelCtor.sharedCtor.returns = {root: true};
ModelCtor.once('dataSourceAttached', function() {
// enable change tracking (usually for replication)
if(options.trackChanges) {
ModelCtor.enableChangeTracking();
}
});
return ModelCtor;
};
@ -219,7 +290,7 @@ Model.diff = function(since, remoteChanges, callback) {
*/
Model.changes = function(since, filter, callback) {
var idName = this.idName();
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var model = this;
@ -235,15 +306,16 @@ Model.changes = function(since, filter, callback) {
}, function(err, changes) {
if(err) return cb(err);
var ids = changes.map(function(change) {
return change.modelId;
return change.modelId.toString();
});
filter.where[idName] = {inq: ids};
model.find(filter, function(err, models) {
if(err) return cb(err);
var modelIds = models.map(function(m) {
return m[idName];
return m[idName].toString();
});
callback(null, changes.filter(function(ch) {
if(ch.type() === Change.DELETE) return true;
return modelIds.indexOf(ch.modelId) > -1;
}));
});
@ -257,7 +329,7 @@ Model.changes = function(since, filter, callback) {
*/
Model.checkpoint = function(cb) {
var Checkpoint = this.getChangeModel().Checkpoint;
var Checkpoint = this.getChangeModel().getCheckpointModel();
this.getSourceId(function(err, sourceId) {
if(err) return cb(err);
Checkpoint.create({
@ -283,18 +355,29 @@ Model.replicate = function(since, targetModel, options, callback) {
var sourceModel = this;
var diff;
var updates;
var Change = this.getChangeModel();
var TargetChange = targetModel.getChangeModel();
var tasks = [
getLocalChanges,
getDiffFromTarget,
createSourceUpdates,
bulkUpdate,
sourceModel.checkpoint.bind(sourceModel)
checkpoint
];
async.waterfall(tasks, function(err) {
if(err) return callback(err);
callback(null, diff.conflicts);
var conflicts = diff.conflicts.map(function(change) {
var sourceChange = new Change({
modelName: sourceModel.modelName,
modelId: change.modelId
});
var targetChange = new TargetChange(change);
return new Change.Conflict(sourceChange, targetChange);
});
callback(null, conflicts);
});
function getLocalChanges(cb) {
@ -307,12 +390,18 @@ Model.replicate = function(since, targetModel, options, callback) {
function createSourceUpdates(_diff, cb) {
diff = _diff;
diff.conflicts = diff.conflicts || [];
sourceModel.createUpdates(diff.deltas, cb);
}
function bulkUpdate(updates, cb) {
targetModel.bulkUpdate(updates, cb);
}
function checkpoint() {
var cb = arguments[arguments.length - 1];
sourceModel.checkpoint(cb);
}
}
/**
@ -328,10 +417,10 @@ Model.createUpdates = function(deltas, cb) {
var updates = [];
var Model = this;
var tasks = [];
var type = change.type();
deltas.forEach(function(change) {
change = new Change(change);
var change = new Change(change);
var type = change.type();
var update = {type: type, change: change};
switch(type) {
case Change.CREATE:
@ -339,7 +428,11 @@ Model.createUpdates = function(deltas, cb) {
tasks.push(function(cb) {
Model.findById(change.modelId, function(err, inst) {
if(err) return cb(err);
update.data = inst;
if(inst.toObject) {
update.data = inst.toObject();
} else {
update.data = inst;
}
updates.push(update);
cb();
});
@ -369,16 +462,22 @@ Model.createUpdates = function(deltas, cb) {
Model.bulkUpdate = function(updates, callback) {
var tasks = [];
var Model = this;
var idName = Model.idName();
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
updates.forEach(function(update) {
switch(update.type) {
case Change.UPDATE:
case Change.CREATE:
tasks.push(Model.upsert.bind(Model, update.data));
// var model = new Model(update.data);
// tasks.push(model.save.bind(model));
tasks.push(function(cb) {
var model = new Model(update.data);
debugger;
model.save(cb);
});
break;
case: Change.DELETE:
case Change.DELETE:
var data = {};
data[idName] = update.change.modelId;
var model = new Model(data);
@ -391,5 +490,58 @@ Model.bulkUpdate = function(updates, callback) {
}
Model.getChangeModel = function() {
var changeModel = this.Change;
if(changeModel) return changeModel;
this.Change = changeModel = require('./change').extend(this.modelName + '-change');
changeModel.attachTo(this.dataSource);
return changeModel;
}
Model.getSourceId = function(cb) {
cb(null, 'foo')
}
/**
* Enable the tracking of changes made to the model. Usually for replication.
*/
Model.enableChangeTracking = function() {
var Model = this;
var Change = Model.getChangeModel();
var cleanupInterval = Model.settings.changeCleanupInterval || 30000;
Model.on('changed', function(obj) {
Change.track(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
});
Model.on('deleted', function(obj) {
Change.track(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
});
Model.on('deletedAll', cleanup);
// initial cleanup
cleanup();
// cleanup
setInterval(cleanup, cleanupInterval);
function cleanup() {
Change.rectifyAll(function(err) {
if(err) {
console.error(Model.modelName + ' Change Cleanup Error:');
console.error(err);
}
});
}
}