diff --git a/common/models/change.js b/common/models/change.js index 572ed43f..afd6625d 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -126,7 +126,7 @@ module.exports = function(Change) { modelId: modelId }); ch.debug('creating change'); - ch.save(callback); + Change.updateOrCreate(ch, callback); } }); }; @@ -248,6 +248,7 @@ module.exports = function(Change) { */ Change.revisionForInst = function(inst) { + assert(inst, 'Change.revisionForInst() requires an instance object.'); return this.hash(CJSON.stringify(inst)); }; @@ -370,15 +371,18 @@ module.exports = function(Change) { this.find({ where: { modelName: modelName, - modelId: {inq: modelIds}, - checkpoint: {gte: since} + modelId: {inq: modelIds} } - }, function(err, localChanges) { + }, function(err, allLocalChanges) { if (err) return callback(err); var deltas = []; var conflicts = []; var localModelIds = []; + var localChanges = allLocalChanges.filter(function(c) { + return c.checkpoint >= since; + }); + localChanges.forEach(function(localChange) { localChange = new Change(localChange); localModelIds.push(localChange.modelId); @@ -396,9 +400,20 @@ module.exports = function(Change) { }); modelIds.forEach(function(id) { - if (localModelIds.indexOf(id) === -1) { - deltas.push(remoteChangeIndex[id]); + if (localModelIds.indexOf(id) !== -1) return; + + var d = remoteChangeIndex[id]; + var oldChange = allLocalChanges.filter(function(c) { + return c.modelId === id; + })[0]; + + if (oldChange) { + d.prev = oldChange.rev; + } else { + d.prev = null; } + + deltas.push(d); }); callback(null, { diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 35c60a84..b33a0cb0 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -952,7 +952,20 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) { function bulkUpdate(_updates, cb) { debug('\tstarting bulk update'); updates = _updates; - targetModel.bulkUpdate(updates, cb); + targetModel.bulkUpdate(updates, function(err) { + var conflicts = err && err.details && err.details.conflicts; + if (conflicts && err.statusCode == 409) { + diff.conflicts = conflicts; + // filter out updates that were not applied + updates = updates.filter(function(u) { + return conflicts + .filter(function(d) { return d.modelId === u.change.modelId; }) + .length === 0; + }); + return cb(); + } + cb(err); + }); } function checkpoints() { @@ -974,7 +987,7 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) { debug('\treplication finished'); debug('\t\t%s conflict(s) detected', diff.conflicts.length); - debug('\t\t%s change(s) applied', updates && updates.length); + debug('\t\t%s change(s) applied', updates ? updates.length : 0); debug('\t\tnew checkpoints: { source: %j, target: %j }', newSourceCp, newTargetCp); @@ -1058,31 +1071,197 @@ PersistedModel.createUpdates = function(deltas, cb) { PersistedModel.bulkUpdate = function(updates, callback) { var tasks = []; var Model = this; - var idName = this.dataSource.idName(this.modelName); var Change = this.getChangeModel(); + var conflicts = []; - updates.forEach(function(update) { - switch (update.type) { - case Change.UPDATE: - case Change.CREATE: - // var model = new Model(update.data); - // tasks.push(model.save.bind(model)); - tasks.push(function(cb) { - var model = new Model(update.data); - model.save(cb); - }); - break; - case Change.DELETE: - var data = {}; - data[idName] = update.change.modelId; - var model = new Model(data); - tasks.push(model.destroy.bind(model)); - break; + buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) { + if (err) return callback(err); + + updates.forEach(function(update) { + var id = update.change.modelId; + var current = currentMap[id]; + switch (update.type) { + case Change.UPDATE: + tasks.push(function(cb) { + applyUpdate(Model, id, current, update.data, update.change, conflicts, cb); + }); + break; + + case Change.CREATE: + tasks.push(function(cb) { + applyCreate(Model, id, current, update.data, update.change, conflicts, cb); + }); + break; + case Change.DELETE: + tasks.push(function(cb) { + applyDelete(Model, id, current, update.change, conflicts, cb); + }); + break; + } + }); + + async.parallel(tasks, function(err) { + if (err) return callback(err); + if (conflicts.length) { + err = new Error('Conflict'); + err.statusCode = 409; + err.details = { conflicts: conflicts }; + return callback(err); + } + callback(); + }); + }); +}; + +function buildLookupOfAffectedModelData(Model, updates, callback) { + var idName = Model.dataSource.idName(Model.modelName); + var affectedIds = updates.map(function(u) { return u.change.modelId; }); + var whereAffected = {}; + whereAffected[idName] = { inq: affectedIds }; + Model.find({ where: whereAffected }, function(err, affectedList) { + if (err) return callback(err); + var dataLookup = {}; + affectedList.forEach(function(it) { + dataLookup[it[idName]] = it; + }); + callback(null, dataLookup); + }); +} + +function applyUpdate(Model, id, current, data, change, conflicts, cb) { + var Change = Model.getChangeModel(); + var rev = current ? Change.revisionForInst(current) : null; + + if (rev !== change.prev) { + debug('Detected non-rectified change of %s %j', + Model.modelName, id); + debug('\tExpected revision: %s', change.rev); + debug('\tActual revision: %s', rev); + conflicts.push(change); + return Change.rectifyModelChanges(Model.modelName, [id], cb); + } + + // TODO(bajtos) modify `data` so that it instructs + // the connector to remove any properties included in "inst" + // but not included in `data` + // See https://github.com/strongloop/loopback/issues/1215 + + Model.updateAll(current.toObject(), data, function(err, result) { + if (err) return cb(err); + + var count = result && result.count; + switch (count) { + case 1: + // The happy path, exactly one record was updated + return cb(); + + case 0: + debug('UpdateAll detected non-rectified change of %s %j', + Model.modelName, id); + conflicts.push(change); + // NOTE(bajtos) updateAll triggers change rectification + // for all model instances, even when no records were updated, + // thus we don't need to rectify explicitly ourselves + return cb(); + + case undefined: + case null: + return cb(new Error( + 'Cannot apply bulk updates, ' + + 'the connector does not correctly report ' + + 'the number of updated records.')); + + default: + debug('%s.updateAll modified unexpected number of instances: %j', + Model.modelName, count); + return cb(new Error( + 'Bulk update failed, the connector has modified unexpected ' + + 'number of records: ' + JSON.stringify(count))); } }); +} - async.parallel(tasks, callback); -}; +function applyCreate(Model, id, current, data, change, conflicts, cb) { + Model.create(data, function(createErr) { + if (!createErr) return cb(); + + // We don't have a reliable way how to detect the situation + // where he model was not create because of a duplicate id + // The workaround is to query the DB to check if the model already exists + Model.findById(id, function(findErr, inst) { + if (findErr || !inst) { + // There isn't any instance with the same id, thus there isn't + // any conflict and we just report back the original error. + return cb(createErr); + } + + return conflict(); + }); + }); + + function conflict() { + // The instance already exists - report a conflict + debug('Detected non-rectified new instance of %s %j', + Model.modelName, id); + conflicts.push(change); + + var Change = Model.getChangeModel(); + return Change.rectifyModelChanges(Model.modelName, [id], cb); + } +} + +function applyDelete(Model, id, current, change, conflicts, cb) { + if (!current) { + // The instance was either already deleted or not created at all, + // we are done. + return cb(); + } + + var Change = Model.getChangeModel(); + var rev = Change.revisionForInst(current); + if (rev !== change.prev) { + debug('Detected non-rectified change of %s %j', + Model.modelName, id); + debug('\tExpected revision: %s', change.rev); + debug('\tActual revision: %s', rev); + conflicts.push(change); + return Change.rectifyModelChanges(Model.modelName, [id], cb); + } + + Model.deleteAll(current.toObject(), function(err, result) { + if (err) return cb(err); + + var count = result && result.count; + switch (count) { + case 1: + // The happy path, exactly one record was updated + return cb(); + + case 0: + debug('DeleteAll detected non-rectified change of %s %j', + Model.modelName, id); + conflicts.push(change); + // NOTE(bajtos) deleteAll triggers change rectification + // for all model instances, even when no records were updated, + // thus we don't need to rectify explicitly ourselves + return cb(); + + case undefined: + case null: + return cb(new Error( + 'Cannot apply bulk updates, ' + + 'the connector does not correctly report ' + + 'the number of deleted records.')); + + default: + debug('%s.deleteAll modified unexpected number of instances: %j', + Model.modelName, count); + return cb(new Error( + 'Bulk update failed, the connector has deleted unexpected ' + + 'number of records: ' + JSON.stringify(count))); + } + }); +} /** * Get the `Change` model. diff --git a/test/change.test.js b/test/change.test.js index 74055b52..02a059df 100644 --- a/test/change.test.js +++ b/test/change.test.js @@ -1,4 +1,5 @@ var async = require('async'); +var expect = require('chai').expect; var Change; var TestModel; @@ -134,11 +135,16 @@ describe('Change', function() { describe('change.rectify(callback)', function() { var change; - beforeEach(function() { - change = new Change({ - modelName: this.modelName, - modelId: this.modelId - }); + beforeEach(function(done) { + Change.findOrCreate( + { + modelName: this.modelName, + modelId: this.modelId + }, + function(err, ch) { + change = ch; + done(err); + }); }); it('should create a new change with the correct revision', function(done) { @@ -344,10 +350,89 @@ describe('Change', function() { ]; Change.diff(this.modelName, 0, remoteChanges, function(err, diff) { + if (err) return done(err); assert.equal(diff.deltas.length, 1); assert.equal(diff.conflicts.length, 1); done(); }); }); + + it('should set "prev" to local revision in non-conflicting delta', function(done) { + var updateRecord = { + rev: 'foo-new', + prev: 'foo', + modelName: this.modelName, + modelId: '9', + checkpoint: 2 + }; + Change.diff(this.modelName, 0, [updateRecord], function(err, diff) { + if (err) return done(err); + expect(diff.conflicts, 'conflicts').to.have.length(0); + expect(diff.deltas, 'deltas').to.have.length(1); + var actual = diff.deltas[0].toObject(); + delete actual.id; + expect(actual).to.eql({ + checkpoint: 2, + modelId: '9', + modelName: updateRecord.modelName, + prev: 'foo', // this is the current local revision + rev: 'foo-new', + }); + done(); + }); + }); + + it('should set "prev" to local revision in remote-only delta', function(done) { + var updateRecord = { + rev: 'foo-new', + prev: 'foo-prev', + modelName: this.modelName, + modelId: '9', + checkpoint: 2 + }; + // IMPORTANT: the diff call excludes the local change + // with rev=foo CP=1 + Change.diff(this.modelName, 2, [updateRecord], function(err, diff) { + if (err) return done(err); + expect(diff.conflicts, 'conflicts').to.have.length(0); + expect(diff.deltas, 'deltas').to.have.length(1); + var actual = diff.deltas[0].toObject(); + delete actual.id; + expect(actual).to.eql({ + checkpoint: 2, + modelId: '9', + modelName: updateRecord.modelName, + prev: 'foo', // this is the current local revision + rev: 'foo-new', + }); + done(); + }); + }); + + it('should set "prev" to null for a new instance', function(done) { + var updateRecord = { + rev: 'new-rev', + prev: 'new-prev', + modelName: this.modelName, + modelId: 'new-id', + checkpoint: 2 + }; + + Change.diff(this.modelName, 0, [updateRecord], function(err, diff) { + if (err) return done(err); + expect(diff.conflicts).to.have.length(0); + expect(diff.deltas).to.have.length(1); + var actual = diff.deltas[0].toObject(); + delete actual.id; + expect(actual).to.eql({ + checkpoint: 2, + modelId: 'new-id', + modelName: updateRecord.modelName, + prev: null, // this is the current local revision + rev: 'new-rev', + }); + done(); + }); + }); }); }); diff --git a/test/replication.test.js b/test/replication.test.js index 8bf5dcbd..6c875d25 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -88,12 +88,10 @@ describe('Replication / Change APIs', function() { var targetData; this.SourceModel.create({name: 'foo'}, function(err) { - setTimeout(replicate, 100); - }); - - function replicate() { + if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options, function(err, conflicts) { + if (err) return done(err); assert(conflicts.length === 0); async.parallel([ function(cb) { @@ -117,7 +115,7 @@ describe('Replication / Change APIs', function() { done(); }); }); - } + }); }); it('applies "since" filter on source changes', function(done) { @@ -179,18 +177,11 @@ describe('Replication / Change APIs', function() { }); it('picks up changes made during replication', function(done) { - var bulkUpdate = TargetModel.bulkUpdate; - TargetModel.bulkUpdate = function(data, cb) { - var self = this; + setupRaceConditionInReplication(function(cb) { // simulate the situation when another model is created // while a replication run is in progress - SourceModel.create({ id: 'racer' }, function(err) { - if (err) return cb(err); - bulkUpdate.call(self, data, cb); - }); - // create the new model only once - TargetModel.bulkUpdate = bulkUpdate; - }; + SourceModel.create({ id: 'racer' }, cb); + }); var lastCp; async.series([ @@ -291,6 +282,128 @@ describe('Replication / Change APIs', function() { } ], done); }); + + describe('with 3rd-party changes', function() { + it('detects UPDATE made during UPDATE', function(done) { + async.series([ + createModel(SourceModel, { id: '1' }), + replicateExpectingSuccess(), + function updateModel(next) { + SourceModel.updateAll({ id: '1' }, { name: 'source' }, next); + }, + function replicateWith3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.updateAttributes( + TargetModel.modelName, + '1', + { name: '3rd-party' }, + cb); + }); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, cps, updates) { + if (err) return next(err); + + var conflictedIds = getPropValue(conflicts || [], 'modelId'); + expect(conflictedIds).to.eql(['1']); + + // resolve the conflict using ours + conflicts[0].resolve(next); + }); + }, + + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + + it('detects CREATE made during CREATE', function(done) { + async.series([ + // FIXME(bajtos) Remove the 'name' property once the implementation + // of UPDATE is fixed to correctly remove properties + createModel(SourceModel, { id: '1', name: 'source' }), + function replicateWith3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.create( + TargetModel.modelName, + { id: '1', name: '3rd-party' }, + cb); + }); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, cps, updates) { + if (err) return next(err); + + var conflictedIds = getPropValue(conflicts || [], 'modelId'); + expect(conflictedIds).to.eql(['1']); + + // resolve the conflict using ours + conflicts[0].resolve(next); + }); + }, + + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + + it('detects UPDATE made during DELETE', function(done) { + async.series([ + createModel(SourceModel, { id: '1' }), + replicateExpectingSuccess(), + function deleteModel(next) { + SourceModel.deleteById('1', next); + }, + function replicateWith3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.updateAttributes( + TargetModel.modelName, + '1', + { name: '3rd-party' }, + cb); + }); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, cps, updates) { + if (err) return next(err); + + var conflictedIds = getPropValue(conflicts || [], 'modelId'); + expect(conflictedIds).to.eql(['1']); + + // resolve the conflict using ours + conflicts[0].resolve(next); + }); + }, + + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + + it('handles DELETE made during DELETE', function(done) { + async.series([ + createModel(SourceModel, { id: '1' }), + replicateExpectingSuccess(), + function deleteModel(next) { + SourceModel.deleteById('1', next); + }, + function setup3rdPartyModifyingData(next) { + setupRaceConditionInReplication(function(cb) { + TargetModel.dataSource.connector.destroy( + TargetModel.modelName, + '1', + cb); + }); + next(); + }, + replicateExpectingSuccess(), + verifyInstanceWasReplicated(SourceModel, TargetModel, '1') + ], done); + }); + }); }); describe('conflict detection - both updated', function() { @@ -878,7 +991,7 @@ describe('Replication / Change APIs', function() { function resolveManually(conflict, cb) { conflict.models(function(err, source, target) { if (err) return cb(err); - var m = new conflict.SourceModel(source || target); + var m = source || new conflict.SourceModel(target); m.name = 'manual'; m.save(function(err) { if (err) return cb(err); @@ -897,7 +1010,7 @@ describe('Replication / Change APIs', function() { async.series([ // sync the new model to ClientB sync(ClientB, Server), - verifyInstanceWasReplicated(ClientA, ClientB), + verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId), // ClientA makes a change updateSourceInstanceNameTo('a'), @@ -924,7 +1037,7 @@ describe('Replication / Change APIs', function() { // and sync back to ClientA too sync(ClientA, Server), - verifyInstanceWasReplicated(ClientB, ClientA) + verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId) ], cb); } @@ -953,6 +1066,7 @@ describe('Replication / Change APIs', function() { function updateSourceInstanceNameTo(value) { return function updateInstance(next) { + debug('update source instance name to %j', value); sourceInstance.name = value; sourceInstance.save(next); }; @@ -960,6 +1074,7 @@ describe('Replication / Change APIs', function() { function deleteSourceInstance(value) { return function deleteInstance(next) { + debug('delete source instance', value); sourceInstance.remove(function(err) { sourceInstance = null; next(err); @@ -978,21 +1093,6 @@ describe('Replication / Change APIs', function() { }); }; } - - function verifyInstanceWasReplicated(source, target) { - return function verify(next) { - source.findById(sourceInstanceId, function(err, expected) { - if (err) return next(err); - target.findById(sourceInstanceId, function(err, actual) { - if (err) return next(err); - expect(actual && actual.toObject()) - .to.eql(expected && expected.toObject()); - debug('replicated instance: %j', actual); - next(); - }); - }); - }; - } }); var _since = {}; @@ -1021,6 +1121,12 @@ describe('Replication / Change APIs', function() { }); } + function createModel(Model, data) { + return function create(next) { + Model.create(data, next); + }; + } + function replicateExpectingSuccess(source, target, since) { if (!source) source = SourceModel; if (!target) target = TargetModel; @@ -1037,6 +1143,37 @@ describe('Replication / Change APIs', function() { }; } + function setupRaceConditionInReplication(fn) { + var bulkUpdate = TargetModel.bulkUpdate; + TargetModel.bulkUpdate = function(data, cb) { + // simulate the situation when a 3rd party modifies the database + // while a replication run is in progress + var self = this; + fn(function(err) { + if (err) return cb(err); + bulkUpdate.call(self, data, cb); + }); + + // apply the 3rd party modification only once + TargetModel.bulkUpdate = bulkUpdate; + }; + } + + function verifyInstanceWasReplicated(source, target, id) { + return function verify(next) { + source.findById(id, function(err, expected) { + if (err) return next(err); + target.findById(id, function(err, actual) { + if (err) return next(err); + expect(actual && actual.toObject()) + .to.eql(expected && expected.toObject()); + debug('replicated instance: %j', actual); + next(); + }); + }); + }; + } + function spyAndStoreSinceArg(Model, methodName, store) { var orig = Model[methodName]; Model[methodName] = function(since) {