Prevent more kinds of false replication conflicts
Rework the Change model to merge changes made within the same Checkpoint. Rework `replicate()` to run multiple iteration until there were no changes replicated. This ensures that the target model is left in a clean state with no pending changes associated with the latest (current) checkpoint.
This commit is contained in:
parent
b381c5df7e
commit
76d9244448
|
@ -141,10 +141,6 @@ module.exports = function(Change) {
|
|||
|
||||
Change.prototype.rectify = function(cb) {
|
||||
var change = this;
|
||||
var tasks = [
|
||||
updateRevision,
|
||||
updateCheckpoint
|
||||
];
|
||||
var currentRev = this.rev;
|
||||
|
||||
change.debug('rectify change');
|
||||
|
@ -153,8 +149,53 @@ module.exports = function(Change) {
|
|||
if (err) throw new Error(err);
|
||||
};
|
||||
|
||||
async.parallel(tasks, function(err) {
|
||||
async.parallel([
|
||||
function getCurrentCheckpoint(next) {
|
||||
change.constructor.getCheckpointModel().current(next);
|
||||
},
|
||||
function getCurrentRevision(next) {
|
||||
change.currentRevision(next);
|
||||
}
|
||||
], doRectify);
|
||||
|
||||
function doRectify(err, results) {
|
||||
if (err) return cb(err);
|
||||
var checkpoint = results[0];
|
||||
var rev = results[1];
|
||||
|
||||
if (rev) {
|
||||
// avoid setting rev and prev to the same value
|
||||
if (currentRev === rev) {
|
||||
change.debug('rev and prev are equal (not updating rev)');
|
||||
} else {
|
||||
change.rev = rev;
|
||||
change.debug('updated revision (was ' + currentRev + ')');
|
||||
if (change.checkpoint !== checkpoint) {
|
||||
// previous revision is updated only across checkpoints
|
||||
change.prev = currentRev;
|
||||
change.debug('updated prev');
|
||||
}
|
||||
}
|
||||
} else {
|
||||
change.rev = null;
|
||||
change.debug('updated revision (was ' + currentRev + ')');
|
||||
if (change.checkpoint !== checkpoint) {
|
||||
// previous revision is updated only across checkpoints
|
||||
if (currentRev) {
|
||||
change.prev = currentRev;
|
||||
} else if (!change.prev) {
|
||||
change.debug('ERROR - could not determing prev');
|
||||
change.prev = Change.UNKNOWN;
|
||||
}
|
||||
change.debug('updated prev');
|
||||
}
|
||||
}
|
||||
|
||||
if (change.checkpoint != checkpoint) {
|
||||
debug('update checkpoint to', checkpoint);
|
||||
change.checkpoint = checkpoint;
|
||||
}
|
||||
|
||||
if (change.prev === Change.UNKNOWN) {
|
||||
// this occurs when a record of a change doesn't exist
|
||||
// and its current revision is null (not found)
|
||||
|
@ -162,41 +203,6 @@ module.exports = function(Change) {
|
|||
} else {
|
||||
change.save(cb);
|
||||
}
|
||||
});
|
||||
|
||||
function updateRevision(cb) {
|
||||
// get the current revision
|
||||
change.currentRevision(function(err, rev) {
|
||||
if (err) return Change.handleError(err, cb);
|
||||
if (rev) {
|
||||
// avoid setting rev and prev to the same value
|
||||
if (currentRev !== rev) {
|
||||
change.rev = rev;
|
||||
change.prev = currentRev;
|
||||
} else {
|
||||
change.debug('rev and prev are equal (not updating rev)');
|
||||
}
|
||||
} else {
|
||||
change.rev = null;
|
||||
if (currentRev) {
|
||||
change.prev = currentRev;
|
||||
} else if (!change.prev) {
|
||||
change.debug('ERROR - could not determing prev');
|
||||
change.prev = Change.UNKNOWN;
|
||||
}
|
||||
}
|
||||
change.debug('updated revision (was ' + currentRev + ')');
|
||||
cb();
|
||||
});
|
||||
}
|
||||
|
||||
function updateCheckpoint(cb) {
|
||||
change.constructor.getCheckpointModel().current(function(err, checkpoint) {
|
||||
if (err) return Change.handleError(err);
|
||||
debug('updated checkpoint to', checkpoint);
|
||||
change.checkpoint = checkpoint;
|
||||
cb();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -840,18 +840,6 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
|||
options = options || {};
|
||||
|
||||
var sourceModel = this;
|
||||
var diff;
|
||||
var updates;
|
||||
var Change = this.getChangeModel();
|
||||
var TargetChange = targetModel.getChangeModel();
|
||||
var changeTrackingEnabled = Change && TargetChange;
|
||||
var newSourceCp, newTargetCp;
|
||||
|
||||
assert(
|
||||
changeTrackingEnabled,
|
||||
'You must enable change tracking before replicating'
|
||||
);
|
||||
|
||||
callback = callback || function defaultReplicationCallback(err) {
|
||||
if (err) throw err;
|
||||
};
|
||||
|
@ -865,6 +853,52 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
|||
debug('\twith filter %j', options.filter);
|
||||
}
|
||||
|
||||
// In order to avoid a race condition between the replication and
|
||||
// other clients modifying the data, we must create the new target
|
||||
// checkpoint as the first step of the replication process.
|
||||
// As a side-effect of that, the replicated changes are associated
|
||||
// with the new target checkpoint. This is actually desired behaviour,
|
||||
// because that way clients replicating *from* the target model
|
||||
// since the new checkpoint will pick these changes up.
|
||||
// However, it increases the likelihood of (false) conflicts being detected.
|
||||
// In order to prevent that, we run the replication multiple times,
|
||||
// until no changes were replicated, but at most MAX_ATTEMPTS times
|
||||
// to prevent starvation. In most cases, the second run will find no changes
|
||||
// to replicate and we are done.
|
||||
var MAX_ATTEMPTS = 3;
|
||||
|
||||
run(1, since);
|
||||
|
||||
function run(attempt, since) {
|
||||
debug('\titeration #%s', attempt);
|
||||
tryReplicate(sourceModel, targetModel, since, options, next);
|
||||
|
||||
function next(err, conflicts, cps, updates) {
|
||||
var finished = err || conflicts.length ||
|
||||
!updates || updates.length === 0 ||
|
||||
attempt >= MAX_ATTEMPTS;
|
||||
|
||||
if (finished)
|
||||
return callback(err, conflicts, cps);
|
||||
run(attempt + 1, cps);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
function tryReplicate(sourceModel, targetModel, since, options, callback) {
|
||||
var Change = sourceModel.getChangeModel();
|
||||
var TargetChange = targetModel.getChangeModel();
|
||||
var changeTrackingEnabled = Change && TargetChange;
|
||||
|
||||
assert(
|
||||
changeTrackingEnabled,
|
||||
'You must enable change tracking before replicating'
|
||||
);
|
||||
|
||||
var diff;
|
||||
var updates;
|
||||
var newSourceCp, newTargetCp;
|
||||
|
||||
var tasks = [
|
||||
checkpoints,
|
||||
getSourceChanges,
|
||||
|
@ -956,10 +990,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
|||
|
||||
if (callback) {
|
||||
var newCheckpoints = { source: newSourceCp, target: newTargetCp };
|
||||
callback(null, conflicts, newCheckpoints);
|
||||
callback(null, conflicts, newCheckpoints, updates);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an update list (for `Model.bulkUpdate()`) from a delta list
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
var async = require('async');
|
||||
|
||||
var Change;
|
||||
var TestModel;
|
||||
|
||||
|
@ -131,18 +133,67 @@ describe('Change', function() {
|
|||
});
|
||||
|
||||
describe('change.rectify(callback)', function() {
|
||||
it('should create a new change with the correct revision', function(done) {
|
||||
var test = this;
|
||||
var change = new Change({
|
||||
var change;
|
||||
beforeEach(function() {
|
||||
change = new Change({
|
||||
modelName: this.modelName,
|
||||
modelId: this.modelId
|
||||
});
|
||||
});
|
||||
|
||||
it('should create a new change with the correct revision', function(done) {
|
||||
var test = this;
|
||||
change.rectify(function(err, ch) {
|
||||
assert.equal(ch.rev, test.revisionForModel);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
// This test is a low-level equivalent of the test in replication.test.js
|
||||
// called "replicates multiple updates within the same CP"
|
||||
it('should merge updates within the same checkpoint', function(done) {
|
||||
var test = this;
|
||||
var originalRev = this.revisionForModel;
|
||||
var cp;
|
||||
|
||||
async.series([
|
||||
rectify,
|
||||
checkpoint,
|
||||
update,
|
||||
rectify,
|
||||
update,
|
||||
rectify,
|
||||
function(next) {
|
||||
expect(change.checkpoint, 'checkpoint').to.equal(cp);
|
||||
expect(change.type(), 'type').to.equal('update');
|
||||
expect(change.prev, 'prev').to.equal(originalRev);
|
||||
expect(change.rev, 'rev').to.equal(test.revisionForModel);
|
||||
next();
|
||||
}
|
||||
], done);
|
||||
|
||||
function rectify(next) {
|
||||
change.rectify(next);
|
||||
}
|
||||
|
||||
function checkpoint(next) {
|
||||
TestModel.checkpoint(function(err, inst) {
|
||||
if (err) return next(err);
|
||||
cp = inst.seq;
|
||||
next();
|
||||
});
|
||||
}
|
||||
|
||||
function update(next) {
|
||||
var model = test.model;
|
||||
|
||||
model.name += 'updated';
|
||||
model.save(function(err) {
|
||||
test.revisionForModel = Change.revisionForInst(model);
|
||||
next(err);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('change.currentRevision(callback)', function() {
|
||||
|
|
|
@ -5,6 +5,7 @@ var Change = loopback.Change;
|
|||
var defineModelTestsWithDataSource = require('./util/model-tests');
|
||||
var PersistedModel = loopback.PersistedModel;
|
||||
var expect = require('chai').expect;
|
||||
var debug = require('debug')('test');
|
||||
|
||||
describe('Replication / Change APIs', function() {
|
||||
var dataSource, SourceModel, TargetModel;
|
||||
|
@ -44,12 +45,7 @@ describe('Replication / Change APIs', function() {
|
|||
SourceModel.create({name: 'foo'}, function(err, inst) {
|
||||
if (err) return cb(err);
|
||||
test.model = inst;
|
||||
|
||||
// give loopback a chance to register the change
|
||||
// TODO(ritch) get rid of this...
|
||||
setTimeout(function() {
|
||||
SourceModel.replicate(TargetModel, cb);
|
||||
}, 100);
|
||||
SourceModel.replicate(TargetModel, cb);
|
||||
});
|
||||
};
|
||||
});
|
||||
|
@ -190,6 +186,8 @@ describe('Replication / Change APIs', function() {
|
|||
if (err) return cb(err);
|
||||
bulkUpdate.call(self, data, cb);
|
||||
});
|
||||
// create the new model only once
|
||||
TargetModel.bulkUpdate = bulkUpdate;
|
||||
};
|
||||
|
||||
var lastCp;
|
||||
|
@ -214,13 +212,12 @@ describe('Replication / Change APIs', function() {
|
|||
|
||||
TargetModel.find(function(err, list) {
|
||||
expect(getIds(list), 'target ids after first sync')
|
||||
.to.eql(['init']);
|
||||
.to.include.members(['init']);
|
||||
next();
|
||||
});
|
||||
});
|
||||
},
|
||||
function replicateAgain(next) {
|
||||
TargetModel.bulkUpdate = bulkUpdate;
|
||||
SourceModel.replicate(lastCp + 1, TargetModel, next);
|
||||
},
|
||||
function verify(next) {
|
||||
|
@ -272,6 +269,26 @@ describe('Replication / Change APIs', function() {
|
|||
}
|
||||
});
|
||||
|
||||
it('leaves current target checkpoint empty', function(done) {
|
||||
async.series([
|
||||
function createTestData(next) {
|
||||
SourceModel.create({}, next);
|
||||
},
|
||||
replicateExpectingSuccess(),
|
||||
function verify(next) {
|
||||
TargetModel.currentCheckpoint(function(err, cp) {
|
||||
if (err) return next(err);
|
||||
TargetModel.getChangeModel().find(
|
||||
{ where: { checkpoint: { gte: cp } } },
|
||||
function(err, changes) {
|
||||
if (err) return done(err);
|
||||
expect(changes).to.be.empty();
|
||||
done();
|
||||
});
|
||||
});
|
||||
}
|
||||
], done);
|
||||
});
|
||||
});
|
||||
|
||||
describe('conflict detection - both updated', function() {
|
||||
|
@ -657,6 +674,90 @@ describe('Replication / Change APIs', function() {
|
|||
}
|
||||
});
|
||||
|
||||
describe('complex setup', function() {
|
||||
var sourceInstance;
|
||||
|
||||
beforeEach(function createReplicatedInstance(done) {
|
||||
async.series([
|
||||
function createInstance(next) {
|
||||
SourceModel.create({ id: 'test-instance' }, function(err, result) {
|
||||
sourceInstance = result;
|
||||
next(err);
|
||||
});
|
||||
},
|
||||
replicateExpectingSuccess(),
|
||||
verifyModelsAreEqual()
|
||||
], done);
|
||||
});
|
||||
|
||||
it('correctly replicates without checkpoint filter', function(done) {
|
||||
async.series([
|
||||
updateSourceInstanceNameTo('updated'),
|
||||
replicateExpectingSuccess(),
|
||||
verifyModelsAreEqual(),
|
||||
|
||||
function deleteInstance(next) {
|
||||
sourceInstance.remove(next);
|
||||
},
|
||||
replicateExpectingSuccess(),
|
||||
function verifyTargetModelWasDeleted(next) {
|
||||
TargetModel.find(function(err, list) {
|
||||
if (err) return next(err);
|
||||
expect(getIds(list)).to.not.contain(sourceInstance.id);
|
||||
next();
|
||||
});
|
||||
}
|
||||
], done);
|
||||
});
|
||||
|
||||
it('replicates multiple updates within the same CP', function(done) {
|
||||
async.series([
|
||||
replicateExpectingSuccess(),
|
||||
verifyModelsAreEqual(),
|
||||
|
||||
updateSourceInstanceNameTo('updated'),
|
||||
updateSourceInstanceNameTo('again'),
|
||||
replicateExpectingSuccess(),
|
||||
verifyModelsAreEqual()
|
||||
], done);
|
||||
});
|
||||
|
||||
function updateSourceInstanceNameTo(value) {
|
||||
return function updateInstance(next) {
|
||||
sourceInstance.name = value;
|
||||
sourceInstance.save(next);
|
||||
};
|
||||
}
|
||||
|
||||
function verifyModelsAreEqual() {
|
||||
return function verify(next) {
|
||||
TargetModel.findById(sourceInstance.id, function(err, target) {
|
||||
if (err) return next(err);
|
||||
expect(target && target.toObject()).to.eql(sourceInstance.toObject());
|
||||
next();
|
||||
});
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
function replicateExpectingSuccess(source, target, since) {
|
||||
if (!source) source = SourceModel;
|
||||
if (!target) target = TargetModel;
|
||||
if (!since) since = -1;
|
||||
return function replicate(next) {
|
||||
debug('replicateExpectingSuccess from %s to %s since %s',
|
||||
source.modelName, target.modelName, since);
|
||||
source.replicate(since, target, function(err, conflicts) {
|
||||
if (err) return next(err);
|
||||
if (conflicts.length) {
|
||||
return next(new Error('Unexpected conflicts\n' +
|
||||
conflicts.map(JSON.stringify).join('\n')));
|
||||
}
|
||||
next();
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
function spyAndStoreSinceArg(Model, methodName, store) {
|
||||
var orig = Model[methodName];
|
||||
Model[methodName] = function(since) {
|
||||
|
|
Loading…
Reference in New Issue