diff --git a/lib/persisted-model.js b/lib/persisted-model.js index ca70b636..8dfe82c9 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -855,11 +855,11 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { }; var tasks = [ + checkpoint, getSourceChanges, getDiffFromTarget, createSourceUpdates, - bulkUpdate, - checkpoint + bulkUpdate ]; async.waterfall(tasks, done); @@ -889,7 +889,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { function checkpoint() { var cb = arguments[arguments.length - 1]; - sourceModel.checkpoint(cb); + sourceModel.checkpoint(function(err) { cb(err); }); } function done(err) { diff --git a/test/replication.test.js b/test/replication.test.js index 53aec8bf..085d7020 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -134,9 +134,8 @@ describe('Replication / Change APIs', function() { function verify(next) { TargetModel.find(function(err, list) { if (err) return done(err); - var ids = list.map(function(it) { return it.id; }); // '1' should be skipped by replication - expect(ids).to.eql(['2']); + expect(getIds(list)).to.eql(['2']); next(); }); } @@ -172,6 +171,58 @@ describe('Replication / Change APIs', function() { done(); }); }); + + it('picks up changes made during replication', function(done) { + var bulkUpdate = TargetModel.bulkUpdate; + TargetModel.bulkUpdate = function(data, cb) { + var self = this; + // simulate the situation when another model is created + // while a replication run is in progress + SourceModel.create({ id: 'racer' }, function(err) { + if (err) return cb(err); + bulkUpdate.call(self, data, cb); + }); + }; + + var lastCp; + async.series([ + function buildSomeDataToReplicate(next) { + SourceModel.create({ id: 'init' }, next); + }, + function getLastCp(next) { + SourceModel.currentCheckpoint(function(err, cp) { + if (err) return done(err); + lastCp = cp; + next(); + }); + }, + function replicate(next) { + SourceModel.replicate(TargetModel, next); + }, + function verifyAssumptions(next) { + SourceModel.find(function(err, list) { + expect(getIds(list), 'source ids') + .to.eql(['init', 'racer']); + + TargetModel.find(function(err, list) { + expect(getIds(list), 'target ids after first sync') + .to.eql(['init']); + next(); + }); + }); + }, + function replicateAgain(next) { + TargetModel.bulkUpdate = bulkUpdate; + SourceModel.replicate(lastCp + 1, TargetModel, next); + }, + function verify(next) { + TargetModel.find(function(err, list) { + expect(getIds(list), 'target ids').to.eql(['init', 'racer']); + next(); + }); + } + ], done); + }); }); describe('conflict detection - both updated', function() { @@ -564,4 +615,14 @@ describe('Replication / Change APIs', function() { orig.apply(this, arguments); }; } + + function getPropValue(obj, name) { + return Array.isArray(obj) ? + obj.map(function(it) { return getPropValue(it, name); }) : + obj[name]; + } + + function getIds(list) { + return getPropValue(list, 'id'); + } });