Merge pull request #1157 from strongloop/feature/replication-improvements
Replication improvements: Checkpoint
This commit is contained in:
commit
2885317634
|
@ -428,6 +428,8 @@ module.exports = function(Change) {
|
||||||
Change.getCheckpointModel = function() {
|
Change.getCheckpointModel = function() {
|
||||||
var checkpointModel = this.Checkpoint;
|
var checkpointModel = this.Checkpoint;
|
||||||
if (checkpointModel) return checkpointModel;
|
if (checkpointModel) return checkpointModel;
|
||||||
|
// FIXME(bajtos) This code creates multiple different models with the same
|
||||||
|
// model name, which is not a valid supported usage of juggler's API.
|
||||||
this.Checkpoint = checkpointModel = loopback.Checkpoint.extend('checkpoint');
|
this.Checkpoint = checkpointModel = loopback.Checkpoint.extend('checkpoint');
|
||||||
assert(this.dataSource, 'Cannot getCheckpointModel(): ' + this.modelName +
|
assert(this.dataSource, 'Cannot getCheckpointModel(): ' + this.modelName +
|
||||||
' is not attached to a dataSource');
|
' is not attached to a dataSource');
|
||||||
|
|
|
@ -41,7 +41,7 @@ module.exports = function(Checkpoint) {
|
||||||
if (checkpoint) {
|
if (checkpoint) {
|
||||||
cb(null, checkpoint.seq);
|
cb(null, checkpoint.seq);
|
||||||
} else {
|
} else {
|
||||||
Checkpoint.create({seq: 0}, function(err, checkpoint) {
|
Checkpoint.create({ seq: 1 }, function(err, checkpoint) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
cb(null, checkpoint.seq);
|
cb(null, checkpoint.seq);
|
||||||
});
|
});
|
||||||
|
|
|
@ -757,7 +757,7 @@ PersistedModel.changes = function(since, filter, callback) {
|
||||||
|
|
||||||
// TODO(ritch) this whole thing could be optimized a bit more
|
// TODO(ritch) this whole thing could be optimized a bit more
|
||||||
Change.find({ where: {
|
Change.find({ where: {
|
||||||
checkpoint: {gt: since},
|
checkpoint: { gte: since },
|
||||||
modelName: this.modelName
|
modelName: this.modelName
|
||||||
}}, function(err, changes) {
|
}}, function(err, changes) {
|
||||||
if (err) return callback(err);
|
if (err) return callback(err);
|
||||||
|
@ -832,6 +832,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
since = -1;
|
since = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (typeof since !== 'object') {
|
||||||
|
since = { source: since, target: since };
|
||||||
|
}
|
||||||
|
|
||||||
options = options || {};
|
options = options || {};
|
||||||
|
|
||||||
var sourceModel = this;
|
var sourceModel = this;
|
||||||
|
@ -840,6 +844,7 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
var Change = this.getChangeModel();
|
var Change = this.getChangeModel();
|
||||||
var TargetChange = targetModel.getChangeModel();
|
var TargetChange = targetModel.getChangeModel();
|
||||||
var changeTrackingEnabled = Change && TargetChange;
|
var changeTrackingEnabled = Change && TargetChange;
|
||||||
|
var newSourceCp, newTargetCp;
|
||||||
|
|
||||||
assert(
|
assert(
|
||||||
changeTrackingEnabled,
|
changeTrackingEnabled,
|
||||||
|
@ -851,21 +856,21 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
};
|
};
|
||||||
|
|
||||||
var tasks = [
|
var tasks = [
|
||||||
|
checkpoints,
|
||||||
getSourceChanges,
|
getSourceChanges,
|
||||||
getDiffFromTarget,
|
getDiffFromTarget,
|
||||||
createSourceUpdates,
|
createSourceUpdates,
|
||||||
bulkUpdate,
|
bulkUpdate
|
||||||
checkpoint
|
|
||||||
];
|
];
|
||||||
|
|
||||||
async.waterfall(tasks, done);
|
async.waterfall(tasks, done);
|
||||||
|
|
||||||
function getSourceChanges(cb) {
|
function getSourceChanges(cb) {
|
||||||
sourceModel.changes(since, options.filter, cb);
|
sourceModel.changes(since.source, options.filter, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
function getDiffFromTarget(sourceChanges, cb) {
|
function getDiffFromTarget(sourceChanges, cb) {
|
||||||
targetModel.diff(since, sourceChanges, cb);
|
targetModel.diff(since.target, sourceChanges, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
function createSourceUpdates(_diff, cb) {
|
function createSourceUpdates(_diff, cb) {
|
||||||
|
@ -883,9 +888,15 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
targetModel.bulkUpdate(updates, cb);
|
targetModel.bulkUpdate(updates, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
function checkpoint() {
|
function checkpoints() {
|
||||||
var cb = arguments[arguments.length - 1];
|
var cb = arguments[arguments.length - 1];
|
||||||
sourceModel.checkpoint(cb);
|
sourceModel.checkpoint(function(err, source) {
|
||||||
|
newSourceCp = source.seq;
|
||||||
|
targetModel.checkpoint(function(err, target) {
|
||||||
|
newTargetCp = target.seq;
|
||||||
|
cb(err);
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function done(err) {
|
function done(err) {
|
||||||
|
@ -901,7 +912,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
sourceModel.emit('conflicts', conflicts);
|
sourceModel.emit('conflicts', conflicts);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (callback) callback(null, conflicts);
|
if (callback) {
|
||||||
|
var newCheckpoints = { source: newSourceCp, target: newTargetCp };
|
||||||
|
callback(null, conflicts, newCheckpoints);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ describe('Checkpoint', function() {
|
||||||
function(next) {
|
function(next) {
|
||||||
Checkpoint.current(function(err, seq) {
|
Checkpoint.current(function(err, seq) {
|
||||||
if (err) next(err);
|
if (err) next(err);
|
||||||
expect(seq).to.equal(2);
|
expect(seq).to.equal(3);
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,22 +8,34 @@ var expect = require('chai').expect;
|
||||||
|
|
||||||
describe('Replication / Change APIs', function() {
|
describe('Replication / Change APIs', function() {
|
||||||
var dataSource, SourceModel, TargetModel;
|
var dataSource, SourceModel, TargetModel;
|
||||||
|
var tid = 0; // per-test unique id used e.g. to build unique model names
|
||||||
|
|
||||||
beforeEach(function() {
|
beforeEach(function() {
|
||||||
|
tid++;
|
||||||
var test = this;
|
var test = this;
|
||||||
dataSource = this.dataSource = loopback.createDataSource({
|
dataSource = this.dataSource = loopback.createDataSource({
|
||||||
connector: loopback.Memory
|
connector: loopback.Memory
|
||||||
});
|
});
|
||||||
SourceModel = this.SourceModel = PersistedModel.extend('SourceModel',
|
SourceModel = this.SourceModel = PersistedModel.extend(
|
||||||
|
'SourceModel-' + tid,
|
||||||
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
||||||
{ trackChanges: true });
|
{ trackChanges: true });
|
||||||
|
|
||||||
SourceModel.attachTo(dataSource);
|
SourceModel.attachTo(dataSource);
|
||||||
|
|
||||||
TargetModel = this.TargetModel = PersistedModel.extend('TargetModel',
|
TargetModel = this.TargetModel = PersistedModel.extend(
|
||||||
|
'TargetModel-' + tid,
|
||||||
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
||||||
{ trackChanges: true });
|
{ trackChanges: true });
|
||||||
|
|
||||||
|
// NOTE(bajtos) At the moment, all models share the same Checkpoint
|
||||||
|
// model. This causes the in-process replication to work differently
|
||||||
|
// than client-server replication.
|
||||||
|
// As a workaround, we manually setup unique Checkpoint for TargetModel.
|
||||||
|
var TargetChange = TargetModel.Change;
|
||||||
|
TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
|
||||||
|
TargetChange.Checkpoint.attachTo(dataSource);
|
||||||
|
|
||||||
TargetModel.attachTo(dataSource);
|
TargetModel.attachTo(dataSource);
|
||||||
|
|
||||||
test.startingCheckpoint = -1;
|
test.startingCheckpoint = -1;
|
||||||
|
@ -109,6 +121,157 @@ describe('Replication / Change APIs', function() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('applies "since" filter on source changes', function(done) {
|
||||||
|
async.series([
|
||||||
|
function createModelInSourceCp1(next) {
|
||||||
|
SourceModel.create({ id: '1' }, next);
|
||||||
|
},
|
||||||
|
function checkpoint(next) {
|
||||||
|
SourceModel.checkpoint(next);
|
||||||
|
},
|
||||||
|
function createModelInSourceCp2(next) {
|
||||||
|
SourceModel.create({ id: '2' }, next);
|
||||||
|
},
|
||||||
|
function replicateLastChangeOnly(next) {
|
||||||
|
SourceModel.currentCheckpoint(function(err, cp) {
|
||||||
|
if (err) return done(err);
|
||||||
|
SourceModel.replicate(cp, TargetModel, next);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function verify(next) {
|
||||||
|
TargetModel.find(function(err, list) {
|
||||||
|
if (err) return done(err);
|
||||||
|
// '1' should be skipped by replication
|
||||||
|
expect(getIds(list)).to.eql(['2']);
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('applies "since" filter on target changes', function(done) {
|
||||||
|
// Because the "since" filter is just an optimization,
|
||||||
|
// there isn't really any observable behaviour we could
|
||||||
|
// check to assert correct implementation.
|
||||||
|
var diffSince = [];
|
||||||
|
spyAndStoreSinceArg(TargetModel, 'diff', diffSince);
|
||||||
|
|
||||||
|
SourceModel.replicate(10, TargetModel, function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
expect(diffSince).to.eql([10]);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('uses different "since" value for source and target', function(done) {
|
||||||
|
var sourceSince = [];
|
||||||
|
var targetSince = [];
|
||||||
|
|
||||||
|
spyAndStoreSinceArg(SourceModel, 'changes', sourceSince);
|
||||||
|
spyAndStoreSinceArg(TargetModel, 'diff', targetSince);
|
||||||
|
|
||||||
|
var since = { source: 1, target: 2 };
|
||||||
|
SourceModel.replicate(since, TargetModel, function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
expect(sourceSince).to.eql([1]);
|
||||||
|
expect(targetSince).to.eql([2]);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('picks up changes made during replication', function(done) {
|
||||||
|
var bulkUpdate = TargetModel.bulkUpdate;
|
||||||
|
TargetModel.bulkUpdate = function(data, cb) {
|
||||||
|
var self = this;
|
||||||
|
// simulate the situation when another model is created
|
||||||
|
// while a replication run is in progress
|
||||||
|
SourceModel.create({ id: 'racer' }, function(err) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
bulkUpdate.call(self, data, cb);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
var lastCp;
|
||||||
|
async.series([
|
||||||
|
function buildSomeDataToReplicate(next) {
|
||||||
|
SourceModel.create({ id: 'init' }, next);
|
||||||
|
},
|
||||||
|
function getLastCp(next) {
|
||||||
|
SourceModel.currentCheckpoint(function(err, cp) {
|
||||||
|
if (err) return done(err);
|
||||||
|
lastCp = cp;
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function replicate(next) {
|
||||||
|
SourceModel.replicate(TargetModel, next);
|
||||||
|
},
|
||||||
|
function verifyAssumptions(next) {
|
||||||
|
SourceModel.find(function(err, list) {
|
||||||
|
expect(getIds(list), 'source ids')
|
||||||
|
.to.eql(['init', 'racer']);
|
||||||
|
|
||||||
|
TargetModel.find(function(err, list) {
|
||||||
|
expect(getIds(list), 'target ids after first sync')
|
||||||
|
.to.eql(['init']);
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function replicateAgain(next) {
|
||||||
|
TargetModel.bulkUpdate = bulkUpdate;
|
||||||
|
SourceModel.replicate(lastCp + 1, TargetModel, next);
|
||||||
|
},
|
||||||
|
function verify(next) {
|
||||||
|
TargetModel.find(function(err, list) {
|
||||||
|
expect(getIds(list), 'target ids').to.eql(['init', 'racer']);
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns new current checkpoints to callback', function(done) {
|
||||||
|
var sourceCp, targetCp;
|
||||||
|
async.series([
|
||||||
|
bumpSourceCheckpoint,
|
||||||
|
bumpTargetCheckpoint,
|
||||||
|
bumpTargetCheckpoint,
|
||||||
|
function replicate(cb) {
|
||||||
|
expect(sourceCp).to.not.equal(targetCp);
|
||||||
|
|
||||||
|
SourceModel.replicate(
|
||||||
|
TargetModel,
|
||||||
|
function(err, conflicts, newCheckpoints) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
expect(conflicts, 'conflicts').to.eql([]);
|
||||||
|
expect(newCheckpoints, 'currentCheckpoints').to.eql({
|
||||||
|
source: sourceCp + 1,
|
||||||
|
target: targetCp + 1
|
||||||
|
});
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
], done);
|
||||||
|
|
||||||
|
function bumpSourceCheckpoint(cb) {
|
||||||
|
SourceModel.checkpoint(function(err, inst) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
sourceCp = inst.seq;
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function bumpTargetCheckpoint(cb) {
|
||||||
|
TargetModel.checkpoint(function(err, inst) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
targetCp = inst.seq;
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('conflict detection - both updated', function() {
|
describe('conflict detection - both updated', function() {
|
||||||
|
@ -493,4 +656,22 @@ describe('Replication / Change APIs', function() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
function spyAndStoreSinceArg(Model, methodName, store) {
|
||||||
|
var orig = Model[methodName];
|
||||||
|
Model[methodName] = function(since) {
|
||||||
|
store.push(since);
|
||||||
|
orig.apply(this, arguments);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function getPropValue(obj, name) {
|
||||||
|
return Array.isArray(obj) ?
|
||||||
|
obj.map(function(it) { return getPropValue(it, name); }) :
|
||||||
|
obj[name];
|
||||||
|
}
|
||||||
|
|
||||||
|
function getIds(list) {
|
||||||
|
return getPropValue(list, 'id');
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue