From c2e1b1264432fadec43112123dbc367c501f8fb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Thu, 5 Mar 2015 14:29:30 +0100 Subject: [PATCH] Add more integration tests for replication Add tests covering typical replication scenarios that happen in the setup where multiple clients are synchronizing changes against a single server (database). --- test/replication.test.js | 193 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 182 insertions(+), 11 deletions(-) diff --git a/test/replication.test.js b/test/replication.test.js index eac7879f..2487a99f 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -9,10 +9,12 @@ var debug = require('debug')('test'); describe('Replication / Change APIs', function() { var dataSource, SourceModel, TargetModel; + var useSinceFilter; var tid = 0; // per-test unique id used e.g. to build unique model names beforeEach(function() { tid++; + useSinceFilter = false; var test = this; dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory @@ -675,26 +677,44 @@ describe('Replication / Change APIs', function() { }); describe('complex setup', function() { - var sourceInstance; + var sourceInstance, sourceInstanceId, AnotherModel; beforeEach(function createReplicatedInstance(done) { async.series([ function createInstance(next) { SourceModel.create({ id: 'test-instance' }, function(err, result) { sourceInstance = result; + sourceInstanceId = result.id; next(err); }); }, replicateExpectingSuccess(), - verifyModelsAreEqual() + verifySourceWasReplicated() ], done); }); + beforeEach(function setupThirdModel() { + AnotherModel = this.AnotherModel = PersistedModel.extend( + 'AnotherModel-' + tid, + { id: { id: true, type: String, defaultFn: 'guid' } }, + { trackChanges: true }); + + // NOTE(bajtos) At the moment, all models share the same Checkpoint + // model. This causes the in-process replication to work differently + // than client-server replication. + // As a workaround, we manually setup unique Checkpoint for AnotherModel. + var AnotherChange = AnotherModel.Change; + AnotherChange.Checkpoint = loopback.Checkpoint.extend('AnotherCheckpoint'); + AnotherChange.Checkpoint.attachTo(dataSource); + + AnotherModel.attachTo(dataSource); + }); + it('correctly replicates without checkpoint filter', function(done) { async.series([ updateSourceInstanceNameTo('updated'), replicateExpectingSuccess(), - verifyModelsAreEqual(), + verifySourceWasReplicated(), function deleteInstance(next) { sourceInstance.remove(next); @@ -713,15 +733,145 @@ describe('Replication / Change APIs', function() { it('replicates multiple updates within the same CP', function(done) { async.series([ replicateExpectingSuccess(), - verifyModelsAreEqual(), + verifySourceWasReplicated(), updateSourceInstanceNameTo('updated'), updateSourceInstanceNameTo('again'), replicateExpectingSuccess(), - verifyModelsAreEqual() + verifySourceWasReplicated() ], done); }); + describe('clientA-server-clientB', function() { + var ClientA, Server, ClientB; + + beforeEach(function() { + ClientA = SourceModel; + Server = TargetModel; + ClientB = AnotherModel; + + // NOTE(bajtos) The tests should ideally pass without the since + // filter too. Unfortunately that's not possible with the current + // implementation that remembers only the last two changes made. + useSinceFilter = true; + }); + + it('replicates new models', function(done) { + async.series([ + // Note that ClientA->Server was already replicated during setup + replicateExpectingSuccess(Server, ClientB), + verifySourceWasReplicated(ClientB) + ], done); + }); + + it('propagates updates with no false conflicts', function(done) { + async.series([ + updateSourceInstanceNameTo('v2'), + replicateExpectingSuccess(ClientA, Server), + + replicateExpectingSuccess(Server, ClientB), + + updateSourceInstanceNameTo('v3'), + replicateExpectingSuccess(ClientA, Server), + updateSourceInstanceNameTo('v4'), + replicateExpectingSuccess(ClientA, Server), + + replicateExpectingSuccess(Server, ClientB), + verifySourceWasReplicated(ClientB) + ], done); + }); + + it('propagates deletes with no false conflicts', function(done) { + async.series([ + deleteSourceInstance(), + replicateExpectingSuccess(ClientA, Server), + replicateExpectingSuccess(Server, ClientB), + verifySourceWasReplicated(ClientB) + ], done); + }); + + describe('bidirectional sync', function() { + beforeEach(function finishInitialSync(next) { + // The fixture setup creates a new model instance and replicates + // it from ClientA to Server. Since we are performing bidirectional + // synchronization in this suite, we must complete the first sync, + // otherwise some of the tests may fail. + replicateExpectingSuccess(Server, ClientA)(next); + }); + + it('propagates CREATE', function(done) { + async.series([ + sync(ClientA, Server), + sync(ClientB, Server) + ], done); + }); + + it('propagates CREATE+UPDATE', function(done) { + async.series([ + // NOTE: ClientB has not fetched the new model instance yet + updateSourceInstanceNameTo('v2'), + sync(ClientA, Server), + + // ClientB fetches the created & updated instance from the server + sync(ClientB, Server), + ], done); + }); + + it('propagates DELETE', function(done) { + async.series([ + // NOTE: ClientB has not fetched the new model instance yet + updateSourceInstanceNameTo('v2'), + sync(ClientA, Server), + + // ClientB fetches the created & updated instance from the server + sync(ClientB, Server), + ], done); + + }); + + it('does not report false conflicts', function(done) { + async.series([ + // client A makes some work + updateSourceInstanceNameTo('v2'), + sync(ClientA, Server), + + // ClientB fetches the change from the server + sync(ClientB, Server), + verifySourceWasReplicated(ClientB), + + // client B makes some work + updateClientB('v5'), + sync(Server, ClientB), + updateClientB('v6'), + sync(ClientB, Server), + + // client A fetches the changes + sync(ClientA, Server) + ], done); + }); + }); + + function updateClientB(name) { + return function updateInstanceB(next) { + ClientB.findById(sourceInstanceId, function(err, instance) { + if (err) return next(err); + instance.name = name; + instance.save(next); + }); + }; + } + + function sync(client, server) { + return function syncBothWays(next) { + async.series([ + replicateExpectingSuccess(server, client), + replicateExpectingSuccess(client, server) + ], next); + }; + } + + }); + function updateSourceInstanceNameTo(value) { return function updateInstance(next) { sourceInstance.name = value; @@ -729,30 +879,51 @@ describe('Replication / Change APIs', function() { }; } - function verifyModelsAreEqual() { + function deleteSourceInstance(value) { + return function deleteInstance(next) { + sourceInstance.remove(function(err) { + sourceInstance = null; + next(err); + }); + }; + } + + function verifySourceWasReplicated(target) { + if (!target) target = TargetModel; return function verify(next) { - TargetModel.findById(sourceInstance.id, function(err, target) { + target.findById(sourceInstanceId, function(err, targetInstance) { if (err) return next(err); - expect(target && target.toObject()).to.eql(sourceInstance.toObject()); + expect(targetInstance && targetInstance.toObject()) + .to.eql(sourceInstance && sourceInstance.toObject()); next(); }); }; } }); + var _since = {}; function replicateExpectingSuccess(source, target, since) { if (!source) source = SourceModel; if (!target) target = TargetModel; - if (!since) since = -1; + return function replicate(next) { - debug('replicateExpectingSuccess from %s to %s since %s', + var sinceIx = source.modelName + ':to:' + target.modelName; + if (since === undefined) { + since = useSinceFilter ? + _since[sinceIx] || -1 : + -1; + } + + debug('replicateExpectingSuccess from %s to %s since %j', source.modelName, target.modelName, since); - source.replicate(since, target, function(err, conflicts) { + + source.replicate(since, target, function(err, conflicts, cps) { if (err) return next(err); if (conflicts.length) { return next(new Error('Unexpected conflicts\n' + conflicts.map(JSON.stringify).join('\n'))); } + _since[sinceIx] = cps; next(); }); };