diff --git a/common/models/change.js b/common/models/change.js index 2e0b4ba6..e2bd6030 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -428,6 +428,8 @@ module.exports = function(Change) { Change.getCheckpointModel = function() { var checkpointModel = this.Checkpoint; if (checkpointModel) return checkpointModel; + // FIXME(bajtos) This code creates multiple different models with the same + // model name, which is not a valid supported usage of juggler's API. this.Checkpoint = checkpointModel = loopback.Checkpoint.extend('checkpoint'); assert(this.dataSource, 'Cannot getCheckpointModel(): ' + this.modelName + ' is not attached to a dataSource'); diff --git a/common/models/checkpoint.js b/common/models/checkpoint.js index baec3bd8..304f8314 100644 --- a/common/models/checkpoint.js +++ b/common/models/checkpoint.js @@ -41,7 +41,7 @@ module.exports = function(Checkpoint) { if (checkpoint) { cb(null, checkpoint.seq); } else { - Checkpoint.create({seq: 0}, function(err, checkpoint) { + Checkpoint.create({ seq: 1 }, function(err, checkpoint) { if (err) return cb(err); cb(null, checkpoint.seq); }); diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 13ca19c4..d87d72e0 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -757,7 +757,7 @@ PersistedModel.changes = function(since, filter, callback) { // TODO(ritch) this whole thing could be optimized a bit more Change.find({ where: { - checkpoint: {gt: since}, + checkpoint: { gte: since }, modelName: this.modelName }}, function(err, changes) { if (err) return callback(err); @@ -832,6 +832,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { since = -1; } + if (typeof since !== 'object') { + since = { source: since, target: since }; + } + options = options || {}; var sourceModel = this; @@ -840,6 +844,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { var Change = this.getChangeModel(); var TargetChange = targetModel.getChangeModel(); var changeTrackingEnabled = Change && TargetChange; + var newSourceCp, newTargetCp; assert( changeTrackingEnabled, @@ -851,21 +856,21 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { }; var tasks = [ + checkpoints, getSourceChanges, getDiffFromTarget, createSourceUpdates, - bulkUpdate, - checkpoint + bulkUpdate ]; async.waterfall(tasks, done); function getSourceChanges(cb) { - sourceModel.changes(since, options.filter, cb); + sourceModel.changes(since.source, options.filter, cb); } function getDiffFromTarget(sourceChanges, cb) { - targetModel.diff(since, sourceChanges, cb); + targetModel.diff(since.target, sourceChanges, cb); } function createSourceUpdates(_diff, cb) { @@ -883,9 +888,15 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { targetModel.bulkUpdate(updates, cb); } - function checkpoint() { + function checkpoints() { var cb = arguments[arguments.length - 1]; - sourceModel.checkpoint(cb); + sourceModel.checkpoint(function(err, source) { + newSourceCp = source.seq; + targetModel.checkpoint(function(err, target) { + newTargetCp = target.seq; + cb(err); + }); + }); } function done(err) { @@ -901,7 +912,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { sourceModel.emit('conflicts', conflicts); } - if (callback) callback(null, conflicts); + if (callback) { + var newCheckpoints = { source: newSourceCp, target: newTargetCp }; + callback(null, conflicts, newCheckpoints); + } } }; diff --git a/test/checkpoint.test.js b/test/checkpoint.test.js index 0b7aa3f2..223dd03c 100644 --- a/test/checkpoint.test.js +++ b/test/checkpoint.test.js @@ -14,7 +14,7 @@ describe('Checkpoint', function() { function(next) { Checkpoint.current(function(err, seq) { if (err) next(err); - expect(seq).to.equal(2); + expect(seq).to.equal(3); next(); }); } diff --git a/test/replication.test.js b/test/replication.test.js index 83575f6f..2bc3a780 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -8,22 +8,34 @@ var expect = require('chai').expect; describe('Replication / Change APIs', function() { var dataSource, SourceModel, TargetModel; + var tid = 0; // per-test unique id used e.g. to build unique model names beforeEach(function() { + tid++; var test = this; dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory }); - SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', + SourceModel = this.SourceModel = PersistedModel.extend( + 'SourceModel-' + tid, { id: { id: true, type: String, defaultFn: 'guid' } }, { trackChanges: true }); SourceModel.attachTo(dataSource); - TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', + TargetModel = this.TargetModel = PersistedModel.extend( + 'TargetModel-' + tid, { id: { id: true, type: String, defaultFn: 'guid' } }, { trackChanges: true }); + // NOTE(bajtos) At the moment, all models share the same Checkpoint + // model. This causes the in-process replication to work differently + // than client-server replication. + // As a workaround, we manually setup unique Checkpoint for TargetModel. + var TargetChange = TargetModel.Change; + TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); + TargetChange.Checkpoint.attachTo(dataSource); + TargetModel.attachTo(dataSource); test.startingCheckpoint = -1; @@ -109,6 +121,157 @@ describe('Replication / Change APIs', function() { }); } }); + + it('applies "since" filter on source changes', function(done) { + async.series([ + function createModelInSourceCp1(next) { + SourceModel.create({ id: '1' }, next); + }, + function checkpoint(next) { + SourceModel.checkpoint(next); + }, + function createModelInSourceCp2(next) { + SourceModel.create({ id: '2' }, next); + }, + function replicateLastChangeOnly(next) { + SourceModel.currentCheckpoint(function(err, cp) { + if (err) return done(err); + SourceModel.replicate(cp, TargetModel, next); + }); + }, + function verify(next) { + TargetModel.find(function(err, list) { + if (err) return done(err); + // '1' should be skipped by replication + expect(getIds(list)).to.eql(['2']); + next(); + }); + } + ], done); + }); + + it('applies "since" filter on target changes', function(done) { + // Because the "since" filter is just an optimization, + // there isn't really any observable behaviour we could + // check to assert correct implementation. + var diffSince = []; + spyAndStoreSinceArg(TargetModel, 'diff', diffSince); + + SourceModel.replicate(10, TargetModel, function(err) { + if (err) return done(err); + expect(diffSince).to.eql([10]); + done(); + }); + }); + + it('uses different "since" value for source and target', function(done) { + var sourceSince = []; + var targetSince = []; + + spyAndStoreSinceArg(SourceModel, 'changes', sourceSince); + spyAndStoreSinceArg(TargetModel, 'diff', targetSince); + + var since = { source: 1, target: 2 }; + SourceModel.replicate(since, TargetModel, function(err) { + if (err) return done(err); + expect(sourceSince).to.eql([1]); + expect(targetSince).to.eql([2]); + done(); + }); + }); + + it('picks up changes made during replication', function(done) { + var bulkUpdate = TargetModel.bulkUpdate; + TargetModel.bulkUpdate = function(data, cb) { + var self = this; + // simulate the situation when another model is created + // while a replication run is in progress + SourceModel.create({ id: 'racer' }, function(err) { + if (err) return cb(err); + bulkUpdate.call(self, data, cb); + }); + }; + + var lastCp; + async.series([ + function buildSomeDataToReplicate(next) { + SourceModel.create({ id: 'init' }, next); + }, + function getLastCp(next) { + SourceModel.currentCheckpoint(function(err, cp) { + if (err) return done(err); + lastCp = cp; + next(); + }); + }, + function replicate(next) { + SourceModel.replicate(TargetModel, next); + }, + function verifyAssumptions(next) { + SourceModel.find(function(err, list) { + expect(getIds(list), 'source ids') + .to.eql(['init', 'racer']); + + TargetModel.find(function(err, list) { + expect(getIds(list), 'target ids after first sync') + .to.eql(['init']); + next(); + }); + }); + }, + function replicateAgain(next) { + TargetModel.bulkUpdate = bulkUpdate; + SourceModel.replicate(lastCp + 1, TargetModel, next); + }, + function verify(next) { + TargetModel.find(function(err, list) { + expect(getIds(list), 'target ids').to.eql(['init', 'racer']); + next(); + }); + } + ], done); + }); + + it('returns new current checkpoints to callback', function(done) { + var sourceCp, targetCp; + async.series([ + bumpSourceCheckpoint, + bumpTargetCheckpoint, + bumpTargetCheckpoint, + function replicate(cb) { + expect(sourceCp).to.not.equal(targetCp); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, newCheckpoints) { + if (err) return cb(err); + expect(conflicts, 'conflicts').to.eql([]); + expect(newCheckpoints, 'currentCheckpoints').to.eql({ + source: sourceCp + 1, + target: targetCp + 1 + }); + cb(); + }); + } + ], done); + + function bumpSourceCheckpoint(cb) { + SourceModel.checkpoint(function(err, inst) { + if (err) return cb(err); + sourceCp = inst.seq; + cb(); + }); + } + + function bumpTargetCheckpoint(cb) { + TargetModel.checkpoint(function(err, inst) { + if (err) return cb(err); + targetCp = inst.seq; + cb(); + }); + } + }); + }); describe('conflict detection - both updated', function() { @@ -493,4 +656,22 @@ describe('Replication / Change APIs', function() { }); } }); + + function spyAndStoreSinceArg(Model, methodName, store) { + var orig = Model[methodName]; + Model[methodName] = function(since) { + store.push(since); + orig.apply(this, arguments); + }; + } + + function getPropValue(obj, name) { + return Array.isArray(obj) ? + obj.map(function(it) { return getPropValue(it, name); }) : + obj[name]; + } + + function getIds(list) { + return getPropValue(list, 'id'); + } });