Merge pull request #1176 from strongloop/feature/more-replication-improvements
Prevent more kinds of false replication conflicts
This commit is contained in:
commit
e59493ec40
|
@ -141,10 +141,6 @@ module.exports = function(Change) {
|
||||||
|
|
||||||
Change.prototype.rectify = function(cb) {
|
Change.prototype.rectify = function(cb) {
|
||||||
var change = this;
|
var change = this;
|
||||||
var tasks = [
|
|
||||||
updateRevision,
|
|
||||||
updateCheckpoint
|
|
||||||
];
|
|
||||||
var currentRev = this.rev;
|
var currentRev = this.rev;
|
||||||
|
|
||||||
change.debug('rectify change');
|
change.debug('rectify change');
|
||||||
|
@ -153,8 +149,53 @@ module.exports = function(Change) {
|
||||||
if (err) throw new Error(err);
|
if (err) throw new Error(err);
|
||||||
};
|
};
|
||||||
|
|
||||||
async.parallel(tasks, function(err) {
|
async.parallel([
|
||||||
|
function getCurrentCheckpoint(next) {
|
||||||
|
change.constructor.getCheckpointModel().current(next);
|
||||||
|
},
|
||||||
|
function getCurrentRevision(next) {
|
||||||
|
change.currentRevision(next);
|
||||||
|
}
|
||||||
|
], doRectify);
|
||||||
|
|
||||||
|
function doRectify(err, results) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
var checkpoint = results[0];
|
||||||
|
var rev = results[1];
|
||||||
|
|
||||||
|
if (rev) {
|
||||||
|
// avoid setting rev and prev to the same value
|
||||||
|
if (currentRev === rev) {
|
||||||
|
change.debug('rev and prev are equal (not updating rev)');
|
||||||
|
} else {
|
||||||
|
change.rev = rev;
|
||||||
|
change.debug('updated revision (was ' + currentRev + ')');
|
||||||
|
if (change.checkpoint !== checkpoint) {
|
||||||
|
// previous revision is updated only across checkpoints
|
||||||
|
change.prev = currentRev;
|
||||||
|
change.debug('updated prev');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
change.rev = null;
|
||||||
|
change.debug('updated revision (was ' + currentRev + ')');
|
||||||
|
if (change.checkpoint !== checkpoint) {
|
||||||
|
// previous revision is updated only across checkpoints
|
||||||
|
if (currentRev) {
|
||||||
|
change.prev = currentRev;
|
||||||
|
} else if (!change.prev) {
|
||||||
|
change.debug('ERROR - could not determing prev');
|
||||||
|
change.prev = Change.UNKNOWN;
|
||||||
|
}
|
||||||
|
change.debug('updated prev');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (change.checkpoint != checkpoint) {
|
||||||
|
debug('update checkpoint to', checkpoint);
|
||||||
|
change.checkpoint = checkpoint;
|
||||||
|
}
|
||||||
|
|
||||||
if (change.prev === Change.UNKNOWN) {
|
if (change.prev === Change.UNKNOWN) {
|
||||||
// this occurs when a record of a change doesn't exist
|
// this occurs when a record of a change doesn't exist
|
||||||
// and its current revision is null (not found)
|
// and its current revision is null (not found)
|
||||||
|
@ -162,41 +203,6 @@ module.exports = function(Change) {
|
||||||
} else {
|
} else {
|
||||||
change.save(cb);
|
change.save(cb);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
function updateRevision(cb) {
|
|
||||||
// get the current revision
|
|
||||||
change.currentRevision(function(err, rev) {
|
|
||||||
if (err) return Change.handleError(err, cb);
|
|
||||||
if (rev) {
|
|
||||||
// avoid setting rev and prev to the same value
|
|
||||||
if (currentRev !== rev) {
|
|
||||||
change.rev = rev;
|
|
||||||
change.prev = currentRev;
|
|
||||||
} else {
|
|
||||||
change.debug('rev and prev are equal (not updating rev)');
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
change.rev = null;
|
|
||||||
if (currentRev) {
|
|
||||||
change.prev = currentRev;
|
|
||||||
} else if (!change.prev) {
|
|
||||||
change.debug('ERROR - could not determing prev');
|
|
||||||
change.prev = Change.UNKNOWN;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
change.debug('updated revision (was ' + currentRev + ')');
|
|
||||||
cb();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function updateCheckpoint(cb) {
|
|
||||||
change.constructor.getCheckpointModel().current(function(err, checkpoint) {
|
|
||||||
if (err) return Change.handleError(err);
|
|
||||||
debug('updated checkpoint to', checkpoint);
|
|
||||||
change.checkpoint = checkpoint;
|
|
||||||
cb();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -446,10 +452,13 @@ module.exports = function(Change) {
|
||||||
Change.prototype.debug = function() {
|
Change.prototype.debug = function() {
|
||||||
if (debug.enabled) {
|
if (debug.enabled) {
|
||||||
var args = Array.prototype.slice.call(arguments);
|
var args = Array.prototype.slice.call(arguments);
|
||||||
|
args[0] = args[0] + ' %s';
|
||||||
|
args.push(this.modelName);
|
||||||
debug.apply(this, args);
|
debug.apply(this, args);
|
||||||
debug('\tid', this.id);
|
debug('\tid', this.id);
|
||||||
debug('\trev', this.rev);
|
debug('\trev', this.rev);
|
||||||
debug('\tprev', this.prev);
|
debug('\tprev', this.prev);
|
||||||
|
debug('\tcheckpoint', this.checkpoint);
|
||||||
debug('\tmodelName', this.modelName);
|
debug('\tmodelName', this.modelName);
|
||||||
debug('\tmodelId', this.modelId);
|
debug('\tmodelId', this.modelId);
|
||||||
debug('\ttype', this.type());
|
debug('\ttype', this.type());
|
||||||
|
|
|
@ -8,6 +8,7 @@ var runtime = require('./runtime');
|
||||||
var assert = require('assert');
|
var assert = require('assert');
|
||||||
var async = require('async');
|
var async = require('async');
|
||||||
var deprecated = require('depd')('loopback');
|
var deprecated = require('depd')('loopback');
|
||||||
|
var debug = require('debug')('loopback:persisted-model');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extends Model with basic query and CRUD support.
|
* Extends Model with basic query and CRUD support.
|
||||||
|
@ -839,21 +840,64 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
options = options || {};
|
options = options || {};
|
||||||
|
|
||||||
var sourceModel = this;
|
var sourceModel = this;
|
||||||
var diff;
|
callback = callback || function defaultReplicationCallback(err) {
|
||||||
var updates;
|
if (err) throw err;
|
||||||
var Change = this.getChangeModel();
|
};
|
||||||
|
|
||||||
|
debug('replicating %s since %s to %s since %s',
|
||||||
|
sourceModel.modelName,
|
||||||
|
since.source,
|
||||||
|
targetModel.modelName,
|
||||||
|
since.target);
|
||||||
|
if (options.filter) {
|
||||||
|
debug('\twith filter %j', options.filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
// In order to avoid a race condition between the replication and
|
||||||
|
// other clients modifying the data, we must create the new target
|
||||||
|
// checkpoint as the first step of the replication process.
|
||||||
|
// As a side-effect of that, the replicated changes are associated
|
||||||
|
// with the new target checkpoint. This is actually desired behaviour,
|
||||||
|
// because that way clients replicating *from* the target model
|
||||||
|
// since the new checkpoint will pick these changes up.
|
||||||
|
// However, it increases the likelihood of (false) conflicts being detected.
|
||||||
|
// In order to prevent that, we run the replication multiple times,
|
||||||
|
// until no changes were replicated, but at most MAX_ATTEMPTS times
|
||||||
|
// to prevent starvation. In most cases, the second run will find no changes
|
||||||
|
// to replicate and we are done.
|
||||||
|
var MAX_ATTEMPTS = 3;
|
||||||
|
|
||||||
|
run(1, since);
|
||||||
|
|
||||||
|
function run(attempt, since) {
|
||||||
|
debug('\titeration #%s', attempt);
|
||||||
|
tryReplicate(sourceModel, targetModel, since, options, next);
|
||||||
|
|
||||||
|
function next(err, conflicts, cps, updates) {
|
||||||
|
var finished = err || conflicts.length ||
|
||||||
|
!updates || updates.length === 0 ||
|
||||||
|
attempt >= MAX_ATTEMPTS;
|
||||||
|
|
||||||
|
if (finished)
|
||||||
|
return callback(err, conflicts, cps);
|
||||||
|
run(attempt + 1, cps);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
function tryReplicate(sourceModel, targetModel, since, options, callback) {
|
||||||
|
var Change = sourceModel.getChangeModel();
|
||||||
var TargetChange = targetModel.getChangeModel();
|
var TargetChange = targetModel.getChangeModel();
|
||||||
var changeTrackingEnabled = Change && TargetChange;
|
var changeTrackingEnabled = Change && TargetChange;
|
||||||
var newSourceCp, newTargetCp;
|
|
||||||
|
|
||||||
assert(
|
assert(
|
||||||
changeTrackingEnabled,
|
changeTrackingEnabled,
|
||||||
'You must enable change tracking before replicating'
|
'You must enable change tracking before replicating'
|
||||||
);
|
);
|
||||||
|
|
||||||
callback = callback || function defaultReplicationCallback(err) {
|
var diff;
|
||||||
if (err) throw err;
|
var updates;
|
||||||
};
|
var newSourceCp, newTargetCp;
|
||||||
|
|
||||||
var tasks = [
|
var tasks = [
|
||||||
checkpoints,
|
checkpoints,
|
||||||
|
@ -866,17 +910,38 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
async.waterfall(tasks, done);
|
async.waterfall(tasks, done);
|
||||||
|
|
||||||
function getSourceChanges(cb) {
|
function getSourceChanges(cb) {
|
||||||
sourceModel.changes(since.source, options.filter, cb);
|
sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb);
|
||||||
|
|
||||||
|
function log(err, result) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
debug('\tusing source changes');
|
||||||
|
result.forEach(function(it) { debug('\t\t%j', it); });
|
||||||
|
cb(err, result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function getDiffFromTarget(sourceChanges, cb) {
|
function getDiffFromTarget(sourceChanges, cb) {
|
||||||
targetModel.diff(since.target, sourceChanges, cb);
|
targetModel.diff(since.target, sourceChanges, debug.enabled ? log : cb);
|
||||||
|
|
||||||
|
function log(err, result) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
if (result.conflicts && result.conflicts.length) {
|
||||||
|
debug('\tdiff conflicts');
|
||||||
|
result.conflicts.forEach(function(d) { debug('\t\t%j', d); });
|
||||||
|
}
|
||||||
|
if (result.deltas && result.deltas.length) {
|
||||||
|
debug('\tdiff deltas');
|
||||||
|
result.deltas.forEach(function(it) { debug('\t\t%j', it); });
|
||||||
|
}
|
||||||
|
cb(err, result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function createSourceUpdates(_diff, cb) {
|
function createSourceUpdates(_diff, cb) {
|
||||||
diff = _diff;
|
diff = _diff;
|
||||||
diff.conflicts = diff.conflicts || [];
|
diff.conflicts = diff.conflicts || [];
|
||||||
if (diff && diff.deltas && diff.deltas.length) {
|
if (diff && diff.deltas && diff.deltas.length) {
|
||||||
|
debug('\tbuilding a list of updates');
|
||||||
sourceModel.createUpdates(diff.deltas, cb);
|
sourceModel.createUpdates(diff.deltas, cb);
|
||||||
} else {
|
} else {
|
||||||
// nothing to replicate
|
// nothing to replicate
|
||||||
|
@ -884,7 +949,9 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function bulkUpdate(updates, cb) {
|
function bulkUpdate(_updates, cb) {
|
||||||
|
debug('\tstarting bulk update');
|
||||||
|
updates = _updates;
|
||||||
targetModel.bulkUpdate(updates, cb);
|
targetModel.bulkUpdate(updates, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -894,6 +961,9 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
newSourceCp = source.seq;
|
newSourceCp = source.seq;
|
||||||
targetModel.checkpoint(function(err, target) {
|
targetModel.checkpoint(function(err, target) {
|
||||||
newTargetCp = target.seq;
|
newTargetCp = target.seq;
|
||||||
|
debug('\tcreated checkpoints');
|
||||||
|
debug('\t\t%s for source model %s', newSourceCp, sourceModel.modelName);
|
||||||
|
debug('\t\t%s for target model %s', newTargetCp, targetModel.modelName);
|
||||||
cb(err);
|
cb(err);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -902,6 +972,12 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
function done(err) {
|
function done(err) {
|
||||||
if (err) return callback(err);
|
if (err) return callback(err);
|
||||||
|
|
||||||
|
debug('\treplication finished');
|
||||||
|
debug('\t\t%s conflict(s) detected', diff.conflicts.length);
|
||||||
|
debug('\t\t%s change(s) applied', updates && updates.length);
|
||||||
|
debug('\t\tnew checkpoints: { source: %j, target: %j }',
|
||||||
|
newSourceCp, newTargetCp);
|
||||||
|
|
||||||
var conflicts = diff.conflicts.map(function(change) {
|
var conflicts = diff.conflicts.map(function(change) {
|
||||||
return new Change.Conflict(
|
return new Change.Conflict(
|
||||||
change.modelId, sourceModel, targetModel
|
change.modelId, sourceModel, targetModel
|
||||||
|
@ -914,10 +990,10 @@ PersistedModel.replicate = function(since, targetModel, options, callback) {
|
||||||
|
|
||||||
if (callback) {
|
if (callback) {
|
||||||
var newCheckpoints = { source: newSourceCp, target: newTargetCp };
|
var newCheckpoints = { source: newSourceCp, target: newTargetCp };
|
||||||
callback(null, conflicts, newCheckpoints);
|
callback(null, conflicts, newCheckpoints, updates);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an update list (for `Model.bulkUpdate()`) from a delta list
|
* Create an update list (for `Model.bulkUpdate()`) from a delta list
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
var async = require('async');
|
||||||
|
|
||||||
var Change;
|
var Change;
|
||||||
var TestModel;
|
var TestModel;
|
||||||
|
|
||||||
|
@ -131,18 +133,67 @@ describe('Change', function() {
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('change.rectify(callback)', function() {
|
describe('change.rectify(callback)', function() {
|
||||||
it('should create a new change with the correct revision', function(done) {
|
var change;
|
||||||
var test = this;
|
beforeEach(function() {
|
||||||
var change = new Change({
|
change = new Change({
|
||||||
modelName: this.modelName,
|
modelName: this.modelName,
|
||||||
modelId: this.modelId
|
modelId: this.modelId
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should create a new change with the correct revision', function(done) {
|
||||||
|
var test = this;
|
||||||
change.rectify(function(err, ch) {
|
change.rectify(function(err, ch) {
|
||||||
assert.equal(ch.rev, test.revisionForModel);
|
assert.equal(ch.rev, test.revisionForModel);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// This test is a low-level equivalent of the test in replication.test.js
|
||||||
|
// called "replicates multiple updates within the same CP"
|
||||||
|
it('should merge updates within the same checkpoint', function(done) {
|
||||||
|
var test = this;
|
||||||
|
var originalRev = this.revisionForModel;
|
||||||
|
var cp;
|
||||||
|
|
||||||
|
async.series([
|
||||||
|
rectify,
|
||||||
|
checkpoint,
|
||||||
|
update,
|
||||||
|
rectify,
|
||||||
|
update,
|
||||||
|
rectify,
|
||||||
|
function(next) {
|
||||||
|
expect(change.checkpoint, 'checkpoint').to.equal(cp);
|
||||||
|
expect(change.type(), 'type').to.equal('update');
|
||||||
|
expect(change.prev, 'prev').to.equal(originalRev);
|
||||||
|
expect(change.rev, 'rev').to.equal(test.revisionForModel);
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
], done);
|
||||||
|
|
||||||
|
function rectify(next) {
|
||||||
|
change.rectify(next);
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkpoint(next) {
|
||||||
|
TestModel.checkpoint(function(err, inst) {
|
||||||
|
if (err) return next(err);
|
||||||
|
cp = inst.seq;
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function update(next) {
|
||||||
|
var model = test.model;
|
||||||
|
|
||||||
|
model.name += 'updated';
|
||||||
|
model.save(function(err) {
|
||||||
|
test.revisionForModel = Change.revisionForInst(model);
|
||||||
|
next(err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('change.currentRevision(callback)', function() {
|
describe('change.currentRevision(callback)', function() {
|
||||||
|
|
|
@ -5,13 +5,16 @@ var Change = loopback.Change;
|
||||||
var defineModelTestsWithDataSource = require('./util/model-tests');
|
var defineModelTestsWithDataSource = require('./util/model-tests');
|
||||||
var PersistedModel = loopback.PersistedModel;
|
var PersistedModel = loopback.PersistedModel;
|
||||||
var expect = require('chai').expect;
|
var expect = require('chai').expect;
|
||||||
|
var debug = require('debug')('test');
|
||||||
|
|
||||||
describe('Replication / Change APIs', function() {
|
describe('Replication / Change APIs', function() {
|
||||||
var dataSource, SourceModel, TargetModel;
|
var dataSource, SourceModel, TargetModel;
|
||||||
|
var useSinceFilter;
|
||||||
var tid = 0; // per-test unique id used e.g. to build unique model names
|
var tid = 0; // per-test unique id used e.g. to build unique model names
|
||||||
|
|
||||||
beforeEach(function() {
|
beforeEach(function() {
|
||||||
tid++;
|
tid++;
|
||||||
|
useSinceFilter = false;
|
||||||
var test = this;
|
var test = this;
|
||||||
dataSource = this.dataSource = loopback.createDataSource({
|
dataSource = this.dataSource = loopback.createDataSource({
|
||||||
connector: loopback.Memory
|
connector: loopback.Memory
|
||||||
|
@ -44,12 +47,7 @@ describe('Replication / Change APIs', function() {
|
||||||
SourceModel.create({name: 'foo'}, function(err, inst) {
|
SourceModel.create({name: 'foo'}, function(err, inst) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
test.model = inst;
|
test.model = inst;
|
||||||
|
|
||||||
// give loopback a chance to register the change
|
|
||||||
// TODO(ritch) get rid of this...
|
|
||||||
setTimeout(function() {
|
|
||||||
SourceModel.replicate(TargetModel, cb);
|
SourceModel.replicate(TargetModel, cb);
|
||||||
}, 100);
|
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
@ -191,6 +189,8 @@ describe('Replication / Change APIs', function() {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
bulkUpdate.call(self, data, cb);
|
bulkUpdate.call(self, data, cb);
|
||||||
});
|
});
|
||||||
|
// create the new model only once
|
||||||
|
TargetModel.bulkUpdate = bulkUpdate;
|
||||||
};
|
};
|
||||||
|
|
||||||
var lastCp;
|
var lastCp;
|
||||||
|
@ -215,13 +215,12 @@ describe('Replication / Change APIs', function() {
|
||||||
|
|
||||||
TargetModel.find(function(err, list) {
|
TargetModel.find(function(err, list) {
|
||||||
expect(getIds(list), 'target ids after first sync')
|
expect(getIds(list), 'target ids after first sync')
|
||||||
.to.eql(['init']);
|
.to.include.members(['init']);
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
function replicateAgain(next) {
|
function replicateAgain(next) {
|
||||||
TargetModel.bulkUpdate = bulkUpdate;
|
|
||||||
SourceModel.replicate(lastCp + 1, TargetModel, next);
|
SourceModel.replicate(lastCp + 1, TargetModel, next);
|
||||||
},
|
},
|
||||||
function verify(next) {
|
function verify(next) {
|
||||||
|
@ -273,6 +272,26 @@ describe('Replication / Change APIs', function() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('leaves current target checkpoint empty', function(done) {
|
||||||
|
async.series([
|
||||||
|
function createTestData(next) {
|
||||||
|
SourceModel.create({}, next);
|
||||||
|
},
|
||||||
|
replicateExpectingSuccess(),
|
||||||
|
function verify(next) {
|
||||||
|
TargetModel.currentCheckpoint(function(err, cp) {
|
||||||
|
if (err) return next(err);
|
||||||
|
TargetModel.getChangeModel().find(
|
||||||
|
{ where: { checkpoint: { gte: cp } } },
|
||||||
|
function(err, changes) {
|
||||||
|
if (err) return done(err);
|
||||||
|
expect(changes).to.be.empty();
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
], done);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('conflict detection - both updated', function() {
|
describe('conflict detection - both updated', function() {
|
||||||
|
@ -658,6 +677,259 @@ describe('Replication / Change APIs', function() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('complex setup', function() {
|
||||||
|
var sourceInstance, sourceInstanceId, AnotherModel;
|
||||||
|
|
||||||
|
beforeEach(function createReplicatedInstance(done) {
|
||||||
|
async.series([
|
||||||
|
function createInstance(next) {
|
||||||
|
SourceModel.create({ id: 'test-instance' }, function(err, result) {
|
||||||
|
sourceInstance = result;
|
||||||
|
sourceInstanceId = result.id;
|
||||||
|
next(err);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
replicateExpectingSuccess(),
|
||||||
|
verifySourceWasReplicated()
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(function setupThirdModel() {
|
||||||
|
AnotherModel = this.AnotherModel = PersistedModel.extend(
|
||||||
|
'AnotherModel-' + tid,
|
||||||
|
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
||||||
|
{ trackChanges: true });
|
||||||
|
|
||||||
|
// NOTE(bajtos) At the moment, all models share the same Checkpoint
|
||||||
|
// model. This causes the in-process replication to work differently
|
||||||
|
// than client-server replication.
|
||||||
|
// As a workaround, we manually setup unique Checkpoint for AnotherModel.
|
||||||
|
var AnotherChange = AnotherModel.Change;
|
||||||
|
AnotherChange.Checkpoint = loopback.Checkpoint.extend('AnotherCheckpoint');
|
||||||
|
AnotherChange.Checkpoint.attachTo(dataSource);
|
||||||
|
|
||||||
|
AnotherModel.attachTo(dataSource);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('correctly replicates without checkpoint filter', function(done) {
|
||||||
|
async.series([
|
||||||
|
updateSourceInstanceNameTo('updated'),
|
||||||
|
replicateExpectingSuccess(),
|
||||||
|
verifySourceWasReplicated(),
|
||||||
|
|
||||||
|
function deleteInstance(next) {
|
||||||
|
sourceInstance.remove(next);
|
||||||
|
},
|
||||||
|
replicateExpectingSuccess(),
|
||||||
|
function verifyTargetModelWasDeleted(next) {
|
||||||
|
TargetModel.find(function(err, list) {
|
||||||
|
if (err) return next(err);
|
||||||
|
expect(getIds(list)).to.not.contain(sourceInstance.id);
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('replicates multiple updates within the same CP', function(done) {
|
||||||
|
async.series([
|
||||||
|
replicateExpectingSuccess(),
|
||||||
|
verifySourceWasReplicated(),
|
||||||
|
|
||||||
|
updateSourceInstanceNameTo('updated'),
|
||||||
|
updateSourceInstanceNameTo('again'),
|
||||||
|
replicateExpectingSuccess(),
|
||||||
|
verifySourceWasReplicated()
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('clientA-server-clientB', function() {
|
||||||
|
var ClientA, Server, ClientB;
|
||||||
|
|
||||||
|
beforeEach(function() {
|
||||||
|
ClientA = SourceModel;
|
||||||
|
Server = TargetModel;
|
||||||
|
ClientB = AnotherModel;
|
||||||
|
|
||||||
|
// NOTE(bajtos) The tests should ideally pass without the since
|
||||||
|
// filter too. Unfortunately that's not possible with the current
|
||||||
|
// implementation that remembers only the last two changes made.
|
||||||
|
useSinceFilter = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('replicates new models', function(done) {
|
||||||
|
async.series([
|
||||||
|
// Note that ClientA->Server was already replicated during setup
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
verifySourceWasReplicated(ClientB)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates updates with no false conflicts', function(done) {
|
||||||
|
async.series([
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
|
||||||
|
updateSourceInstanceNameTo('v3'),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
updateSourceInstanceNameTo('v4'),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
verifySourceWasReplicated(ClientB)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates deletes with no false conflicts', function(done) {
|
||||||
|
async.series([
|
||||||
|
deleteSourceInstance(),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
verifySourceWasReplicated(ClientB)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('bidirectional sync', function() {
|
||||||
|
beforeEach(function finishInitialSync(next) {
|
||||||
|
// The fixture setup creates a new model instance and replicates
|
||||||
|
// it from ClientA to Server. Since we are performing bidirectional
|
||||||
|
// synchronization in this suite, we must complete the first sync,
|
||||||
|
// otherwise some of the tests may fail.
|
||||||
|
replicateExpectingSuccess(Server, ClientA)(next);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates CREATE', function(done) {
|
||||||
|
async.series([
|
||||||
|
sync(ClientA, Server),
|
||||||
|
sync(ClientB, Server)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates CREATE+UPDATE', function(done) {
|
||||||
|
async.series([
|
||||||
|
// NOTE: ClientB has not fetched the new model instance yet
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
sync(ClientA, Server),
|
||||||
|
|
||||||
|
// ClientB fetches the created & updated instance from the server
|
||||||
|
sync(ClientB, Server),
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates DELETE', function(done) {
|
||||||
|
async.series([
|
||||||
|
// NOTE: ClientB has not fetched the new model instance yet
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
sync(ClientA, Server),
|
||||||
|
|
||||||
|
// ClientB fetches the created & updated instance from the server
|
||||||
|
sync(ClientB, Server),
|
||||||
|
], done);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not report false conflicts', function(done) {
|
||||||
|
async.series([
|
||||||
|
// client A makes some work
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
sync(ClientA, Server),
|
||||||
|
|
||||||
|
// ClientB fetches the change from the server
|
||||||
|
sync(ClientB, Server),
|
||||||
|
verifySourceWasReplicated(ClientB),
|
||||||
|
|
||||||
|
// client B makes some work
|
||||||
|
updateClientB('v5'),
|
||||||
|
sync(Server, ClientB),
|
||||||
|
updateClientB('v6'),
|
||||||
|
sync(ClientB, Server),
|
||||||
|
|
||||||
|
// client A fetches the changes
|
||||||
|
sync(ClientA, Server)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
function updateClientB(name) {
|
||||||
|
return function updateInstanceB(next) {
|
||||||
|
ClientB.findById(sourceInstanceId, function(err, instance) {
|
||||||
|
if (err) return next(err);
|
||||||
|
instance.name = name;
|
||||||
|
instance.save(next);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function sync(client, server) {
|
||||||
|
return function syncBothWays(next) {
|
||||||
|
async.series([
|
||||||
|
replicateExpectingSuccess(server, client),
|
||||||
|
replicateExpectingSuccess(client, server)
|
||||||
|
], next);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
function updateSourceInstanceNameTo(value) {
|
||||||
|
return function updateInstance(next) {
|
||||||
|
sourceInstance.name = value;
|
||||||
|
sourceInstance.save(next);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function deleteSourceInstance(value) {
|
||||||
|
return function deleteInstance(next) {
|
||||||
|
sourceInstance.remove(function(err) {
|
||||||
|
sourceInstance = null;
|
||||||
|
next(err);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function verifySourceWasReplicated(target) {
|
||||||
|
if (!target) target = TargetModel;
|
||||||
|
return function verify(next) {
|
||||||
|
target.findById(sourceInstanceId, function(err, targetInstance) {
|
||||||
|
if (err) return next(err);
|
||||||
|
expect(targetInstance && targetInstance.toObject())
|
||||||
|
.to.eql(sourceInstance && sourceInstance.toObject());
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
var _since = {};
|
||||||
|
function replicateExpectingSuccess(source, target, since) {
|
||||||
|
if (!source) source = SourceModel;
|
||||||
|
if (!target) target = TargetModel;
|
||||||
|
|
||||||
|
return function replicate(next) {
|
||||||
|
var sinceIx = source.modelName + ':to:' + target.modelName;
|
||||||
|
if (since === undefined) {
|
||||||
|
since = useSinceFilter ?
|
||||||
|
_since[sinceIx] || -1 :
|
||||||
|
-1;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug('replicateExpectingSuccess from %s to %s since %j',
|
||||||
|
source.modelName, target.modelName, since);
|
||||||
|
|
||||||
|
source.replicate(since, target, function(err, conflicts, cps) {
|
||||||
|
if (err) return next(err);
|
||||||
|
if (conflicts.length) {
|
||||||
|
return next(new Error('Unexpected conflicts\n' +
|
||||||
|
conflicts.map(JSON.stringify).join('\n')));
|
||||||
|
}
|
||||||
|
_since[sinceIx] = cps;
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
function spyAndStoreSinceArg(Model, methodName, store) {
|
function spyAndStoreSinceArg(Model, methodName, store) {
|
||||||
var orig = Model[methodName];
|
var orig = Model[methodName];
|
||||||
Model[methodName] = function(since) {
|
Model[methodName] = function(since) {
|
||||||
|
|
Loading…
Reference in New Issue