Merge pull request #1214 from strongloop/fix/bulkUpdate-race-condition

Detect 3rd-party changes made during replication
This commit is contained in:
Miroslav Bajtoš 2015-03-20 08:30:31 +01:00
commit 7454462526
4 changed files with 482 additions and 66 deletions

View File

@ -126,7 +126,7 @@ module.exports = function(Change) {
modelId: modelId
});
ch.debug('creating change');
ch.save(callback);
Change.updateOrCreate(ch, callback);
}
});
};
@ -248,6 +248,7 @@ module.exports = function(Change) {
*/
Change.revisionForInst = function(inst) {
  // A revision cannot be computed without an instance; fail loudly.
  assert(inst, 'Change.revisionForInst() requires an instance object.');

  // Serialize the instance with CJSON first, then hash the serialized form.
  var serialized = CJSON.stringify(inst);
  return this.hash(serialized);
};
@ -370,15 +371,18 @@ module.exports = function(Change) {
this.find({
where: {
modelName: modelName,
modelId: {inq: modelIds},
checkpoint: {gte: since}
modelId: {inq: modelIds}
}
}, function(err, localChanges) {
}, function(err, allLocalChanges) {
if (err) return callback(err);
var deltas = [];
var conflicts = [];
var localModelIds = [];
var localChanges = allLocalChanges.filter(function(c) {
return c.checkpoint >= since;
});
localChanges.forEach(function(localChange) {
localChange = new Change(localChange);
localModelIds.push(localChange.modelId);
@ -396,9 +400,20 @@ module.exports = function(Change) {
});
modelIds.forEach(function(id) {
if (localModelIds.indexOf(id) === -1) {
deltas.push(remoteChangeIndex[id]);
if (localModelIds.indexOf(id) !== -1) return;
var d = remoteChangeIndex[id];
var oldChange = allLocalChanges.filter(function(c) {
return c.modelId === id;
})[0];
if (oldChange) {
d.prev = oldChange.rev;
} else {
d.prev = null;
}
deltas.push(d);
});
callback(null, {

View File

@ -952,7 +952,20 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) {
function bulkUpdate(_updates, cb) {
debug('\tstarting bulk update');
updates = _updates;
targetModel.bulkUpdate(updates, cb);
targetModel.bulkUpdate(updates, function(err) {
var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts;
// filter out updates that were not applied
updates = updates.filter(function(u) {
return conflicts
.filter(function(d) { return d.modelId === u.change.modelId; })
.length === 0;
});
return cb();
}
cb(err);
});
}
function checkpoints() {
@ -974,7 +987,7 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) {
debug('\treplication finished');
debug('\t\t%s conflict(s) detected', diff.conflicts.length);
debug('\t\t%s change(s) applied', updates && updates.length);
debug('\t\t%s change(s) applied', updates ? updates.length : 0);
debug('\t\tnew checkpoints: { source: %j, target: %j }',
newSourceCp, newTargetCp);
@ -1058,31 +1071,197 @@ PersistedModel.createUpdates = function(deltas, cb) {
PersistedModel.bulkUpdate = function(updates, callback) {
var tasks = [];
var Model = this;
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var conflicts = [];
updates.forEach(function(update) {
switch (update.type) {
case Change.UPDATE:
case Change.CREATE:
// var model = new Model(update.data);
// tasks.push(model.save.bind(model));
tasks.push(function(cb) {
var model = new Model(update.data);
model.save(cb);
});
break;
case Change.DELETE:
var data = {};
data[idName] = update.change.modelId;
var model = new Model(data);
tasks.push(model.destroy.bind(model));
break;
buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
if (err) return callback(err);
updates.forEach(function(update) {
var id = update.change.modelId;
var current = currentMap[id];
switch (update.type) {
case Change.UPDATE:
tasks.push(function(cb) {
applyUpdate(Model, id, current, update.data, update.change, conflicts, cb);
});
break;
case Change.CREATE:
tasks.push(function(cb) {
applyCreate(Model, id, current, update.data, update.change, conflicts, cb);
});
break;
case Change.DELETE:
tasks.push(function(cb) {
applyDelete(Model, id, current, update.change, conflicts, cb);
});
break;
}
});
async.parallel(tasks, function(err) {
if (err) return callback(err);
if (conflicts.length) {
err = new Error('Conflict');
err.statusCode = 409;
err.details = { conflicts: conflicts };
return callback(err);
}
callback();
});
});
};
/**
 * Load every model instance referenced by `updates` with a single query
 * and index the results by their id value.
 *
 * @param {Function} Model The model class whose instances are affected.
 * @param {Array} updates Update records; each one exposes `change.modelId`.
 * @param {Function} callback Called with `(err)` when the query fails,
 *   otherwise with `(null, lookup)` where `lookup[id]` is the found instance.
 */
function buildLookupOfAffectedModelData(Model, updates, callback) {
  var pk = Model.dataSource.idName(Model.modelName);

  // Build `{ where: { <pk>: { inq: [ids...] } } }`
  var query = { where: {} };
  query.where[pk] = {
    inq: updates.map(function(update) {
      return update.change.modelId;
    })
  };

  Model.find(query, function onAffectedLoaded(err, instances) {
    if (err) return callback(err);

    var byId = instances.reduce(function(lookup, instance) {
      lookup[instance[pk]] = instance;
      return lookup;
    }, {});

    callback(null, byId);
  });
}
/**
 * Apply a replicated UPDATE to a single model instance, reporting a conflict
 * instead of overwriting data that was changed behind our back.
 *
 * @param {Function} Model The model class being updated.
 * @param {*} id Id of the affected instance.
 * @param {Object} current The instance loaded before the bulk update started,
 *   or a falsy value when no such instance was found.
 * @param {Object} data The new property values to write.
 * @param {Object} change The change record; `change.prev` is the revision
 *   the instance is expected to have right now.
 * @param {Array} conflicts Collector; `change` is pushed here on conflict.
 * @param {Function} cb Called with `(err)` on failure and without arguments
 *   otherwise (a conflict is not reported as an error).
 */
function applyUpdate(Model, id, current, data, change, conflicts, cb) {
  var Change = Model.getChangeModel();
  var rev = current ? Change.revisionForInst(current) : null;

  // A missing instance cannot be updated. It must be reported as a conflict
  // even when `change.prev` is null as well, otherwise the call to
  // `current.toObject()` below would throw a TypeError.
  if (!current || rev !== change.prev) {
    debug('Detected non-rectified change of %s %j',
      Model.modelName, id);
    // The revision we compare against is `prev`, not the incoming `rev`.
    debug('\tExpected revision: %s', change.prev);
    debug('\tActual revision: %s', rev);
    conflicts.push(change);
    return Change.rectifyModelChanges(Model.modelName, [id], cb);
  }

  // TODO(bajtos) modify `data` so that it instructs
  // the connector to remove any properties included in "inst"
  // but not included in `data`
  // See https://github.com/strongloop/loopback/issues/1215

  // The full snapshot of the instance is used as the `where` filter;
  // the reported count tells us whether the record was actually written.
  Model.updateAll(current.toObject(), data, function(err, result) {
    if (err) return cb(err);

    var count = result && result.count;
    switch (count) {
      case 1:
        // The happy path, exactly one record was updated
        return cb();

      case 0:
        debug('UpdateAll detected non-rectified change of %s %j',
          Model.modelName, id);
        conflicts.push(change);
        // NOTE(bajtos) updateAll triggers change rectification
        // for all model instances, even when no records were updated,
        // thus we don't need to rectify explicitly ourselves
        return cb();

      case undefined:
      case null:
        return cb(new Error(
          'Cannot apply bulk updates, ' +
          'the connector does not correctly report ' +
          'the number of updated records.'));

      default:
        debug('%s.updateAll modified unexpected number of instances: %j',
          Model.modelName, count);
        return cb(new Error(
          'Bulk update failed, the connector has modified unexpected ' +
          'number of records: ' + JSON.stringify(count)));
    }
  });
}
async.parallel(tasks, callback);
};
/**
 * Apply a replicated CREATE. When the insert fails and an instance with the
 * same id is found, the change is reported as a conflict.
 *
 * @param {Function} Model The model class to create the instance in.
 * @param {*} id Id of the instance being created.
 * @param {Object} current Not used by this function.
 * @param {Object} data Properties of the new instance.
 * @param {Object} change The change record pushed to `conflicts` on conflict.
 * @param {Array} conflicts Collector for conflicting changes.
 * @param {Function} cb Called without arguments on success, with the
 *   original create error when no existing instance explains the failure.
 */
function applyCreate(Model, id, current, data, change, conflicts, cb) {
  Model.create(data, function onCreated(createErr) {
    if (!createErr) return cb();

    // There is no reliable way to tell that the model was not created
    // because of a duplicate id. As a workaround, query the database
    // to check whether the model already exists.
    Model.findById(id, function onLookup(findErr, existing) {
      var alreadyExists = !findErr && !!existing;

      if (alreadyExists) {
        // The instance already exists - report a conflict
        debug('Detected non-rectified new instance of %s %j',
          Model.modelName, id);
        conflicts.push(change);
        return Model.getChangeModel()
          .rectifyModelChanges(Model.modelName, [id], cb);
      }

      // No instance with the same id was found, so this is not a conflict;
      // report the original error back to the caller.
      return cb(createErr);
    });
  });
}
/**
 * Apply a replicated DELETE to a single model instance, reporting a conflict
 * when the instance no longer matches the revision the change was based on.
 *
 * @param {Function} Model The model class being modified.
 * @param {*} id Id of the affected instance.
 * @param {Object} current The instance loaded before the bulk update started,
 *   or a falsy value when no such instance was found.
 * @param {Object} change The change record; `change.prev` is the revision
 *   the instance is expected to have right now.
 * @param {Array} conflicts Collector; `change` is pushed here on conflict.
 * @param {Function} cb Called with `(err)` on failure and without arguments
 *   otherwise (a conflict is not reported as an error).
 */
function applyDelete(Model, id, current, change, conflicts, cb) {
  if (!current) {
    // The instance was either already deleted or not created at all,
    // we are done.
    return cb();
  }

  var Change = Model.getChangeModel();
  var rev = Change.revisionForInst(current);
  if (rev !== change.prev) {
    debug('Detected non-rectified change of %s %j',
      Model.modelName, id);
    // The revision we compare against is `prev`, not the incoming `rev`.
    debug('\tExpected revision: %s', change.prev);
    debug('\tActual revision: %s', rev);
    conflicts.push(change);
    return Change.rectifyModelChanges(Model.modelName, [id], cb);
  }

  // The full snapshot of the instance is used as the `where` filter;
  // the reported count tells us whether the record was actually removed.
  Model.deleteAll(current.toObject(), function(err, result) {
    if (err) return cb(err);

    var count = result && result.count;
    switch (count) {
      case 1:
        // The happy path, exactly one record was deleted
        return cb();

      case 0:
        debug('DeleteAll detected non-rectified change of %s %j',
          Model.modelName, id);
        conflicts.push(change);
        // NOTE(bajtos) deleteAll triggers change rectification
        // for all model instances, even when no records were deleted,
        // thus we don't need to rectify explicitly ourselves
        return cb();

      case undefined:
      case null:
        return cb(new Error(
          'Cannot apply bulk updates, ' +
          'the connector does not correctly report ' +
          'the number of deleted records.'));

      default:
        debug('%s.deleteAll modified unexpected number of instances: %j',
          Model.modelName, count);
        return cb(new Error(
          'Bulk update failed, the connector has deleted unexpected ' +
          'number of records: ' + JSON.stringify(count)));
    }
  });
}
/**
* Get the `Change` model.

View File

@ -1,4 +1,5 @@
var async = require('async');
var expect = require('chai').expect;
var Change;
var TestModel;
@ -134,11 +135,16 @@ describe('Change', function() {
describe('change.rectify(callback)', function() {
var change;
beforeEach(function() {
change = new Change({
modelName: this.modelName,
modelId: this.modelId
});
beforeEach(function(done) {
Change.findOrCreate(
{
modelName: this.modelName,
modelId: this.modelId
},
function(err, ch) {
change = ch;
done(err);
});
});
it('should create a new change with the correct revision', function(done) {
@ -344,10 +350,89 @@ describe('Change', function() {
];
Change.diff(this.modelName, 0, remoteChanges, function(err, diff) {
if (err) return done(err);
assert.equal(diff.deltas.length, 1);
assert.equal(diff.conflicts.length, 1);
done();
});
});
it('should set "prev" to local revision in non-conflicting delta', function(done) {
var updateRecord = {
rev: 'foo-new',
prev: 'foo',
modelName: this.modelName,
modelId: '9',
checkpoint: 2
};
Change.diff(this.modelName, 0, [updateRecord], function(err, diff) {
if (err) return done(err);
expect(diff.conflicts, 'conflicts').to.have.length(0);
expect(diff.deltas, 'deltas').to.have.length(1);
var actual = diff.deltas[0].toObject();
delete actual.id;
expect(actual).to.eql({
checkpoint: 2,
modelId: '9',
modelName: updateRecord.modelName,
prev: 'foo', // this is the current local revision
rev: 'foo-new',
});
done();
});
});
it('should set "prev" to local revision in remote-only delta', function(done) {
var updateRecord = {
rev: 'foo-new',
prev: 'foo-prev',
modelName: this.modelName,
modelId: '9',
checkpoint: 2
};
// IMPORTANT: the diff call excludes the local change
// with rev=foo CP=1
Change.diff(this.modelName, 2, [updateRecord], function(err, diff) {
if (err) return done(err);
expect(diff.conflicts, 'conflicts').to.have.length(0);
expect(diff.deltas, 'deltas').to.have.length(1);
var actual = diff.deltas[0].toObject();
delete actual.id;
expect(actual).to.eql({
checkpoint: 2,
modelId: '9',
modelName: updateRecord.modelName,
prev: 'foo', // this is the current local revision
rev: 'foo-new',
});
done();
});
});
// The remote record carries `prev: 'new-prev'`, yet the delta produced by
// Change.diff() is expected to have `prev: null` and to raise no conflict.
it('should set "prev" to null for a new instance', function(done) {
var updateRecord = {
rev: 'new-rev',
prev: 'new-prev',
modelName: this.modelName,
modelId: 'new-id',
checkpoint: 2
};
Change.diff(this.modelName, 0, [updateRecord], function(err, diff) {
if (err) return done(err);
expect(diff.conflicts).to.have.length(0);
expect(diff.deltas).to.have.length(1);
var actual = diff.deltas[0].toObject();
// the generated id is not part of the expectation below
delete actual.id;
expect(actual).to.eql({
checkpoint: 2,
modelId: 'new-id',
modelName: updateRecord.modelName,
prev: null, // a new instance is expected to have no local revision
rev: 'new-rev',
});
done();
});
});
});
});

View File

@ -88,12 +88,10 @@ describe('Replication / Change APIs', function() {
var targetData;
this.SourceModel.create({name: 'foo'}, function(err) {
setTimeout(replicate, 100);
});
function replicate() {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assert(conflicts.length === 0);
async.parallel([
function(cb) {
@ -117,7 +115,7 @@ describe('Replication / Change APIs', function() {
done();
});
});
}
});
});
it('applies "since" filter on source changes', function(done) {
@ -179,18 +177,11 @@ describe('Replication / Change APIs', function() {
});
it('picks up changes made during replication', function(done) {
var bulkUpdate = TargetModel.bulkUpdate;
TargetModel.bulkUpdate = function(data, cb) {
var self = this;
setupRaceConditionInReplication(function(cb) {
// simulate the situation when another model is created
// while a replication run is in progress
SourceModel.create({ id: 'racer' }, function(err) {
if (err) return cb(err);
bulkUpdate.call(self, data, cb);
});
// create the new model only once
TargetModel.bulkUpdate = bulkUpdate;
};
SourceModel.create({ id: 'racer' }, cb);
});
var lastCp;
async.series([
@ -291,6 +282,128 @@ describe('Replication / Change APIs', function() {
}
], done);
});
describe('with 3rd-party changes', function() {
it('detects UPDATE made during UPDATE', function(done) {
async.series([
createModel(SourceModel, { id: '1' }),
replicateExpectingSuccess(),
function updateModel(next) {
SourceModel.updateAll({ id: '1' }, { name: 'source' }, next);
},
function replicateWith3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.updateAttributes(
TargetModel.modelName,
'1',
{ name: '3rd-party' },
cb);
});
SourceModel.replicate(
TargetModel,
function(err, conflicts, cps, updates) {
if (err) return next(err);
var conflictedIds = getPropValue(conflicts || [], 'modelId');
expect(conflictedIds).to.eql(['1']);
// resolve the conflict using ours
conflicts[0].resolve(next);
});
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
it('detects CREATE made during CREATE', function(done) {
async.series([
// FIXME(bajtos) Remove the 'name' property once the implementation
// of UPDATE is fixed to correctly remove properties
createModel(SourceModel, { id: '1', name: 'source' }),
function replicateWith3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.create(
TargetModel.modelName,
{ id: '1', name: '3rd-party' },
cb);
});
SourceModel.replicate(
TargetModel,
function(err, conflicts, cps, updates) {
if (err) return next(err);
var conflictedIds = getPropValue(conflicts || [], 'modelId');
expect(conflictedIds).to.eql(['1']);
// resolve the conflict using ours
conflicts[0].resolve(next);
});
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
it('detects UPDATE made during DELETE', function(done) {
async.series([
createModel(SourceModel, { id: '1' }),
replicateExpectingSuccess(),
function deleteModel(next) {
SourceModel.deleteById('1', next);
},
function replicateWith3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.updateAttributes(
TargetModel.modelName,
'1',
{ name: '3rd-party' },
cb);
});
SourceModel.replicate(
TargetModel,
function(err, conflicts, cps, updates) {
if (err) return next(err);
var conflictedIds = getPropValue(conflicts || [], 'modelId');
expect(conflictedIds).to.eql(['1']);
// resolve the conflict using ours
conflicts[0].resolve(next);
});
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
it('handles DELETE made during DELETE', function(done) {
async.series([
createModel(SourceModel, { id: '1' }),
replicateExpectingSuccess(),
function deleteModel(next) {
SourceModel.deleteById('1', next);
},
function setup3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.destroy(
TargetModel.modelName,
'1',
cb);
});
next();
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
});
});
describe('conflict detection - both updated', function() {
@ -878,7 +991,7 @@ describe('Replication / Change APIs', function() {
function resolveManually(conflict, cb) {
conflict.models(function(err, source, target) {
if (err) return cb(err);
var m = new conflict.SourceModel(source || target);
var m = source || new conflict.SourceModel(target);
m.name = 'manual';
m.save(function(err) {
if (err) return cb(err);
@ -897,7 +1010,7 @@ describe('Replication / Change APIs', function() {
async.series([
// sync the new model to ClientB
sync(ClientB, Server),
verifyInstanceWasReplicated(ClientA, ClientB),
verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),
// ClientA makes a change
updateSourceInstanceNameTo('a'),
@ -924,7 +1037,7 @@ describe('Replication / Change APIs', function() {
// and sync back to ClientA too
sync(ClientA, Server),
verifyInstanceWasReplicated(ClientB, ClientA)
verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
], cb);
}
@ -953,6 +1066,7 @@ describe('Replication / Change APIs', function() {
function updateSourceInstanceNameTo(value) {
return function updateInstance(next) {
debug('update source instance name to %j', value);
sourceInstance.name = value;
sourceInstance.save(next);
};
@ -960,6 +1074,7 @@ describe('Replication / Change APIs', function() {
function deleteSourceInstance(value) {
return function deleteInstance(next) {
debug('delete source instance', value);
sourceInstance.remove(function(err) {
sourceInstance = null;
next(err);
@ -978,21 +1093,6 @@ describe('Replication / Change APIs', function() {
});
};
}
function verifyInstanceWasReplicated(source, target) {
return function verify(next) {
source.findById(sourceInstanceId, function(err, expected) {
if (err) return next(err);
target.findById(sourceInstanceId, function(err, actual) {
if (err) return next(err);
expect(actual && actual.toObject())
.to.eql(expected && expected.toObject());
debug('replicated instance: %j', actual);
next();
});
});
};
}
});
var _since = {};
@ -1021,6 +1121,12 @@ describe('Replication / Change APIs', function() {
});
}
/**
 * Build a deferred step that creates a `Model` instance from `data`.
 * Nothing is created until the returned function is invoked.
 *
 * @param {Function} Model The model class to call `create` on.
 * @param {Object} data Properties of the instance to create.
 * @returns {Function} A function taking a single `done` callback, which is
 *   handed over to `Model.create`.
 */
function createModel(Model, data) {
  var step = function create(done) {
    Model.create(data, done);
  };
  return step;
}
function replicateExpectingSuccess(source, target, since) {
if (!source) source = SourceModel;
if (!target) target = TargetModel;
@ -1037,6 +1143,37 @@ describe('Replication / Change APIs', function() {
};
}
/**
 * Temporarily replace `TargetModel.bulkUpdate` with a wrapper that runs `fn`
 * before delegating to the original implementation. This simulates a 3rd
 * party modifying the database while a replication run is in progress.
 *
 * @param {Function} fn Receives a callback `(err)`; once it is called without
 *   an error, the original `bulkUpdate` is invoked with the original
 *   arguments. An error is passed straight to the bulkUpdate callback.
 */
function setupRaceConditionInReplication(fn) {
  var originalBulkUpdate = TargetModel.bulkUpdate;

  TargetModel.bulkUpdate = function(data, cb) {
    var model = this;

    function afterThirdPartyChange(err) {
      if (err) return cb(err);
      originalBulkUpdate.call(model, data, cb);
    }

    fn(afterThirdPartyChange);

    // apply the 3rd party modification only once
    TargetModel.bulkUpdate = originalBulkUpdate;
  };
}
/**
 * Build a step verifying that the instance with the given `id` looks the
 * same in `source` and `target`.
 *
 * @param {Function} source Model to load the expected instance from.
 * @param {Function} target Model to load the actual instance from.
 * @param {*} id Id of the instance to compare.
 * @returns {Function} A function taking `next`; it is called with the lookup
 *   error when one occurs, otherwise without arguments after the comparison.
 */
function verifyInstanceWasReplicated(source, target, id) {
  return function verify(next) {
    source.findById(id, function onSourceLoaded(sourceErr, expected) {
      if (sourceErr) return next(sourceErr);

      target.findById(id, function onTargetLoaded(targetErr, actual) {
        if (targetErr) return next(targetErr);

        // Compare plain data; a missing instance is compared as-is.
        var actualData = actual && actual.toObject();
        expect(actualData).to.eql(expected && expected.toObject());

        debug('replicated instance: %j', actual);
        next();
      });
    });
  };
}
function spyAndStoreSinceArg(Model, methodName, store) {
var orig = Model[methodName];
Model[methodName] = function(since) {