From 702ecc6f72f47c38fbcf3a0f78a4bbf23b21f6d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 20 Feb 2015 19:28:33 +0100 Subject: [PATCH] Fix change detection & tracking Add unit-tests to verify that all DAO methods correctly create change records. Rework the change detection to use the new operation hooks, this fixes the bugs where operations like "updateOrCreate" did not update change records. --- .jscsrc | 1 + common/models/change.js | 8 +- lib/persisted-model.js | 62 +++++++++++++--- test/replication.test.js | 157 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 212 insertions(+), 16 deletions(-) diff --git a/.jscsrc b/.jscsrc index ed1f2e3e..8deeea1c 100644 --- a/.jscsrc +++ b/.jscsrc @@ -8,6 +8,7 @@ "try", "catch" ], + "disallowMultipleVarDecl": "exceptUndefined", "disallowSpacesInsideObjectBrackets": null, "maximumLineLength": { "value": 150, diff --git a/common/models/change.js b/common/models/change.js index 8c6b7ff9..1ea01cee 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -189,6 +189,7 @@ module.exports = function(Change) { function updateCheckpoint(cb) { change.constructor.getCheckpointModel().current(function(err, checkpoint) { if (err) return Change.handleError(err); + debug('updated checkpoint to', checkpoint); change.checkpoint = checkpoint; cb(); }); @@ -408,9 +409,10 @@ module.exports = function(Change) { // this should be optimized this.find(function(err, changes) { if (err) return cb(err); - changes.forEach(function(change) { - change.rectify(); - }); + async.each( + changes, + function(c, next) { c.rectify(next); }, + cb); }); }; diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 490ebe7e..6e9d07e5 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -755,10 +755,10 @@ PersistedModel.changes = function(since, filter, callback) { filter.fields[idName] = true; // TODO(ritch) this whole thing could be optimized a bit more - Change.find({ + Change.find({ where: { checkpoint: {gt: since}, modelName: this.modelName - }, function(err, changes) { + }}, function(err, changes) { if (err) return callback(err); if (!Array.isArray(changes) || changes.length === 0) return callback(null, []); var ids = changes.map(function(change) { @@ -1044,15 +1044,9 @@ PersistedModel.enableChangeTracking = function() { Change.attachTo(this.dataSource); Change.getCheckpointModel().attachTo(this.dataSource); - Model.afterSave = function afterSave(next) { - Model.rectifyChange(this.getId(), next); - }; + Model.observe('after save', rectifyOnSave); - Model.afterDestroy = function afterDestroy(next) { - Model.rectifyChange(this.getId(), next); - }; - - Model.on('deletedAll', cleanup); + Model.observe('after delete', rectifyOnDelete); if (runtime.isServer) { // initial cleanup @@ -1072,6 +1066,54 @@ PersistedModel.enableChangeTracking = function() { } }; +function rectifyOnSave(ctx, next) { + if (ctx.instance) { + ctx.Model.rectifyChange(ctx.instance.getId(), reportErrorAndNext); + } else { + ctx.Model.rectifyAllChanges(reportErrorAndNext); + } + + function reportErrorAndNext(err) { + if (err) { + console.error( + ctx.Model.modelName + '.rectifyChange(s) after save failed:' + err); + } + next(); + } +} + +function rectifyOnDelete(ctx, next) { + var id = getIdFromWhereByModelId(ctx.Model, ctx.where); + if (id) { + ctx.Model.rectifyChange(id, reportErrorAndNext); + } else { + ctx.Model.rectifyAllChanges(reportErrorAndNext); + } + + function reportErrorAndNext(err) { + if (err) { + console.error( + ctx.Model.modelName + '.rectifyChange(s) after delete failed:' + err); + } + next(); + } +} + +function getIdFromWhereByModelId(Model, where) { + var whereKeys = Object.keys(where); + if (whereKeys.length != 1) return undefined; + + var idName = Model.getIdName(); + if (whereKeys[0] !== idName) return undefined; + + var id = where[idName]; + // TODO(bajtos) support object values that are not LB conditions + if (typeof id === 'string' || typeof id === 'number') { + return id; + } + return undefined; +} + PersistedModel._defineChangeModel = function() { var BaseChangeModel = registry.getModel('Change'); assert(BaseChangeModel, diff --git a/test/replication.test.js b/test/replication.test.js index 787a9314..260e75d6 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -4,23 +4,28 @@ var ACL = loopback.ACL; var Change = loopback.Change; var defineModelTestsWithDataSource = require('./util/model-tests'); var PersistedModel = loopback.PersistedModel; +var expect = require('chai').expect; describe('Replication / Change APIs', function() { + var dataSource, SourceModel, TargetModel; + beforeEach(function() { var test = this; - var dataSource = this.dataSource = loopback.createDataSource({ + dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory }); - var SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', {}, { + SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', {}, { trackChanges: true }); SourceModel.attachTo(dataSource); - var TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', {}, { + TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', {}, { trackChanges: true }); TargetModel.attachTo(dataSource); + test.startingCheckpoint = -1; + this.createInitalData = function(cb) { SourceModel.create({name: 'foo'}, function(err, inst) { if (err) return cb(err); @@ -48,6 +53,19 @@ describe('Replication / Change APIs', function() { }, 1); }); }); + + it('excludes changes from older checkpoints', function(done) { + var FUTURE_CHECKPOINT = 999; + + SourceModel.create({ name: 'foo' }, function(err) { + if (err) return done(err); + SourceModel.changes(FUTURE_CHECKPOINT, {}, function(err, changes) { + if (err) return done(err); + expect(changes).to.be.empty(); + done(); + }); + }); + }); }); describe('Model.replicate(since, targetModel, options, callback)', function() { @@ -340,4 +358,137 @@ describe('Replication / Change APIs', function() { assert(!this.conflict); }); }); + + describe('change detection', function() { + it('detects "create"', function(done) { + SourceModel.create({}, function(err, inst) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + + it('detects "updateOrCreate"', function(done) { + givenReplicatedInstance(function(err, created) { + if (err) return done(err); + var data = created.toObject(); + created.name = 'updated'; + SourceModel.updateOrCreate(created, function(err, inst) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "findOrCreate"', function(done) { + // make sure we bypass find+create and call the connector directly + SourceModel.dataSource.connector.findOrCreate = + function(model, query, data, callback) { + this.all(model, query, function(err, list) { + if (err || (list && list[0])) + return callback(err, list && list[0], false); + this.create(model, data, function(err) { + callback(err, data, true); + }); + }.bind(this)); + }; + + SourceModel.findOrCreate( + { where: { name: 'does-not-exist' } }, + { name: 'created' }, + function(err, inst) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + + it('detects "deleteById"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + SourceModel.deleteById(inst.id, function(err) { + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "deleteAll"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + SourceModel.deleteAll({ name: inst.name }, function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "updateAll"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + SourceModel.updateAll( + { name: inst.name }, + { name: 'updated' }, + function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "prototype.save"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + inst.name = 'updated'; + inst.save(function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "prototype.updateAttributes"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + inst.updateAttributes({ name: 'updated' }, function(err) { + if (err) return done(err); + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + it('detects "prototype.delete"', function(done) { + givenReplicatedInstance(function(err, inst) { + if (err) return done(err); + inst.delete(function(err) { + assertChangeRecordedForId(inst.id, done); + }); + }); + }); + + function givenReplicatedInstance(cb) { + SourceModel.create({ name: 'a-name' }, function(err, inst) { + if (err) return cb(err); + SourceModel.checkpoint(function(err) { + if (err) return cb(err); + cb(null, inst); + }); + }); + } + + function assertChangeRecordedForId(id, cb) { + SourceModel.getChangeModel().getCheckpointModel() + .current(function(err, cp) { + if (err) return cb(err); + SourceModel.changes(cp - 1, {}, function(err, pendingChanges) { + if (err) return cb(err); + expect(pendingChanges, 'list of changes').to.have.length(1); + var change = pendingChanges[0].toObject(); + expect(change).to.have.property('checkpoint', cp); // sanity check + expect(change).to.have.property('modelName', SourceModel.modelName); + // NOTE(bajtos) Change.modelId is always String + // regardless of the type of the changed model's id property + expect(change).to.have.property('modelId', '' + id); + cb(); + }); + }); + } + }); });