diff --git a/.jscsrc b/.jscsrc index ed1f2e3e..8deeea1c 100644 --- a/.jscsrc +++ b/.jscsrc @@ -8,6 +8,7 @@ "try", "catch" ], + "disallowMultipleVarDecl": "exceptUndefined", "disallowSpacesInsideObjectBrackets": null, "maximumLineLength": { "value": 150, diff --git a/common/models/change.js b/common/models/change.js index 8c6b7ff9..1ea01cee 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -189,6 +189,7 @@ module.exports = function(Change) { function updateCheckpoint(cb) { change.constructor.getCheckpointModel().current(function(err, checkpoint) { if (err) return Change.handleError(err); + debug('updated checkpoint to', checkpoint); change.checkpoint = checkpoint; cb(); }); @@ -408,9 +409,10 @@ module.exports = function(Change) { // this should be optimized this.find(function(err, changes) { if (err) return cb(err); - changes.forEach(function(change) { - change.rectify(); - }); + async.each( + changes, + function(c, next) { c.rectify(next); }, + cb); }); }; diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 490ebe7e..6e9d07e5 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -755,10 +755,10 @@ PersistedModel.changes = function(since, filter, callback) { filter.fields[idName] = true; // TODO(ritch) this whole thing could be optimized a bit more - Change.find({ + Change.find({ where: { checkpoint: {gt: since}, modelName: this.modelName - }, function(err, changes) { + }}, function(err, changes) { if (err) return callback(err); if (!Array.isArray(changes) || changes.length === 0) return callback(null, []); var ids = changes.map(function(change) { @@ -1044,15 +1044,9 @@ PersistedModel.enableChangeTracking = function() { Change.attachTo(this.dataSource); Change.getCheckpointModel().attachTo(this.dataSource); - Model.afterSave = function afterSave(next) { - Model.rectifyChange(this.getId(), next); - }; + Model.observe('after save', rectifyOnSave); - Model.afterDestroy = function afterDestroy(next) { - Model.rectifyChange(this.getId(), next); - }; - - Model.on('deletedAll', cleanup); + Model.observe('after delete', rectifyOnDelete); if (runtime.isServer) { // initial cleanup @@ -1072,6 +1066,54 @@ PersistedModel.enableChangeTracking = function() { } }; +function rectifyOnSave(ctx, next) { + if (ctx.instance) { + ctx.Model.rectifyChange(ctx.instance.getId(), reportErrorAndNext); + } else { + ctx.Model.rectifyAllChanges(reportErrorAndNext); + } + + function reportErrorAndNext(err) { + if (err) { + console.error( + ctx.Model.modelName + '.rectifyChange(s) after save failed:' + err); + } + next(); + } +} + +function rectifyOnDelete(ctx, next) { + var id = getIdFromWhereByModelId(ctx.Model, ctx.where); + if (id) { + ctx.Model.rectifyChange(id, reportErrorAndNext); + } else { + ctx.Model.rectifyAllChanges(reportErrorAndNext); + } + + function reportErrorAndNext(err) { + if (err) { + console.error( + ctx.Model.modelName + '.rectifyChange(s) after delete failed:' + err); + } + next(); + } +} + +function getIdFromWhereByModelId(Model, where) { + var whereKeys = Object.keys(where); + if (whereKeys.length != 1) return undefined; + + var idName = Model.getIdName(); + if (whereKeys[0] !== idName) return undefined; + + var id = where[idName]; + // TODO(bajtos) support object values that are not LB conditions + if (typeof id === 'string' || typeof id === 'number') { + return id; + } + return undefined; +} + PersistedModel._defineChangeModel = function() { var BaseChangeModel = registry.getModel('Change'); assert(BaseChangeModel, diff --git a/test/replication.test.js b/test/replication.test.js index 787a9314..260e75d6 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -4,23 +4,28 @@ var ACL = loopback.ACL; var Change = loopback.Change; var defineModelTestsWithDataSource = require('./util/model-tests'); var PersistedModel = loopback.PersistedModel; +var expect = require('chai').expect; describe('Replication / Change APIs', function() { + var dataSource, SourceModel, TargetModel; + beforeEach(function() { var test = this; - var dataSource = this.dataSource = loopback.createDataSource({ + dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory }); - var SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', {}, { + SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', {}, { trackChanges: true }); SourceModel.attachTo(dataSource); - var TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', {}, { + TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', {}, { trackChanges: true }); TargetModel.attachTo(dataSource); + test.startingCheckpoint = -1; + this.createInitalData = function(cb) { SourceModel.create({name: 'foo'}, function(err, inst) { if (err) return cb(err); @@ -48,6 +53,19 @@ describe('Replication / Change APIs', function() { }, 1); }); }); + + it('excludes changes from older checkpoints', function(done) { + var FUTURE_CHECKPOINT = 999; + + SourceModel.create({ name: 'foo' }, function(err) { + if (err) return done(err); + SourceModel.changes(FUTURE_CHECKPOINT, {}, function(err, changes) { + if (err) return done(err); + expect(changes).to.be.empty(); + done(); + }); + }); + }); }); describe('Model.replicate(since, targetModel, options, callback)', function() { @@ -340,4 +358,137 @@ describe('Replication / Change APIs', function() { assert(!this.conflict); }); }); + + describe('change detection', function() { + it('detects "create"', function(done) { + SourceModel.create({}, function(err, inst) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + + it('detects "updateOrCreate"', function(done) { + givenReplicatedInstance(function(err, created) { + if (err) return done(err); + var data = created.toObject(); + created.name = 'updated'; + SourceModel.updateOrCreate(created, function(err, inst) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "findOrCreate"', function(done) { + // make sure we bypass find+create and call the connector directly + SourceModel.dataSource.connector.findOrCreate = + function(model, query, data, callback) { + this.all(model, query, function(err, list) { + if (err || (list && list[0])) + return callback(err, list && list[0], false); + this.create(model, data, function(err) { + callback(err, data, true); + }); + }.bind(this)); + }; + + SourceModel.findOrCreate( + { where: { name: 'does-not-exist' } }, + { name: 'created' }, + function(err, inst) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + + it('detects "deleteById"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + SourceModel.deleteById(inst.id, function(err) { + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "deleteAll"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + SourceModel.deleteAll({ name: inst.name }, function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "updateAll"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + SourceModel.updateAll( + { name: inst.name }, + { name: 'updated' }, + function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "prototype.save"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + inst.name = 'updated'; + inst.save(function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "prototype.updateAttributes"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + inst.updateAttributes({ name: 'updated' }, function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "prototype.delete"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + inst.delete(function(err) { + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + function givenReplicatedInstance(cb) { + SourceModel.create({ name: 'a-name' }, function(err, inst) { + if (err) return cb(err); + SourceModel.checkpoint(function(err) { + if (err) return cb(err); + cb(null, inst); + }); + }); + } + + function assertChangeRecordedForId(id, cb) { + SourceModel.getChangeModel().getCheckpointModel() + .current(function(err, cp) { + if (err) return cb(err); + SourceModel.changes(cp - 1, {}, function(err, pendingChanges) { + if (err) return cb(err); + expect(pendingChanges, 'list of changes').to.have.length(1); + var change = pendingChanges[0].toObject(); + expect(change).to.have.property('checkpoint', cp); // sanity check + expect(change).to.have.property('modelName', SourceModel.modelName); + // NOTE(bajtos) Change.modelId is always String + // regardless of the type of the changed model's id property + expect(change).to.have.property('modelId', '' + id); + cb(); + }); + }); + } + }); });