From 2f9cf115c3a0b458ecaf463e92711422d84944d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 3 Mar 2015 11:23:13 +0100 Subject: [PATCH 1/5] Support different "since" for source and target Modify `PersistedModel.replicate` to allow consumers to provide different "since" values to be used for the local and remote changes. Make the "since" filters consistent and include the "since" value in the range via `gte`. Before this commit, the local query used `gt` and the remote query used `gte`. --- lib/persisted-model.js | 10 ++++-- test/replication.test.js | 75 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 13ca19c4..ca70b636 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -757,7 +757,7 @@ PersistedModel.changes = function(since, filter, callback) { // TODO(ritch) this whole thing could be optimized a bit more Change.find({ where: { - checkpoint: {gt: since}, + checkpoint: { gte: since }, modelName: this.modelName }}, function(err, changes) { if (err) return callback(err); @@ -832,6 +832,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { since = -1; } + if (typeof since !== 'object') { + since = { source: since, target: since }; + } + options = options || {}; var sourceModel = this; @@ -861,11 +865,11 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { async.waterfall(tasks, done); function getSourceChanges(cb) { - sourceModel.changes(since, options.filter, cb); + sourceModel.changes(since.source, options.filter, cb); } function getDiffFromTarget(sourceChanges, cb) { - targetModel.diff(since, sourceChanges, cb); + targetModel.diff(since.target, sourceChanges, cb); } function createSourceUpdates(_diff, cb) { diff --git a/test/replication.test.js b/test/replication.test.js index 83575f6f..53aec8bf 100644 --- a/test/replication.test.js +++ 
b/test/replication.test.js @@ -8,19 +8,23 @@ var expect = require('chai').expect; describe('Replication / Change APIs', function() { var dataSource, SourceModel, TargetModel; + var tid = 0; // per-test unique id used e.g. to build unique model names beforeEach(function() { + tid++; var test = this; dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory }); - SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', + SourceModel = this.SourceModel = PersistedModel.extend( + 'SourceModel-' + tid, { id: { id: true, type: String, defaultFn: 'guid' } }, { trackChanges: true }); SourceModel.attachTo(dataSource); - TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', + TargetModel = this.TargetModel = PersistedModel.extend( + 'TargetModel-' + tid, { id: { id: true, type: String, defaultFn: 'guid' } }, { trackChanges: true }); @@ -109,6 +113,65 @@ describe('Replication / Change APIs', function() { }); } }); + + it('applies "since" filter on source changes', function(done) { + async.series([ + function createModelInSourceCp1(next) { + SourceModel.create({ id: '1' }, next); + }, + function checkpoint(next) { + SourceModel.checkpoint(next); + }, + function createModelInSourceCp2(next) { + SourceModel.create({ id: '2' }, next); + }, + function replicateLastChangeOnly(next) { + SourceModel.currentCheckpoint(function(err, cp) { + if (err) return done(err); + SourceModel.replicate(cp, TargetModel, next); + }); + }, + function verify(next) { + TargetModel.find(function(err, list) { + if (err) return done(err); + var ids = list.map(function(it) { return it.id; }); + // '1' should be skipped by replication + expect(ids).to.eql(['2']); + next(); + }); + } + ], done); + }); + + it('applies "since" filter on target changes', function(done) { + // Because the "since" filter is just an optimization, + // there isn't really any observable behaviour we could + // check to assert correct implementation. 
+ var diffSince = []; + spyAndStoreSinceArg(TargetModel, 'diff', diffSince); + + SourceModel.replicate(10, TargetModel, function(err) { + if (err) return done(err); + expect(diffSince).to.eql([10]); + done(); + }); + }); + + it('uses different "since" value for source and target', function(done) { + var sourceSince = []; + var targetSince = []; + + spyAndStoreSinceArg(SourceModel, 'changes', sourceSince); + spyAndStoreSinceArg(TargetModel, 'diff', targetSince); + + var since = { source: 1, target: 2 }; + SourceModel.replicate(since, TargetModel, function(err) { + if (err) return done(err); + expect(sourceSince).to.eql([1]); + expect(targetSince).to.eql([2]); + done(); + }); + }); }); describe('conflict detection - both updated', function() { @@ -493,4 +556,12 @@ describe('Replication / Change APIs', function() { }); } }); + + function spyAndStoreSinceArg(Model, methodName, store) { + var orig = Model[methodName]; + Model[methodName] = function(since) { + store.push(since); + orig.apply(this, arguments); + }; + } }); From 2dc230b7cf4316c5dd28ba62ded54a040b4f7b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 3 Mar 2015 11:50:06 +0100 Subject: [PATCH 2/5] Replication: fix checkpoint-related race condition Rework the "replicate()" to create a new source checkpoint as the first step, so that any changes made in parallel will be associated with the next checkpoint. Before this commit, there was a race condition where a change might end up being associated with the already-replicated checkpoint and thus not picked up by the next replication run. 
--- lib/persisted-model.js | 6 ++-- test/replication.test.js | 65 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/lib/persisted-model.js b/lib/persisted-model.js index ca70b636..8dfe82c9 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -855,11 +855,11 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { }; var tasks = [ + checkpoint, getSourceChanges, getDiffFromTarget, createSourceUpdates, - bulkUpdate, - checkpoint + bulkUpdate ]; async.waterfall(tasks, done); @@ -889,7 +889,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { function checkpoint() { var cb = arguments[arguments.length - 1]; - sourceModel.checkpoint(cb); + sourceModel.checkpoint(function(err) { cb(err); }); } function done(err) { diff --git a/test/replication.test.js b/test/replication.test.js index 53aec8bf..085d7020 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -134,9 +134,8 @@ describe('Replication / Change APIs', function() { function verify(next) { TargetModel.find(function(err, list) { if (err) return done(err); - var ids = list.map(function(it) { return it.id; }); // '1' should be skipped by replication - expect(ids).to.eql(['2']); + expect(getIds(list)).to.eql(['2']); next(); }); } @@ -172,6 +171,58 @@ describe('Replication / Change APIs', function() { done(); }); }); + + it('picks up changes made during replication', function(done) { + var bulkUpdate = TargetModel.bulkUpdate; + TargetModel.bulkUpdate = function(data, cb) { + var self = this; + // simulate the situation when another model is created + // while a replication run is in progress + SourceModel.create({ id: 'racer' }, function(err) { + if (err) return cb(err); + bulkUpdate.call(self, data, cb); + }); + }; + + var lastCp; + async.series([ + function buildSomeDataToReplicate(next) { + SourceModel.create({ id: 'init' }, next); + }, + function getLastCp(next) { + 
SourceModel.currentCheckpoint(function(err, cp) { + if (err) return done(err); + lastCp = cp; + next(); + }); + }, + function replicate(next) { + SourceModel.replicate(TargetModel, next); + }, + function verifyAssumptions(next) { + SourceModel.find(function(err, list) { + expect(getIds(list), 'source ids') + .to.eql(['init', 'racer']); + + TargetModel.find(function(err, list) { + expect(getIds(list), 'target ids after first sync') + .to.eql(['init']); + next(); + }); + }); + }, + function replicateAgain(next) { + TargetModel.bulkUpdate = bulkUpdate; + SourceModel.replicate(lastCp + 1, TargetModel, next); + }, + function verify(next) { + TargetModel.find(function(err, list) { + expect(getIds(list), 'target ids').to.eql(['init', 'racer']); + next(); + }); + } + ], done); + }); }); describe('conflict detection - both updated', function() { @@ -564,4 +615,14 @@ describe('Replication / Change APIs', function() { orig.apply(this, arguments); }; } + + function getPropValue(obj, name) { + return Array.isArray(obj) ? + obj.map(function(it) { return getPropValue(it, name); }) : + obj[name]; + } + + function getIds(list) { + return getPropValue(list, 'id'); + } }); From 53ebddfa9fbd031a99a269a71655acb6a96639eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 3 Mar 2015 12:00:47 +0100 Subject: [PATCH 3/5] Create a remote checkpoint during replication too Before this change, in the case of a one-way replication, the remote checkpoint was never updated, thus the "diff" step had to check all changes made through the whole history. This commit fixes the problem by creating a new remote checkpoint at the same time when a local checkpoint is created. It is important to create the new checkpoint before the replication is started to prevent a race condition where a remote change can end up being associated with an already replicated checkpoint. 
--- lib/persisted-model.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 8dfe82c9..0062a64b 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -855,7 +855,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { }; var tasks = [ - checkpoint, + checkpoints, getSourceChanges, getDiffFromTarget, createSourceUpdates, @@ -887,9 +887,13 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { targetModel.bulkUpdate(updates, cb); } - function checkpoint() { + function checkpoints() { var cb = arguments[arguments.length - 1]; - sourceModel.checkpoint(function(err) { cb(err); }); + sourceModel.checkpoint(function(err) { + targetModel.checkpoint(function(err) { + cb(err); + }); + }); } function done(err) { From 628e3a30ca83a1e20b48b58da694ac532ae2a1ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 3 Mar 2015 14:23:46 +0100 Subject: [PATCH 4/5] Return new checkpoints in callback of replicate() Extend `PersistedModel.replicate` to pass the newly created checkpoints as the third callback argument. The typical usage of these values is to pass them as the `since` argument of the next `replicate()` call. 
global.since = -1; function sync(cb) { LocalModel.replicate( since, RemoteModel, function(err, conflicts, cps) { if (err) return cb(err); if (!conflicts.length) { since = cps; return cb(); } // resolve conflicts and try again }); } --- common/models/change.js | 2 ++ lib/persisted-model.js | 12 +++++++--- test/replication.test.js | 49 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/common/models/change.js b/common/models/change.js index 2e0b4ba6..e2bd6030 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -428,6 +428,8 @@ module.exports = function(Change) { Change.getCheckpointModel = function() { var checkpointModel = this.Checkpoint; if (checkpointModel) return checkpointModel; + // FIXME(bajtos) This code creates multiple different models with the same + // model name, which is not a valid supported usage of juggler's API. this.Checkpoint = checkpointModel = loopback.Checkpoint.extend('checkpoint'); assert(this.dataSource, 'Cannot getCheckpointModel(): ' + this.modelName + ' is not attached to a dataSource'); diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 0062a64b..d87d72e0 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -844,6 +844,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { var Change = this.getChangeModel(); var TargetChange = targetModel.getChangeModel(); var changeTrackingEnabled = Change && TargetChange; + var newSourceCp, newTargetCp; assert( changeTrackingEnabled, @@ -889,8 +890,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) { function checkpoints() { var cb = arguments[arguments.length - 1]; - sourceModel.checkpoint(function(err) { - targetModel.checkpoint(function(err) { + sourceModel.checkpoint(function(err, source) { + newSourceCp = source.seq; + targetModel.checkpoint(function(err, target) { + newTargetCp = target.seq; cb(err); }); }); @@ -909,7 +912,10 @@ 
PersistedModel.replicate = function(since, targetModel, options, callback) { sourceModel.emit('conflicts', conflicts); } - if (callback) callback(null, conflicts); + if (callback) { + var newCheckpoints = { source: newSourceCp, target: newTargetCp }; + callback(null, conflicts, newCheckpoints); + } } }; diff --git a/test/replication.test.js b/test/replication.test.js index 085d7020..2bc3a780 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -28,6 +28,14 @@ describe('Replication / Change APIs', function() { { id: { id: true, type: String, defaultFn: 'guid' } }, { trackChanges: true }); + // NOTE(bajtos) At the moment, all models share the same Checkpoint + // model. This causes the in-process replication to work differently + // than client-server replication. + // As a workaround, we manually setup unique Checkpoint for TargetModel. + var TargetChange = TargetModel.Change; + TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); + TargetChange.Checkpoint.attachTo(dataSource); + TargetModel.attachTo(dataSource); test.startingCheckpoint = -1; @@ -223,6 +231,47 @@ describe('Replication / Change APIs', function() { } ], done); }); + + it('returns new current checkpoints to callback', function(done) { + var sourceCp, targetCp; + async.series([ + bumpSourceCheckpoint, + bumpTargetCheckpoint, + bumpTargetCheckpoint, + function replicate(cb) { + expect(sourceCp).to.not.equal(targetCp); + + SourceModel.replicate( + TargetModel, + function(err, conflicts, newCheckpoints) { + if (err) return cb(err); + expect(conflicts, 'conflicts').to.eql([]); + expect(newCheckpoints, 'currentCheckpoints').to.eql({ + source: sourceCp + 1, + target: targetCp + 1 + }); + cb(); + }); + } + ], done); + + function bumpSourceCheckpoint(cb) { + SourceModel.checkpoint(function(err, inst) { + if (err) return cb(err); + sourceCp = inst.seq; + cb(); + }); + } + + function bumpTargetCheckpoint(cb) { + TargetModel.checkpoint(function(err, inst) { + if (err) return 
cb(err); + targetCp = inst.seq; + cb(); + }); + } + }); + }); describe('conflict detection - both updated', function() { From 3d5c8a744369359fb6e426262f0ad1c933c5fb29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 3 Mar 2015 14:53:49 +0100 Subject: [PATCH 5/5] Checkpoint: start with seq=1 instead of seq=0 Since the seq behaves in many senses like an id, it should meet the usual expectation people have about ids. Using only truthy values is one of them. --- common/models/checkpoint.js | 2 +- test/checkpoint.test.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/models/checkpoint.js b/common/models/checkpoint.js index 2bba736a..e243b15d 100644 --- a/common/models/checkpoint.js +++ b/common/models/checkpoint.js @@ -41,7 +41,7 @@ module.exports = function(Checkpoint) { if (checkpoint) { cb(null, checkpoint.seq); } else { - Checkpoint.create({seq: 0}, function(err, checkpoint) { + Checkpoint.create({ seq: 1 }, function(err, checkpoint) { if (err) return cb(err); cb(null, checkpoint.seq); }); diff --git a/test/checkpoint.test.js b/test/checkpoint.test.js index 0b7aa3f2..223dd03c 100644 --- a/test/checkpoint.test.js +++ b/test/checkpoint.test.js @@ -14,7 +14,7 @@ describe('Checkpoint', function() { function(next) { Checkpoint.current(function(err, seq) { if (err) next(err); - expect(seq).to.equal(2); + expect(seq).to.equal(3); next(); }); }