diff --git a/lib/models/change.js b/lib/models/change.js index 7a09373d..410bab7a 100644 --- a/lib/models/change.js +++ b/lib/models/change.js @@ -179,7 +179,7 @@ Change.prototype.rectify = function(cb) { function updateCheckpoint(cb) { change.constructor.getCheckpointModel().current(function(err, checkpoint) { if(err) return Change.handleError(err); - change.checkpoint = ++checkpoint; + change.checkpoint = checkpoint; cb(); }); } @@ -328,7 +328,7 @@ Change.diff = function(modelName, since, remoteChanges, callback) { where: { modelName: modelName, modelId: {inq: modelIds}, - checkpoint: {gt: since} + checkpoint: {gte: since} } }, function(err, localChanges) { if(err) return callback(err); @@ -400,6 +400,15 @@ Change.handleError = function(err) { } } +Change.prototype.getModelId = function() { + // TODO(ritch) get rid of the need to create an instance + var Model = this.constructor.settings.model; + var id = this.modelId; + var m = new Model(); + m.setId(id); + return m.getId(); +} + /** * When two changes conflict a conflict is created. * diff --git a/lib/models/checkpoint.js b/lib/models/checkpoint.js index 71d4797e..c1fa7d70 100644 --- a/lib/models/checkpoint.js +++ b/lib/models/checkpoint.js @@ -11,8 +11,8 @@ var DataModel = require('../loopback').DataModel */ var properties = { - id: {type: Number, generated: true, id: true}, - time: {type: Number, generated: true, default: Date.now}, + seq: {type: Number}, + time: {type: Date, default: Date}, sourceId: {type: String} }; @@ -45,13 +45,33 @@ var Checkpoint = module.exports = DataModel.extend('Checkpoint', properties, opt */ Checkpoint.current = function(cb) { + var Checkpoint = this; this.find({ limit: 1, - sort: 'id DESC' + sort: 'seq DESC' }, function(err, checkpoints) { if(err) return cb(err); - var checkpoint = checkpoints[0] || {id: 0}; - cb(null, checkpoint.id); + var checkpoint = checkpoints[0]; + if(checkpoint) { + cb(null, checkpoint.seq); + } else { + Checkpoint.create({seq: 0}, function(err, checkpoint) { + if(err) return cb(err); + cb(null, checkpoint.seq); + }); + } }); } +Checkpoint.beforeSave = function(next, model) { + if(!model.getId() && model.seq === undefined) { + model.constructor.current(function(err, seq) { + if(err) return next(err); + model.seq = seq + 1; + next(); + }); + } else { + next(); + } +} + diff --git a/lib/models/data-model.js b/lib/models/data-model.js index 5aae7536..ade8d8bf 100644 --- a/lib/models/data-model.js +++ b/lib/models/data-model.js @@ -3,6 +3,7 @@ */ var Model = require('./model'); +var loopback = require('../loopback'); var RemoteObjects = require('strong-remoting'); var assert = require('assert'); var async = require('async'); @@ -505,7 +506,7 @@ DataModel.setupRemoting = function() { {arg: 'remoteChanges', type: 'array', description: 'an array of change objects', http: {source: 'body'}} ], - returns: {arg: 'deltas', type: 'array', root: true}, + returns: {arg: 'result', type: 'object', root: true}, http: {verb: 'post', path: '/diff'} }); @@ -532,11 +533,29 @@ DataModel.setupRemoting = function() { http: {verb: 'get', path: '/checkpoint'} }); + setRemoting(DataModel.createUpdates, { + description: 'Create an update list from a delta list', + accepts: {arg: 'deltas', type: 'array', http: {source: 'body'}}, + returns: {arg: 'updates', type: 'array', root: true}, + http: {verb: 'post', path: '/create-updates'} + }); + setRemoting(DataModel.bulkUpdate, { description: 'Run multiple updates at once. Note: this is not atomic.', accepts: {arg: 'updates', type: 'array'}, http: {verb: 'post', path: '/bulk-update'} }); + + setRemoting(DataModel.rectifyAllChanges, { + description: 'Rectify all Model changes.', + http: {verb: 'post', path: '/rectify-all'} + }); + + setRemoting(DataModel.rectifyChange, { + description: 'Tell loopback that a change to the model with the given id has occurred.', + accepts: {arg: 'id', type: 'any', http: {source: 'path'}}, + http: {verb: 'post', path: '/:id/rectify-change'} + }); } } @@ -568,6 +587,12 @@ DataModel.diff = function(since, remoteChanges, callback) { */ DataModel.changes = function(since, filter, callback) { + if(typeof since === 'function') { + filter = {}; + callback = since; + since = -1; + } + var idName = this.dataSource.idName(this.modelName); var Change = this.getChangeModel(); var model = this; @@ -577,14 +602,14 @@ DataModel.changes = function(since, filter, callback) { filter.where = filter.where || {}; filter.fields[idName] = true; - // this whole thing could be optimized a bit more + // TODO(ritch) this whole thing could be optimized a bit more Change.find({ checkpoint: {gt: since}, modelName: this.modelName }, function(err, changes) { if(err) return cb(err); var ids = changes.map(function(change) { - return change.modelId.toString(); + return change.getModelId(); }); filter.where[idName] = {inq: ids}; model.find(filter, function(err, models) { @@ -709,7 +734,7 @@ DataModel.replicate = function(since, targetModel, options, callback) { if(diff && diff.deltas && diff.deltas.length) { sourceModel.createUpdates(diff.deltas, cb); } else { - // done + // nothing to replicate callback(null, []); } } @@ -863,44 +888,72 @@ DataModel.enableChangeTracking = function() { Change.getCheckpointModel().attachTo(this.dataSource); Model.on('changed', function(obj) { - Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) { - if(err) { - console.error(Model.modelName + ' Change Tracking Error:'); - console.error(err); - } - }); + Model.rectifyChange(obj.getId(), Model.handleChangeError); }); - Model.on('deleted', function(obj) { - Change.rectifyModelChanges(Model.modelName, [obj.id], function(err) { - if(err) { - console.error(Model.modelName + ' Change Tracking Error:'); - console.error(err); - } - }); + Model.on('deleted', function(id) { + Model.rectifyChange(id, Model.handleChangeError); }); Model.on('deletedAll', cleanup); - // initial cleanup - cleanup(); + if(loopback.isServer) { + // initial cleanup + cleanup(); - // cleanup - setInterval(cleanup, cleanupInterval); + // cleanup + setInterval(cleanup, cleanupInterval); - function cleanup() { - Change.rectifyAll(function(err) { - if(err) { - console.error(Model.modelName + ' Change Cleanup Error:'); - console.error(err); - } - }); + function cleanup() { + Model.rectifyAllChanges(function(err) { + if(err) { + console.error(Model.modelName + ' Change Cleanup Error:'); + console.error(err); + } + }); + } } } DataModel._defineChangeModel = function() { var BaseChangeModel = require('./change'); - return this.Change = BaseChangeModel.extend(this.modelName + '-change'); + return this.Change = BaseChangeModel.extend(this.modelName + '-change', + {}, + { + model: this + } + ); +} + +DataModel.rectifyAllChanges = function(callback) { + this.getChangeModel().rectifyAll(callback); +} + +/** + * Handle a change error. Override this method in a subclassing model to customize + * change error handling. + * + * @param {Error} err + */ + +DataModel.handleChangeError = function(err) { + if(err) { + console.error(Model.modelName + ' Change Tracking Error:'); + console.error(err); + } +} + +/** + * Tell loopback that a change to the model with the given id has occurred. + * + * @param {*} id The id of the model that has changed + * @callback {Function} callback + * @param {Error} err + */ + +DataModel.rectifyChange = function(id, callback) { + var Change = this.getChangeModel(); + Change.rectifyModelChanges(this.modelName, [id], callback); } DataModel.setup();