Replication: fix checkpoint-related race condition

Rework the "replicate()" to create a new source checkpoint as the first
step, so that any changes made in parallel will be associated with
the next checkpoint.

Before this commit, there was a race condition where a change might
end up being associated with the already-replicated checkpoint and thus
not picked up by the next replication run.
This commit is contained in:
Miroslav Bajtoš 2015-03-03 11:50:06 +01:00
parent 2f9cf115c3
commit 2dc230b7cf
2 changed files with 66 additions and 5 deletions

View File

@ -855,11 +855,11 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
}; };
var tasks = [ var tasks = [
checkpoint,
getSourceChanges, getSourceChanges,
getDiffFromTarget, getDiffFromTarget,
createSourceUpdates, createSourceUpdates,
bulkUpdate, bulkUpdate
checkpoint
]; ];
async.waterfall(tasks, done); async.waterfall(tasks, done);
@ -889,7 +889,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
function checkpoint() { function checkpoint() {
var cb = arguments[arguments.length - 1]; var cb = arguments[arguments.length - 1];
sourceModel.checkpoint(cb); sourceModel.checkpoint(function(err) { cb(err); });
} }
function done(err) { function done(err) {

View File

@ -134,9 +134,8 @@ describe('Replication / Change APIs', function() {
function verify(next) { function verify(next) {
TargetModel.find(function(err, list) { TargetModel.find(function(err, list) {
if (err) return done(err); if (err) return done(err);
var ids = list.map(function(it) { return it.id; });
// '1' should be skipped by replication // '1' should be skipped by replication
expect(ids).to.eql(['2']); expect(getIds(list)).to.eql(['2']);
next(); next();
}); });
} }
@ -172,6 +171,58 @@ describe('Replication / Change APIs', function() {
done(); done();
}); });
}); });
it('picks up changes made during replication', function(done) {
var bulkUpdate = TargetModel.bulkUpdate;
TargetModel.bulkUpdate = function(data, cb) {
var self = this;
// simulate the situation when another model is created
// while a replication run is in progress
SourceModel.create({ id: 'racer' }, function(err) {
if (err) return cb(err);
bulkUpdate.call(self, data, cb);
});
};
var lastCp;
async.series([
function buildSomeDataToReplicate(next) {
SourceModel.create({ id: 'init' }, next);
},
function getLastCp(next) {
SourceModel.currentCheckpoint(function(err, cp) {
if (err) return done(err);
lastCp = cp;
next();
});
},
function replicate(next) {
SourceModel.replicate(TargetModel, next);
},
function verifyAssumptions(next) {
SourceModel.find(function(err, list) {
expect(getIds(list), 'source ids')
.to.eql(['init', 'racer']);
TargetModel.find(function(err, list) {
expect(getIds(list), 'target ids after first sync')
.to.eql(['init']);
next();
});
});
},
function replicateAgain(next) {
TargetModel.bulkUpdate = bulkUpdate;
SourceModel.replicate(lastCp + 1, TargetModel, next);
},
function verify(next) {
TargetModel.find(function(err, list) {
expect(getIds(list), 'target ids').to.eql(['init', 'racer']);
next();
});
}
], done);
});
}); });
describe('conflict detection - both updated', function() { describe('conflict detection - both updated', function() {
@ -564,4 +615,14 @@ describe('Replication / Change APIs', function() {
orig.apply(this, arguments); orig.apply(this, arguments);
}; };
} }
function getPropValue(obj, name) {
return Array.isArray(obj) ?
obj.map(function(it) { return getPropValue(it, name); }) :
obj[name];
}
function getIds(list) {
return getPropValue(list, 'id');
}
}); });