Add Checkpoint model and Model replication methods
This commit is contained in:
parent
cc49d675ce
commit
e3d80058dc
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Module Dependencies.
|
||||
*/
|
||||
|
||||
var Model = require('../loopback').Model
|
||||
, loopback = require('../loopback')
|
||||
, assert = require('assert');
|
||||
|
||||
/**
|
||||
* Properties
|
||||
*/
|
||||
|
||||
var properties = {
|
||||
id: {type: Number, generated: true, id: true},
|
||||
time: {type: Number, generated: true, default: Date.now},
|
||||
sourceId: {type: String}
|
||||
};
|
||||
|
||||
/**
|
||||
* Options
|
||||
*/
|
||||
|
||||
var options = {
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* Checkpoint list entry.
|
||||
*
|
||||
* @property id {Number} the sequencial identifier of a checkpoint
|
||||
* @property time {Number} the time when the checkpoint was created
|
||||
* @property sourceId {String} the source identifier
|
||||
*
|
||||
* @class
|
||||
* @inherits {Model}
|
||||
*/
|
||||
|
||||
var Checkpoint = module.exports = Model.extend('Checkpoint', properties, options);
|
||||
|
||||
/**
|
||||
* Get the current checkpoint id
|
||||
* @callback {Function} callback
|
||||
* @param {Error} err
|
||||
* @param {Number} checkpointId The current checkpoint id
|
||||
*/
|
||||
|
||||
Checkpoint.current = function(cb) {
|
||||
this.find({
|
||||
limit: 1,
|
||||
sort: 'id DESC'
|
||||
}, function(err, checkpoint) {
|
||||
if(err) return cb(err);
|
||||
cb(null, checkpoint.id);
|
||||
});
|
||||
}
|
||||
|
|
@ -134,6 +134,7 @@ function getACL() {
|
|||
* @param {String|Error} err The error object
|
||||
* @param {Boolean} allowed is the request allowed
|
||||
*/
|
||||
|
||||
Model.checkAccess = function(token, modelId, method, callback) {
|
||||
var ANONYMOUS = require('./access-token').ANONYMOUS;
|
||||
token = token || ANONYMOUS;
|
||||
|
@ -193,3 +194,205 @@ Model._getAccessTypeForMethod = function(method) {
|
|||
// setup the initial model
|
||||
Model.setup();
|
||||
|
||||
/**
|
||||
* Get a set of deltas and conflicts since the given checkpoint.
|
||||
*
|
||||
* See `Change.diff()` for details.
|
||||
*
|
||||
* @param {Number} since Find changes since this checkpoint
|
||||
* @param {Array} remoteChanges An array of change objects
|
||||
* @param {Function} callback
|
||||
*/
|
||||
|
||||
Model.diff = function(since, remoteChanges, callback) {
|
||||
var Change = this.getChangeModel();
|
||||
Change.diff(this.modelName, since, remoteChanges, callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the changes to a model since a given checkpoing. Provide a filter object
|
||||
* to reduce the number of results returned.
|
||||
* @param {Number} since Only return changes since this checkpoint
|
||||
* @param {Object} filter Only include changes that match this filter
|
||||
* (same as `Model.find(filter, ...)`)
|
||||
* @callback {Function} callback
|
||||
* @param {Error} err
|
||||
* @param {Array} changes An array of `Change` objects
|
||||
* @end
|
||||
*/
|
||||
|
||||
Model.changes = function(since, filter, callback) {
|
||||
var idName = this.idName();
|
||||
var Change = this.getChangeModel();
|
||||
var model = this;
|
||||
|
||||
filter = filter || {};
|
||||
filter.fields = {};
|
||||
filter.where = filter.where || {};
|
||||
filter.fields[idName] = true;
|
||||
|
||||
// this whole thing could be optimized a bit more
|
||||
Change.find({
|
||||
checkpoint: {gt: since},
|
||||
modelName: this.modelName
|
||||
}, function(err, changes) {
|
||||
if(err) return cb(err);
|
||||
var ids = changes.map(function(change) {
|
||||
return change.modelId;
|
||||
});
|
||||
filter.where[idName] = {inq: ids};
|
||||
model.find(filter, function(err, models) {
|
||||
if(err) return cb(err);
|
||||
var modelIds = models.map(function(m) {
|
||||
return m[idName];
|
||||
});
|
||||
callback(null, changes.filter(function(ch) {
|
||||
return modelIds.indexOf(ch.modelId) > -1;
|
||||
}));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a checkpoint.
|
||||
*
|
||||
* @param {Function} callback
|
||||
*/
|
||||
|
||||
Model.checkpoint = function(cb) {
|
||||
var Checkpoint = this.getChangeModel().Checkpoint;
|
||||
this.getSourceId(function(err, sourceId) {
|
||||
if(err) return cb(err);
|
||||
Checkpoint.create({
|
||||
sourceId: sourceId
|
||||
}, cb);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Replicate changes since the given checkpoint to the given target model.
|
||||
*
|
||||
* @param {Number} since Since this checkpoint
|
||||
* @param {Model} targetModel Target this model class
|
||||
* @options {Object} options
|
||||
* @property {Object} filter Replicate models that match this filter
|
||||
* @callback {Function} callback
|
||||
* @param {Error} err
|
||||
* @param {Array} conflicts A list of changes that could not be replicated
|
||||
* due to conflicts.
|
||||
*/
|
||||
|
||||
Model.replicate = function(since, targetModel, options, callback) {
|
||||
var sourceModel = this;
|
||||
var diff;
|
||||
var updates;
|
||||
|
||||
var tasks = [
|
||||
getLocalChanges,
|
||||
getDiffFromTarget,
|
||||
createSourceUpdates,
|
||||
bulkUpdate,
|
||||
sourceModel.checkpoint.bind(sourceModel)
|
||||
];
|
||||
|
||||
async.waterfall(tasks, function(err) {
|
||||
if(err) return callback(err);
|
||||
callback(null, diff.conflicts);
|
||||
});
|
||||
|
||||
function getLocalChanges(cb) {
|
||||
sourceModel.changes(since, options.filter, cb);
|
||||
}
|
||||
|
||||
function getDiffFromTarget(sourceChanges, cb) {
|
||||
targetModel.diff(since, sourceChanges, cb);
|
||||
}
|
||||
|
||||
function createSourceUpdates(_diff, cb) {
|
||||
diff = _diff;
|
||||
sourceModel.createUpdates(diff.deltas, cb);
|
||||
}
|
||||
|
||||
function bulkUpdate(updates, cb) {
|
||||
targetModel.bulkUpdate(updates, cb);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an update list (for `Model.bulkUpdate()`) from a delta list
|
||||
* (result of `Change.diff()`).
|
||||
*
|
||||
* @param {Array} deltas
|
||||
* @param {Function} callback
|
||||
*/
|
||||
|
||||
Model.createUpdates = function(deltas, cb) {
|
||||
var Change = this.getChangeModel();
|
||||
var updates = [];
|
||||
var Model = this;
|
||||
var tasks = [];
|
||||
var type = change.type();
|
||||
|
||||
deltas.forEach(function(change) {
|
||||
change = new Change(change);
|
||||
var update = {type: type, change: change};
|
||||
switch(type) {
|
||||
case Change.CREATE:
|
||||
case Change.UPDATE:
|
||||
tasks.push(function(cb) {
|
||||
Model.findById(change.modelId, function(err, inst) {
|
||||
if(err) return cb(err);
|
||||
update.data = inst;
|
||||
updates.push(update);
|
||||
cb();
|
||||
});
|
||||
});
|
||||
break;
|
||||
case Change.DELETE:
|
||||
updates.push(update);
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
async.parallel(tasks, function(err) {
|
||||
if(err) return cb(err);
|
||||
cb(null, updates);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply an update list.
|
||||
*
|
||||
* **Note: this is not atomic**
|
||||
*
|
||||
* @param {Array} updates An updates list (usually from Model.createUpdates())
|
||||
* @param {Function} callback
|
||||
*/
|
||||
|
||||
Model.bulkUpdate = function(updates, callback) {
|
||||
var tasks = [];
|
||||
var Model = this;
|
||||
var idName = Model.idName();
|
||||
var Change = this.getChangeModel();
|
||||
|
||||
updates.forEach(function(update) {
|
||||
switch(update.type) {
|
||||
case Change.UPDATE:
|
||||
case Change.CREATE:
|
||||
tasks.push(Model.upsert.bind(Model, update.data));
|
||||
break;
|
||||
case: Change.DELETE:
|
||||
var data = {};
|
||||
data[idName] = update.change.modelId;
|
||||
var model = new Model(data);
|
||||
tasks.push(model.destroy.bind(model));
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
async.parallel(tasks, callback);
|
||||
}
|
||||
|
||||
Model.getChangeModel = function() {
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue