From 91f59e1ccda6605b70256362ce8187d6500fa69b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Thu, 12 Mar 2015 10:47:11 +0100 Subject: [PATCH 1/2] Remove unnecessary delay in tests. --- test/replication.test.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/replication.test.js b/test/replication.test.js index 8bf5dcbd..b11bd374 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -88,10 +88,7 @@ describe('Replication / Change APIs', function() { var targetData; this.SourceModel.create({name: 'foo'}, function(err) { - setTimeout(replicate, 100); - }); - - function replicate() { + if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options, function(err, conflicts) { assert(conflicts.length === 0); @@ -117,7 +114,7 @@ describe('Replication / Change APIs', function() { done(); }); }); - } + }); }); it('applies "since" filter on source changes', function(done) { From 87940a4b58532b187ff967311435777c89941b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Mon, 16 Mar 2015 13:38:37 +0100 Subject: [PATCH 2/2] Detect 3rd-party changes made during replication Modify `Change.diff()` to include current data revision in each delta reported back. The current data revision is stored in `delta.prev`. Modify `PersistedModel.bulkUpdate()` to check that the current data revision matches `delta.prev` and report a conflict if a third party has modified the database under our hands. Fix `Change` implementation and tests so that they are no longer attempting to create instances with duplicate ids. (This used to work because the memory connector was silently converting such requests to updateOrCreate/findOrCreate.) --- common/models/change.js | 27 +++-- lib/persisted-model.js | 223 +++++++++++++++++++++++++++++++++++---- test/change.test.js | 95 ++++++++++++++++- test/replication.test.js | 196 +++++++++++++++++++++++++++++----- 4 files changed, 480 insertions(+), 61 deletions(-) diff --git a/common/models/change.js b/common/models/change.js index 572ed43f..afd6625d 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -126,7 +126,7 @@ module.exports = function(Change) { modelId: modelId }); ch.debug('creating change'); - ch.save(callback); + Change.updateOrCreate(ch, callback); } }); }; @@ -248,6 +248,7 @@ module.exports = function(Change) { */ Change.revisionForInst = function(inst) { + assert(inst, 'Change.revisionForInst() requires an instance object.'); return this.hash(CJSON.stringify(inst)); }; @@ -370,15 +371,18 @@ module.exports = function(Change) { this.find({ where: { modelName: modelName, - modelId: {inq: modelIds}, - checkpoint: {gte: since} + modelId: {inq: modelIds} } - }, function(err, localChanges) { + }, function(err, allLocalChanges) { if (err) return callback(err); var deltas = []; var conflicts = []; var localModelIds = []; + var localChanges = allLocalChanges.filter(function(c) { + return c.checkpoint >= since; + }); + localChanges.forEach(function(localChange) { localChange = new Change(localChange); localModelIds.push(localChange.modelId); @@ -396,9 +400,20 @@ module.exports = function(Change) { }); modelIds.forEach(function(id) { - if (localModelIds.indexOf(id) === -1) { - deltas.push(remoteChangeIndex[id]); + if (localModelIds.indexOf(id) !== -1) return; + + var d = remoteChangeIndex[id]; + var oldChange = allLocalChanges.filter(function(c) { + return c.modelId === id; + })[0]; + + if (oldChange) { + d.prev = oldChange.rev; + } else { + d.prev = null; } + + deltas.push(d); }); callback(null, { diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 35c60a84..b33a0cb0 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -952,7 +952,20 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) { function bulkUpdate(_updates, cb) { debug('\tstarting bulk update'); updates = _updates; - targetModel.bulkUpdate(updates, cb); + targetModel.bulkUpdate(updates, function(err) { + var conflicts = err && err.details && err.details.conflicts; + if (conflicts && err.statusCode == 409) { + diff.conflicts = conflicts; + // filter out updates that were not applied + updates = updates.filter(function(u) { + return conflicts + .filter(function(d) { return d.modelId === u.change.modelId; }) + .length === 0; + }); + return cb(); + } + cb(err); + }); } function checkpoints() { @@ -974,7 +987,7 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) { debug('\treplication finished'); debug('\t\t%s conflict(s) detected', diff.conflicts.length); - debug('\t\t%s change(s) applied', updates && updates.length); + debug('\t\t%s change(s) applied', updates ? updates.length : 0); debug('\t\tnew checkpoints: { source: %j, target: %j }', newSourceCp, newTargetCp); @@ -1058,31 +1071,197 @@ PersistedModel.createUpdates = function(deltas, cb) { PersistedModel.bulkUpdate = function(updates, callback) { var tasks = []; var Model = this; - var idName = this.dataSource.idName(this.modelName); var Change = this.getChangeModel(); + var conflicts = []; - updates.forEach(function(update) { - switch (update.type) { - case Change.UPDATE: - case Change.CREATE: - // var model = new Model(update.data); - // tasks.push(model.save.bind(model)); - tasks.push(function(cb) { - var model = new Model(update.data); - model.save(cb); - }); - break; - case Change.DELETE: - var data = {}; - data[idName] = update.change.modelId; - var model = new Model(data); - tasks.push(model.destroy.bind(model)); - break; + buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) { + if (err) return callback(err); + + updates.forEach(function(update) { + var id = update.change.modelId; + var current = currentMap[id]; + switch (update.type) { + case Change.UPDATE: + tasks.push(function(cb) { + applyUpdate(Model, id, current, update.data, update.change, conflicts, cb); + }); + break; + + case Change.CREATE: + tasks.push(function(cb) { + applyCreate(Model, id, current, update.data, update.change, conflicts, cb); + }); + break; + case Change.DELETE: + tasks.push(function(cb) { + applyDelete(Model, id, current, update.change, conflicts, cb); + }); + break; + } + }); + + async.parallel(tasks, function(err) { + if (err) return callback(err); + if (conflicts.length) { + err = new Error('Conflict'); + err.statusCode = 409; + err.details = { conflicts: conflicts }; + return callback(err); + } + callback(); + }); + }); +}; + +function buildLookupOfAffectedModelData(Model, updates, callback) { + var idName = Model.dataSource.idName(Model.modelName); + var affectedIds = updates.map(function(u) { return u.change.modelId; }); + var whereAffected = {}; + whereAffected[idName] = { inq: affectedIds }; + Model.find({ where: whereAffected }, function(err, affectedList) { + if (err) return callback(err); + var dataLookup = {}; + affectedList.forEach(function(it) { + dataLookup[it[idName]] = it; + }); + callback(null, dataLookup); + }); +} + +function applyUpdate(Model, id, current, data, change, conflicts, cb) { + var Change = Model.getChangeModel(); + var rev = current ? Change.revisionForInst(current) : null; + + if (rev !== change.prev) { + debug('Detected non-rectified change of %s %j', + Model.modelName, id); + debug('\tExpected revision: %s', change.rev); + debug('\tActual revision: %s', rev); + conflicts.push(change); + return Change.rectifyModelChanges(Model.modelName, [id], cb); + } + + // TODO(bajtos) modify `data` so that it instructs + // the connector to remove any properties included in "inst" + // but not included in `data` + // See https://github.com/strongloop/loopback/issues/1215 + + Model.updateAll(current.toObject(), data, function(err, result) { + if (err) return cb(err); + + var count = result && result.count; + switch (count) { + case 1: + // The happy path, exactly one record was updated + return cb(); + + case 0: + debug('UpdateAll detected non-rectified change of %s %j', + Model.modelName, id); + conflicts.push(change); + // NOTE(bajtos) updateAll triggers change rectification + // for all model instances, even when no records were updated, + // thus we don't need to rectify explicitly ourselves + return cb(); + + case undefined: + case null: + return cb(new Error( + 'Cannot apply bulk updates, ' + + 'the connector does not correctly report ' + + 'the number of updated records.')); + + default: + debug('%s.updateAll modified unexpected number of instances: %j', + Model.modelName, count); + return cb(new Error( + 'Bulk update failed, the connector has modified unexpected ' + + 'number of records: ' + JSON.stringify(count))); } }); +} - async.parallel(tasks, callback); -}; +function applyCreate(Model, id, current, data, change, conflicts, cb) { + Model.create(data, function(createErr) { + if (!createErr) return cb(); + + // We don't have a reliable way how to detect the situation + // where he model was not create because of a duplicate id + // The workaround is to query the DB to check if the model already exists + Model.findById(id, function(findErr, inst) { + if (findErr || !inst) { + // There isn't any instance with the same id, thus there isn't + // any conflict and we just report back the original error. + return cb(createErr); + } + + return conflict(); + }); + }); + + function conflict() { + // The instance already exists - report a conflict + debug('Detected non-rectified new instance of %s %j', + Model.modelName, id); + conflicts.push(change); + + var Change = Model.getChangeModel(); + return Change.rectifyModelChanges(Model.modelName, [id], cb); + } +} + +function applyDelete(Model, id, current, change, conflicts, cb) { + if (!current) { + // The instance was either already deleted or not created at all, + // we are done. + return cb(); + } + + var Change = Model.getChangeModel(); + var rev = Change.revisionForInst(current); + if (rev !== change.prev) { + debug('Detected non-rectified change of %s %j', + Model.modelName, id); + debug('\tExpected revision: %s', change.rev); + debug('\tActual revision: %s', rev); + conflicts.push(change); + return Change.rectifyModelChanges(Model.modelName, [id], cb); + } + + Model.deleteAll(current.toObject(), function(err, result) { + if (err) return cb(err); + + var count = result && result.count; + switch (count) { + case 1: + // The happy path, exactly one record was updated + return cb(); + + case 0: + debug('DeleteAll detected non-rectified change of %s %j', + Model.modelName, id); + conflicts.push(change); + // NOTE(bajtos) deleteAll triggers change rectification + // for all model instances, even when no records were updated, + // thus we don't need to rectify explicitly ourselves + return cb(); + + case undefined: + case null: + return cb(new Error( + 'Cannot apply bulk updates, ' + + 'the connector does not correctly report ' + + 'the number of deleted records.')); + + default: + debug('%s.deleteAll modified unexpected number of instances: %j', + Model.modelName, count); + return cb(new Error( + 'Bulk update failed, the connector has deleted unexpected ' + + 'number of records: ' + JSON.stringify(count))); + } + }); +} /** * Get the `Change` model. diff --git a/test/change.test.js b/test/change.test.js index 74055b52..02a059df 100644 --- a/test/change.test.js +++ b/test/change.test.js @@ -1,4 +1,5 @@ var async = require('async'); +var expect = require('chai').expect; var Change; var TestModel; @@ -134,11 +135,16 @@ describe('Change', function() { describe('change.rectify(callback)', function() { var change; - beforeEach(function() { - change = new Change({ - modelName: this.modelName, - modelId: this.modelId - }); + beforeEach(function(done) { + Change.findOrCreate( + { + modelName: this.modelName, + modelId: this.modelId + }, + function(err, ch) { + change = ch; + done(err); + }); }); it('should create a new change with the correct revision', function(done) { @@ -344,10 +350,89 @@ describe('Change', function() { ]; Change.diff(this.modelName, 0, remoteChanges, function(err, diff) { + if (err) return done(err); assert.equal(diff.deltas.length, 1); assert.equal(diff.conflicts.length, 1); done(); }); }); + + it('should set "prev" to local revision in non-conflicting delta', function(done) { + var updateRecord = { + rev: 'foo-new', + prev: 'foo', + modelName: this.modelName, + modelId: '9', + checkpoint: 2 + }; + Change.diff(this.modelName, 0, [updateRecord], function(err, diff) { + if (err) return done(err); + expect(diff.conflicts, 'conflicts').to.have.length(0); + expect(diff.deltas, 'deltas').to.have.length(1); + var actual = diff.deltas[0].toObject(); + delete actual.id; + expect(actual).to.eql({ + checkpoint: 2, + modelId: '9', + modelName: updateRecord.modelName, + prev: 'foo', // this is the current local revision + rev: 'foo-new', + }); + done(); + }); + }); + + it('should set "prev" to local revision in remote-only delta', function(done) { + var updateRecord = { + rev: 'foo-new', + prev: 'foo-prev', + modelName: this.modelName, + modelId: '9', + checkpoint: 2 + }; + // IMPORTANT: the diff call excludes the local change + // with rev=foo CP=1 + Change.diff(this.modelName, 2, [updateRecord], function(err, diff) { + if (err) return done(err); + expect(diff.conflicts, 'conflicts').to.have.length(0); + expect(diff.deltas, 'deltas').to.have.length(1); + var actual = diff.deltas[0].toObject(); + delete actual.id; + expect(actual).to.eql({ + checkpoint: 2, + modelId: '9', + modelName: updateRecord.modelName, + prev: 'foo', // this is the current local revision + rev: 'foo-new', + }); + done(); + }); + }); + + it('should set "prev" to null for a new instance', function(done) { + var updateRecord = { + rev: 'new-rev', + prev: 'new-prev', + modelName: this.modelName, + modelId: 'new-id', + checkpoint: 2 + }; + + Change.diff(this.modelName, 0, [updateRecord], function(err, diff) { + if (err) return done(err); + expect(diff.conflicts).to.have.length(0); + expect(diff.deltas).to.have.length(1); + var actual = diff.deltas[0].toObject(); + delete actual.id; + expect(actual).to.eql({ + checkpoint: 2, + modelId: 'new-id', + modelName: updateRecord.modelName, + prev: null, // this is the current local revision + rev: 'new-rev', + }); + done(); + }); + }); }); }); diff --git a/test/replication.test.js b/test/replication.test.js index b11bd374..6c875d25 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -91,6 +91,7 @@ describe('Replication / Change APIs', function() { if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options, function(err, conflicts) { + if (err) return done(err); assert(conflicts.length === 0); async.parallel([ function(cb) { @@ -176,18 +177,11 @@ describe('Replication / Change APIs', function() { }); it('picks up changes made during replication', function(done) { - var bulkUpdate = TargetModel.bulkUpdate; - TargetModel.bulkUpdate = function(data, cb) { - var self = this; + setupRaceConditionInReplication(function(cb) { // simulate the situation when another model is created // while a replication run is in progress - SourceModel.create({ id: 'racer' }, function(err) { - if (err) return cb(err); - bulkUpdate.call(self, data, cb); - }); - // create the new model only once - TargetModel.bulkUpdate = bulkUpdate; - }; + SourceModel.create({ id: 'racer' }, cb); + }); var lastCp; async.series([ @@ -288,6 +282,128 @@ describe('Replication / Change APIs', function() { } ], done); }); + + describe('with 3rd-party changes', function() { + it('detects UPDATE made during UPDATE', function(done) { + async.series([ + createModel(SourceModel, { id: '1' }), + replicateExpectingSuccess(), + function updateModel(next) { + SourceModel.updateAll({ id: '1' }, { name: 'source' }, next); + }, + function replicateWith3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.updateAttributes( + TargetModel.modelName, + '1', + { name: '3rd-party' }, + cb); + }); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, cps, updates) { + if (err) return next(err); + + var conflictedIds = getPropValue(conflicts || [], 'modelId'); + expect(conflictedIds).to.eql(['1']); + + // resolve the conflict using ours + conflicts[0].resolve(next); + }); + }, + + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + + it('detects CREATE made during CREATE', function(done) { + async.series([ + // FIXME(bajtos) Remove the 'name' property once the implementation + // of UPDATE is fixed to correctly remove properties + createModel(SourceModel, { id: '1', name: 'source' }), + function replicateWith3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.create( + TargetModel.modelName, + { id: '1', name: '3rd-party' }, + cb); + }); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, cps, updates) { + if (err) return next(err); + + var conflictedIds = getPropValue(conflicts || [], 'modelId'); + expect(conflictedIds).to.eql(['1']); + + // resolve the conflict using ours + conflicts[0].resolve(next); + }); + }, + + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + + it('detects UPDATE made during DELETE', function(done) { + async.series([ + createModel(SourceModel, { id: '1' }), + replicateExpectingSuccess(), + function deleteModel(next) { + SourceModel.deleteById('1', next); + }, + function replicateWith3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.updateAttributes( + TargetModel.modelName, + '1', + { name: '3rd-party' }, + cb); + }); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, cps, updates) { + if (err) return next(err); + + var conflictedIds = getPropValue(conflicts || [], 'modelId'); + expect(conflictedIds).to.eql(['1']); + + // resolve the conflict using ours + conflicts[0].resolve(next); + }); + }, + + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + + it('handles DELETE made during DELETE', function(done) { + async.series([ + createModel(SourceModel, { id: '1' }), + replicateExpectingSuccess(), + function deleteModel(next) { + SourceModel.deleteById('1', next); + }, + function setup3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.destroy( + TargetModel.modelName, + '1', + cb); + }); + next(); + }, + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + }); }); describe('conflict detection - both updated', function() { @@ -875,7 +991,7 @@ describe('Replication / Change APIs', function() { function resolveManually(conflict, cb) { conflict.models(function(err, source, target) { if (err) return cb(err); - var m = new conflict.SourceModel(source || target); + var m = source || new conflict.SourceModel(target); m.name = 'manual'; m.save(function(err) { if (err) return cb(err); @@ -894,7 +1010,7 @@ describe('Replication / Change APIs', function() { async.series([ // sync the new model to ClientB sync(ClientB, Server), - verifyInstanceWasReplicated(ClientA, ClientB), + verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId), // ClientA makes a change updateSourceInstanceNameTo('a'), @@ -921,7 +1037,7 @@ describe('Replication / Change APIs', function() { // and sync back to ClientA too sync(ClientA, Server), - verifyInstanceWasReplicated(ClientB, ClientA) + verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId) ], cb); } @@ -950,6 +1066,7 @@ describe('Replication / Change APIs', function() { function updateSourceInstanceNameTo(value) { return function updateInstance(next) { + debug('update source instance name to %j', value); sourceInstance.name = value; sourceInstance.save(next); }; @@ -957,6 +1074,7 @@ describe('Replication / Change APIs', function() { function deleteSourceInstance(value) { return function deleteInstance(next) { + debug('delete source instance', value); sourceInstance.remove(function(err) { sourceInstance = null; next(err); @@ -975,21 +1093,6 @@ describe('Replication / Change APIs', function() { }); }; } - - function verifyInstanceWasReplicated(source, target) { - return function verify(next) { - source.findById(sourceInstanceId, function(err, expected) { - if (err) return next(err); - target.findById(sourceInstanceId, function(err, actual) { - if (err) return next(err); - expect(actual && actual.toObject()) - .to.eql(expected && expected.toObject()); - debug('replicated instance: %j', actual); - next(); - }); - }); - }; - } }); var _since = {}; @@ -1018,6 +1121,12 @@ describe('Replication / Change APIs', function() { }); } + function createModel(Model, data) { + return function create(next) { + Model.create(data, next); + }; + } + function replicateExpectingSuccess(source, target, since) { if (!source) source = SourceModel; if (!target) target = TargetModel; @@ -1034,6 +1143,37 @@ describe('Replication / Change APIs', function() { }; } + function setupRaceConditionInReplication(fn) { + var bulkUpdate = TargetModel.bulkUpdate; + TargetModel.bulkUpdate = function(data, cb) { + // simulate the situation when a 3rd party modifies the database + // while a replication run is in progress + var self = this; + fn(function(err) { + if (err) return cb(err); + bulkUpdate.call(self, data, cb); + }); + + // apply the 3rd party modification only once + TargetModel.bulkUpdate = bulkUpdate; + }; + } + + function verifyInstanceWasReplicated(source, target, id) { + return function verify(next) { + source.findById(id, function(err, expected) { + if (err) return next(err); + target.findById(id, function(err, actual) { + if (err) return next(err); + expect(actual && actual.toObject()) + .to.eql(expected && expected.toObject()); + debug('replicated instance: %j', actual); + next(); + }); + }); + }; + } + function spyAndStoreSinceArg(Model, methodName, store) { var orig = Model[methodName]; Model[methodName] = function(since) {