Add more integration tests for replication
Add tests covering typical replication scenarios that happen in the setup where multiple clients are synchronizing changes against a single server (database).
This commit is contained in:
parent
76d9244448
commit
c2e1b12644
|
@ -9,10 +9,12 @@ var debug = require('debug')('test');
|
||||||
|
|
||||||
describe('Replication / Change APIs', function() {
|
describe('Replication / Change APIs', function() {
|
||||||
var dataSource, SourceModel, TargetModel;
|
var dataSource, SourceModel, TargetModel;
|
||||||
|
var useSinceFilter;
|
||||||
var tid = 0; // per-test unique id used e.g. to build unique model names
|
var tid = 0; // per-test unique id used e.g. to build unique model names
|
||||||
|
|
||||||
beforeEach(function() {
|
beforeEach(function() {
|
||||||
tid++;
|
tid++;
|
||||||
|
useSinceFilter = false;
|
||||||
var test = this;
|
var test = this;
|
||||||
dataSource = this.dataSource = loopback.createDataSource({
|
dataSource = this.dataSource = loopback.createDataSource({
|
||||||
connector: loopback.Memory
|
connector: loopback.Memory
|
||||||
|
@ -675,26 +677,44 @@ describe('Replication / Change APIs', function() {
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('complex setup', function() {
|
describe('complex setup', function() {
|
||||||
var sourceInstance;
|
var sourceInstance, sourceInstanceId, AnotherModel;
|
||||||
|
|
||||||
beforeEach(function createReplicatedInstance(done) {
|
beforeEach(function createReplicatedInstance(done) {
|
||||||
async.series([
|
async.series([
|
||||||
function createInstance(next) {
|
function createInstance(next) {
|
||||||
SourceModel.create({ id: 'test-instance' }, function(err, result) {
|
SourceModel.create({ id: 'test-instance' }, function(err, result) {
|
||||||
sourceInstance = result;
|
sourceInstance = result;
|
||||||
|
sourceInstanceId = result.id;
|
||||||
next(err);
|
next(err);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
replicateExpectingSuccess(),
|
replicateExpectingSuccess(),
|
||||||
verifyModelsAreEqual()
|
verifySourceWasReplicated()
|
||||||
], done);
|
], done);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
beforeEach(function setupThirdModel() {
|
||||||
|
AnotherModel = this.AnotherModel = PersistedModel.extend(
|
||||||
|
'AnotherModel-' + tid,
|
||||||
|
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
||||||
|
{ trackChanges: true });
|
||||||
|
|
||||||
|
// NOTE(bajtos) At the moment, all models share the same Checkpoint
|
||||||
|
// model. This causes the in-process replication to work differently
|
||||||
|
// than client-server replication.
|
||||||
|
// As a workaround, we manually setup unique Checkpoint for AnotherModel.
|
||||||
|
var AnotherChange = AnotherModel.Change;
|
||||||
|
AnotherChange.Checkpoint = loopback.Checkpoint.extend('AnotherCheckpoint');
|
||||||
|
AnotherChange.Checkpoint.attachTo(dataSource);
|
||||||
|
|
||||||
|
AnotherModel.attachTo(dataSource);
|
||||||
|
});
|
||||||
|
|
||||||
it('correctly replicates without checkpoint filter', function(done) {
|
it('correctly replicates without checkpoint filter', function(done) {
|
||||||
async.series([
|
async.series([
|
||||||
updateSourceInstanceNameTo('updated'),
|
updateSourceInstanceNameTo('updated'),
|
||||||
replicateExpectingSuccess(),
|
replicateExpectingSuccess(),
|
||||||
verifyModelsAreEqual(),
|
verifySourceWasReplicated(),
|
||||||
|
|
||||||
function deleteInstance(next) {
|
function deleteInstance(next) {
|
||||||
sourceInstance.remove(next);
|
sourceInstance.remove(next);
|
||||||
|
@ -713,15 +733,145 @@ describe('Replication / Change APIs', function() {
|
||||||
it('replicates multiple updates within the same CP', function(done) {
|
it('replicates multiple updates within the same CP', function(done) {
|
||||||
async.series([
|
async.series([
|
||||||
replicateExpectingSuccess(),
|
replicateExpectingSuccess(),
|
||||||
verifyModelsAreEqual(),
|
verifySourceWasReplicated(),
|
||||||
|
|
||||||
updateSourceInstanceNameTo('updated'),
|
updateSourceInstanceNameTo('updated'),
|
||||||
updateSourceInstanceNameTo('again'),
|
updateSourceInstanceNameTo('again'),
|
||||||
replicateExpectingSuccess(),
|
replicateExpectingSuccess(),
|
||||||
verifyModelsAreEqual()
|
verifySourceWasReplicated()
|
||||||
], done);
|
], done);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('clientA-server-clientB', function() {
|
||||||
|
var ClientA, Server, ClientB;
|
||||||
|
|
||||||
|
beforeEach(function() {
|
||||||
|
ClientA = SourceModel;
|
||||||
|
Server = TargetModel;
|
||||||
|
ClientB = AnotherModel;
|
||||||
|
|
||||||
|
// NOTE(bajtos) The tests should ideally pass without the since
|
||||||
|
// filter too. Unfortunately that's not possible with the current
|
||||||
|
// implementation that remembers only the last two changes made.
|
||||||
|
useSinceFilter = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('replicates new models', function(done) {
|
||||||
|
async.series([
|
||||||
|
// Note that ClientA->Server was already replicated during setup
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
verifySourceWasReplicated(ClientB)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates updates with no false conflicts', function(done) {
|
||||||
|
async.series([
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
|
||||||
|
updateSourceInstanceNameTo('v3'),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
updateSourceInstanceNameTo('v4'),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
verifySourceWasReplicated(ClientB)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates deletes with no false conflicts', function(done) {
|
||||||
|
async.series([
|
||||||
|
deleteSourceInstance(),
|
||||||
|
replicateExpectingSuccess(ClientA, Server),
|
||||||
|
replicateExpectingSuccess(Server, ClientB),
|
||||||
|
verifySourceWasReplicated(ClientB)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('bidirectional sync', function() {
|
||||||
|
beforeEach(function finishInitialSync(next) {
|
||||||
|
// The fixture setup creates a new model instance and replicates
|
||||||
|
// it from ClientA to Server. Since we are performing bidirectional
|
||||||
|
// synchronization in this suite, we must complete the first sync,
|
||||||
|
// otherwise some of the tests may fail.
|
||||||
|
replicateExpectingSuccess(Server, ClientA)(next);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates CREATE', function(done) {
|
||||||
|
async.series([
|
||||||
|
sync(ClientA, Server),
|
||||||
|
sync(ClientB, Server)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates CREATE+UPDATE', function(done) {
|
||||||
|
async.series([
|
||||||
|
// NOTE: ClientB has not fetched the new model instance yet
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
sync(ClientA, Server),
|
||||||
|
|
||||||
|
// ClientB fetches the created & updated instance from the server
|
||||||
|
sync(ClientB, Server),
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('propagates DELETE', function(done) {
|
||||||
|
async.series([
|
||||||
|
// NOTE: ClientB has not fetched the new model instance yet
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
sync(ClientA, Server),
|
||||||
|
|
||||||
|
// ClientB fetches the created & updated instance from the server
|
||||||
|
sync(ClientB, Server),
|
||||||
|
], done);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not report false conflicts', function(done) {
|
||||||
|
async.series([
|
||||||
|
// client A makes some work
|
||||||
|
updateSourceInstanceNameTo('v2'),
|
||||||
|
sync(ClientA, Server),
|
||||||
|
|
||||||
|
// ClientB fetches the change from the server
|
||||||
|
sync(ClientB, Server),
|
||||||
|
verifySourceWasReplicated(ClientB),
|
||||||
|
|
||||||
|
// client B makes some work
|
||||||
|
updateClientB('v5'),
|
||||||
|
sync(Server, ClientB),
|
||||||
|
updateClientB('v6'),
|
||||||
|
sync(ClientB, Server),
|
||||||
|
|
||||||
|
// client A fetches the changes
|
||||||
|
sync(ClientA, Server)
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
function updateClientB(name) {
|
||||||
|
return function updateInstanceB(next) {
|
||||||
|
ClientB.findById(sourceInstanceId, function(err, instance) {
|
||||||
|
if (err) return next(err);
|
||||||
|
instance.name = name;
|
||||||
|
instance.save(next);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function sync(client, server) {
|
||||||
|
return function syncBothWays(next) {
|
||||||
|
async.series([
|
||||||
|
replicateExpectingSuccess(server, client),
|
||||||
|
replicateExpectingSuccess(client, server)
|
||||||
|
], next);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
function updateSourceInstanceNameTo(value) {
|
function updateSourceInstanceNameTo(value) {
|
||||||
return function updateInstance(next) {
|
return function updateInstance(next) {
|
||||||
sourceInstance.name = value;
|
sourceInstance.name = value;
|
||||||
|
@ -729,30 +879,51 @@ describe('Replication / Change APIs', function() {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function verifyModelsAreEqual() {
|
function deleteSourceInstance(value) {
|
||||||
|
return function deleteInstance(next) {
|
||||||
|
sourceInstance.remove(function(err) {
|
||||||
|
sourceInstance = null;
|
||||||
|
next(err);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function verifySourceWasReplicated(target) {
|
||||||
|
if (!target) target = TargetModel;
|
||||||
return function verify(next) {
|
return function verify(next) {
|
||||||
TargetModel.findById(sourceInstance.id, function(err, target) {
|
target.findById(sourceInstanceId, function(err, targetInstance) {
|
||||||
if (err) return next(err);
|
if (err) return next(err);
|
||||||
expect(target && target.toObject()).to.eql(sourceInstance.toObject());
|
expect(targetInstance && targetInstance.toObject())
|
||||||
|
.to.eql(sourceInstance && sourceInstance.toObject());
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
var _since = {};
|
||||||
function replicateExpectingSuccess(source, target, since) {
|
function replicateExpectingSuccess(source, target, since) {
|
||||||
if (!source) source = SourceModel;
|
if (!source) source = SourceModel;
|
||||||
if (!target) target = TargetModel;
|
if (!target) target = TargetModel;
|
||||||
if (!since) since = -1;
|
|
||||||
return function replicate(next) {
|
return function replicate(next) {
|
||||||
debug('replicateExpectingSuccess from %s to %s since %s',
|
var sinceIx = source.modelName + ':to:' + target.modelName;
|
||||||
|
if (since === undefined) {
|
||||||
|
since = useSinceFilter ?
|
||||||
|
_since[sinceIx] || -1 :
|
||||||
|
-1;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug('replicateExpectingSuccess from %s to %s since %j',
|
||||||
source.modelName, target.modelName, since);
|
source.modelName, target.modelName, since);
|
||||||
source.replicate(since, target, function(err, conflicts) {
|
|
||||||
|
source.replicate(since, target, function(err, conflicts, cps) {
|
||||||
if (err) return next(err);
|
if (err) return next(err);
|
||||||
if (conflicts.length) {
|
if (conflicts.length) {
|
||||||
return next(new Error('Unexpected conflicts\n' +
|
return next(new Error('Unexpected conflicts\n' +
|
||||||
conflicts.map(JSON.stringify).join('\n')));
|
conflicts.map(JSON.stringify).join('\n')));
|
||||||
}
|
}
|
||||||
|
_since[sinceIx] = cps;
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue