Fix issues when using MongoDB for replication

This commit is contained in:
Ritchie Martori 2014-05-09 17:19:32 -07:00
parent 5bf1f76762
commit 908221416e
3 changed files with 118 additions and 36 deletions

View File

@ -179,7 +179,7 @@ Change.prototype.rectify = function(cb) {
function updateCheckpoint(cb) {
change.constructor.getCheckpointModel().current(function(err, checkpoint) {
if(err) return Change.handleError(err);
change.checkpoint = ++checkpoint;
change.checkpoint = checkpoint;
cb();
});
}
@ -328,7 +328,7 @@ Change.diff = function(modelName, since, remoteChanges, callback) {
where: {
modelName: modelName,
modelId: {inq: modelIds},
checkpoint: {gt: since}
checkpoint: {gte: since}
}
}, function(err, localChanges) {
if(err) return callback(err);
@ -400,6 +400,15 @@ Change.handleError = function(err) {
}
}
Change.prototype.getModelId = function() {
// TODO(ritch) get rid of the need to create an instance
var Model = this.constructor.settings.model;
var id = this.modelId;
var m = new Model();
m.setId(id);
return m.getId();
}
/**
* When two changes conflict a conflict is created.
*

View File

@ -11,8 +11,8 @@ var DataModel = require('../loopback').DataModel
*/
var properties = {
id: {type: Number, generated: true, id: true},
time: {type: Number, generated: true, default: Date.now},
seq: {type: Number},
time: {type: Date, default: Date},
sourceId: {type: String}
};
@ -45,13 +45,33 @@ var Checkpoint = module.exports = DataModel.extend('Checkpoint', properties, opt
*/
Checkpoint.current = function(cb) {
var Checkpoint = this;
this.find({
limit: 1,
sort: 'id DESC'
sort: 'seq DESC'
}, function(err, checkpoints) {
if(err) return cb(err);
var checkpoint = checkpoints[0] || {id: 0};
cb(null, checkpoint.id);
var checkpoint = checkpoints[0];
if(checkpoint) {
cb(null, checkpoint.seq);
} else {
Checkpoint.create({seq: 0}, function(err, checkpoint) {
if(err) return cb(err);
cb(null, checkpoint.seq);
});
}
});
}
Checkpoint.beforeSave = function(next, model) {
if(!model.getId() && model.seq === undefined) {
model.constructor.current(function(err, seq) {
if(err) return next(err);
model.seq = seq + 1;
next();
});
} else {
next();
}
}

View File

@ -3,6 +3,7 @@
*/
var Model = require('./model');
var loopback = require('../loopback');
var RemoteObjects = require('strong-remoting');
var assert = require('assert');
var async = require('async');
@ -505,7 +506,7 @@ DataModel.setupRemoting = function() {
{arg: 'remoteChanges', type: 'array', description: 'an array of change objects',
http: {source: 'body'}}
],
returns: {arg: 'deltas', type: 'array', root: true},
returns: {arg: 'result', type: 'object', root: true},
http: {verb: 'post', path: '/diff'}
});
@ -532,11 +533,29 @@ DataModel.setupRemoting = function() {
http: {verb: 'get', path: '/checkpoint'}
});
setRemoting(DataModel.createUpdates, {
description: 'Create an update list from a delta list',
accepts: {arg: 'deltas', type: 'array', http: {source: 'body'}},
returns: {arg: 'updates', type: 'array', root: true},
http: {verb: 'post', path: '/create-updates'}
});
setRemoting(DataModel.bulkUpdate, {
description: 'Run multiple updates at once. Note: this is not atomic.',
accepts: {arg: 'updates', type: 'array'},
http: {verb: 'post', path: '/bulk-update'}
});
setRemoting(DataModel.rectifyAllChanges, {
description: 'Rectify all Model changes.',
http: {verb: 'post', path: '/rectify-all'}
});
setRemoting(DataModel.rectifyChange, {
description: 'Tell loopback that a change to the model with the given id has occurred.',
accepts: {arg: 'id', type: 'any', http: {source: 'path'}},
http: {verb: 'post', path: '/:id/rectify-change'}
});
}
}
@ -568,6 +587,12 @@ DataModel.diff = function(since, remoteChanges, callback) {
*/
DataModel.changes = function(since, filter, callback) {
if(typeof since === 'function') {
filter = {};
callback = since;
since = -1;
}
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var model = this;
@ -577,14 +602,14 @@ DataModel.changes = function(since, filter, callback) {
filter.where = filter.where || {};
filter.fields[idName] = true;
// this whole thing could be optimized a bit more
// TODO(ritch) this whole thing could be optimized a bit more
Change.find({
checkpoint: {gt: since},
modelName: this.modelName
}, function(err, changes) {
if(err) return cb(err);
var ids = changes.map(function(change) {
return change.modelId.toString();
return change.getModelId();
});
filter.where[idName] = {inq: ids};
model.find(filter, function(err, models) {
@ -709,7 +734,7 @@ DataModel.replicate = function(since, targetModel, options, callback) {
if(diff && diff.deltas && diff.deltas.length) {
sourceModel.createUpdates(diff.deltas, cb);
} else {
// done
// nothing to replicate
callback(null, []);
}
}
@ -863,44 +888,72 @@ DataModel.enableChangeTracking = function() {
Change.getCheckpointModel().attachTo(this.dataSource);
Model.on('changed', function(obj) {
Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
Model.rectifyChange(obj.getId(), Model.handleChangeError);
});
Model.on('deleted', function(obj) {
Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
});
Model.on('deleted', function(id) {
Model.rectifyChange(id, Model.handleChangeError);
});
Model.on('deletedAll', cleanup);
// initial cleanup
cleanup();
if(loopback.isServer) {
// initial cleanup
cleanup();
// cleanup
setInterval(cleanup, cleanupInterval);
// cleanup
setInterval(cleanup, cleanupInterval);
function cleanup() {
Change.rectifyAll(function(err) {
if(err) {
console.error(Model.modelName + ' Change Cleanup Error:');
console.error(err);
}
});
function cleanup() {
Model.rectifyAllChanges(function(err) {
if(err) {
console.error(Model.modelName + ' Change Cleanup Error:');
console.error(err);
}
});
}
}
}
DataModel._defineChangeModel = function() {
var BaseChangeModel = require('./change');
return this.Change = BaseChangeModel.extend(this.modelName + '-change');
return this.Change = BaseChangeModel.extend(this.modelName + '-change',
{},
{
model: this
}
);
}
DataModel.rectifyAllChanges = function(callback) {
this.getChangeModel().rectifyAll(callback);
}
/**
* Handle a change error. Override this method in a subclassing model to customize
* change error handling.
*
* @param {Error} err
*/
DataModel.handleChangeError = function(err) {
if(err) {
console.error(Model.modelName + ' Change Tracking Error:');
console.error(err);
}
}
/**
* Tell loopback that a change to the model with the given id has occurred.
*
* @param {*} id The id of the model that has changed
* @callback {Function} callback
* @param {Error} err
*/
DataModel.rectifyChange = function(id, callback) {
var Change = this.getChangeModel();
Change.rectifyModelChanges(this.modelName, [id], callback);
}
DataModel.setup();