Return new checkpoints in callback of replicate()

Extend `PersistedModel.replicate` to pass the newly created checkpoints
as the third callback argument.

The typical usage of these values is to pass them as the `since`
argument of the next `replicate()` call.

    global.since = -1;

    function sync(cb) {
      LocalModel.replicate(
        since,
        RemoteModel,
        function(err, conflicts, cps)
          if (err) return cb(err);
          if (!conflicts.length) {
            since = cps;
            return cb();
          }
          // resolve conflicts and try again
        });
    }
This commit is contained in:
Miroslav Bajtoš 2015-03-03 14:23:46 +01:00
parent 53ebddfa9f
commit 628e3a30ca
3 changed files with 60 additions and 3 deletions

View File

@ -428,6 +428,8 @@ module.exports = function(Change) {
Change.getCheckpointModel = function() {
var checkpointModel = this.Checkpoint;
if (checkpointModel) return checkpointModel;
// FIXME(bajtos) This code creates multiple different models with the same
// model name, which is not a valid supported usage of juggler's API.
this.Checkpoint = checkpointModel = loopback.Checkpoint.extend('checkpoint');
assert(this.dataSource, 'Cannot getCheckpointModel(): ' + this.modelName +
' is not attached to a dataSource');

View File

@ -844,6 +844,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
var Change = this.getChangeModel();
var TargetChange = targetModel.getChangeModel();
var changeTrackingEnabled = Change && TargetChange;
var newSourceCp, newTargetCp;
assert(
changeTrackingEnabled,
@ -889,8 +890,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
function checkpoints() {
var cb = arguments[arguments.length - 1];
sourceModel.checkpoint(function(err) {
targetModel.checkpoint(function(err) {
sourceModel.checkpoint(function(err, source) {
newSourceCp = source.seq;
targetModel.checkpoint(function(err, target) {
newTargetCp = target.seq;
cb(err);
});
});
@ -909,7 +912,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
sourceModel.emit('conflicts', conflicts);
}
if (callback) callback(null, conflicts);
if (callback) {
var newCheckpoints = { source: newSourceCp, target: newTargetCp };
callback(null, conflicts, newCheckpoints);
}
}
};

View File

@ -28,6 +28,14 @@ describe('Replication / Change APIs', function() {
{ id: { id: true, type: String, defaultFn: 'guid' } },
{ trackChanges: true });
// NOTE(bajtos) At the moment, all models share the same Checkpoint
// model. This causes the in-process replication to work differently
// than client-server replication.
// As a workaround, we manually setup unique Checkpoint for TargetModel.
var TargetChange = TargetModel.Change;
TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
TargetChange.Checkpoint.attachTo(dataSource);
TargetModel.attachTo(dataSource);
test.startingCheckpoint = -1;
@ -223,6 +231,47 @@ describe('Replication / Change APIs', function() {
}
], done);
});
it('returns new current checkpoints to callback', function(done) {
var sourceCp, targetCp;
async.series([
bumpSourceCheckpoint,
bumpTargetCheckpoint,
bumpTargetCheckpoint,
function replicate(cb) {
expect(sourceCp).to.not.equal(targetCp);
SourceModel.replicate(
TargetModel,
function(err, conflicts, newCheckpoints) {
if (err) return cb(err);
expect(conflicts, 'conflicts').to.eql([]);
expect(newCheckpoints, 'currentCheckpoints').to.eql({
source: sourceCp + 1,
target: targetCp + 1
});
cb();
});
}
], done);
function bumpSourceCheckpoint(cb) {
SourceModel.checkpoint(function(err, inst) {
if (err) return cb(err);
sourceCp = inst.seq;
cb();
});
}
function bumpTargetCheckpoint(cb) {
TargetModel.checkpoint(function(err, inst) {
if (err) return cb(err);
targetCp = inst.seq;
cb();
});
}
});
});
describe('conflict detection - both updated', function() {