// Copyright IBM Corp. 2014,2018. All Rights Reserved. // Node module: loopback // This file is licensed under the MIT License. // License text available at https://opensource.org/licenses/MIT 'use strict'; var assert = require('assert'); var async = require('async'); var loopback = require('../'); var Change = loopback.Change; var defineModelTestsWithDataSource = require('./util/model-tests'); var PersistedModel = loopback.PersistedModel; var expect = require('./helpers/expect'); var debug = require('debug')('test'); var runtime = require('./../lib/runtime'); describe('Replication / Change APIs', function() { this.timeout(10000); var dataSource, SourceModel, TargetModel, useSinceFilter; var tid = 0; // per-test unique id used e.g. to build unique model names beforeEach(function() { tid++; useSinceFilter = false; var test = this; dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory, }); SourceModel = this.SourceModel = PersistedModel.extend( 'SourceModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true}); SourceModel.attachTo(dataSource); TargetModel = this.TargetModel = PersistedModel.extend( 'TargetModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true}); // NOTE(bajtos) At the moment, all models share the same Checkpoint // model. This causes the in-process replication to work differently // than client-server replication. // As a workaround, we manually setup unique Checkpoint for TargetModel. var TargetChange = TargetModel.Change; TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); TargetChange.Checkpoint.attachTo(dataSource); TargetModel.attachTo(dataSource); test.startingCheckpoint = -1; this.createInitalData = function(cb) { SourceModel.create({name: 'foo'}, function(err, inst) { if (err) return cb(err); test.model = inst; SourceModel.replicate(TargetModel, cb); }); }; }); describe('cleanup check for enableChangeTracking', function() { describe('when no changeCleanupInterval set', function() { it('should call rectifyAllChanges if running on server', function(done) { var calls = mockRectifyAllChanges(SourceModel); SourceModel.enableChangeTracking(); if (runtime.isServer) { expect(calls).to.eql(['rectifyAllChanges']); } else { expect(calls).to.eql([]); } done(); }); }); describe('when changeCleanupInterval set to -1', function() { var Model; beforeEach(function() { Model = this.Model = PersistedModel.extend( 'Model-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true, changeCleanupInterval: -1}); Model.attachTo(dataSource); }); it('should not call rectifyAllChanges', function(done) { var calls = mockRectifyAllChanges(Model); Model.enableChangeTracking(); expect(calls).to.eql([]); done(); }); }); describe('when changeCleanupInterval set to 10000', function() { var Model; beforeEach(function() { Model = this.Model = PersistedModel.extend( 'Model-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true, changeCleanupInterval: 10000}); Model.attachTo(dataSource); }); it('should call rectifyAllChanges if running on server', function(done) { var calls = mockRectifyAllChanges(Model); Model.enableChangeTracking(); if (runtime.isServer) { expect(calls).to.eql(['rectifyAllChanges']); } else { expect(calls).to.eql([]); } done(); }); }); function mockRectifyAllChanges(Model) { var calls = []; Model.rectifyAllChanges = function(cb) { calls.push('rectifyAllChanges'); process.nextTick(cb); }; return calls; } }); describe('optimization check rectifyChange Vs rectifyAllChanges', function() { beforeEach(function initialData(done) { var data = [{name: 'John', surname: 'Doe'}, {name: 'Jane', surname: 'Roe'}]; async.waterfall([ function(callback) { SourceModel.create(data, callback); }, function(data, callback) { SourceModel.replicate(TargetModel, callback); }], function(err, result) { done(err); }); }); it('should call rectifyAllChanges if no id is passed for rectifyOnDelete', function(done) { var calls = mockSourceModelRectify(); SourceModel.destroyAll({name: 'John'}, function(err, data) { if (err) return done(err); expect(calls).to.eql(['rectifyAllChanges']); done(); }); }); it('should call rectifyAllChanges if no id is passed for rectifyOnSave', function(done) { var calls = mockSourceModelRectify(); var newData = {'name': 'Janie'}; SourceModel.update({name: 'Jane'}, newData, function(err, data) { if (err) return done(err); expect(calls).to.eql(['rectifyAllChanges']); done(); }); }); it('rectifyOnDelete for Delete should call rectifyChange instead of rectifyAllChanges', function(done) { var calls = mockTargetModelRectify(); async.waterfall([ function(callback) { SourceModel.destroyAll({name: 'John'}, callback); }, function(data, callback) { SourceModel.replicate(TargetModel, callback); // replicate should call `rectifyOnSave` and then `rectifyChange` not `rectifyAllChanges` through `after save` operation }, ], function(err, results) { if (err) return done(err); expect(calls).to.eql(['rectifyChange']); done(); }); }); it('rectifyOnSave for Update should call rectifyChange instead of rectifyAllChanges', function(done) { var calls = mockTargetModelRectify(); var newData = {'name': 'Janie'}; async.waterfall([ function(callback) { SourceModel.update({name: 'Jane'}, newData, callback); }, function(data, callback) { SourceModel.replicate(TargetModel, callback); // replicate should call `rectifyOnSave` and then `rectifyChange` not `rectifyAllChanges` through `after save` operation }, ], function(err, result) { if (err) return done(err); expect(calls).to.eql(['rectifyChange']); done(); }); }); it('rectifyOnSave for Create should call rectifyChange instead of rectifyAllChanges', function(done) { var calls = mockTargetModelRectify(); var newData = [{name: 'Janie', surname: 'Doe'}]; async.waterfall([ function(callback) { SourceModel.create(newData, callback); }, function(data, callback) { SourceModel.replicate(TargetModel, callback); // replicate should call `rectifyOnSave` and then `rectifyChange` not `rectifyAllChanges` through `after save` operation }, ], function(err, result) { if (err) return done(err); expect(calls).to.eql(['rectifyChange']); done(); }); }); function mockSourceModelRectify() { var calls = []; SourceModel.rectifyChange = function(id, cb) { calls.push('rectifyChange'); process.nextTick(cb); }; SourceModel.rectifyAllChanges = function(cb) { calls.push('rectifyAllChanges'); process.nextTick(cb); }; return calls; } function mockTargetModelRectify() { var calls = []; TargetModel.rectifyChange = function(id, cb) { calls.push('rectifyChange'); process.nextTick(cb); }; TargetModel.rectifyAllChanges = function(cb) { calls.push('rectifyAllChanges'); process.nextTick(cb); }; return calls; } }); describe('Model.changes(since, filter, callback)', function() { it('Get changes since the given checkpoint', function(done) { var test = this; this.SourceModel.create({name: 'foo'}, function(err) { if (err) return done(err); setTimeout(function() { test.SourceModel.changes(test.startingCheckpoint, {}, function(err, changes) { assert.equal(changes.length, 1); done(); }); }, 1); }); }); it('excludes changes from older checkpoints', function(done) { var FUTURE_CHECKPOINT = 999; SourceModel.create({name: 'foo'}, function(err) { if (err) return done(err); SourceModel.changes(FUTURE_CHECKPOINT, {}, function(err, changes) { if (err) return done(err); expect(changes).to.be.empty(); done(); }); }); }); }); describe('Model.replicate(since, targetModel, options, callback)', function() { it('Replicate data using the target model', function(done) { var test = this; var options = {}; this.SourceModel.create({name: 'foo'}, function(err) { if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options, function(err, conflicts) { if (err) return done(err); assertTargetModelEqualsSourceModel(conflicts, test.SourceModel, test.TargetModel, done); }); }); }); it('Replicate data using the target model - promise variant', function(done) { var test = this; var options = {}; this.SourceModel.create({name: 'foo'}, function(err) { if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options) .then(function(conflicts) { assertTargetModelEqualsSourceModel(conflicts, test.SourceModel, test.TargetModel, done); }) .catch(function(err) { done(err); }); }); }); it('applies "since" filter on source changes', function(done) { async.series([ function createModelInSourceCp1(next) { SourceModel.create({id: '1'}, next); }, function checkpoint(next) { SourceModel.checkpoint(next); }, function createModelInSourceCp2(next) { SourceModel.create({id: '2'}, next); }, function replicateLastChangeOnly(next) { SourceModel.currentCheckpoint(function(err, cp) { if (err) return done(err); SourceModel.replicate(cp, TargetModel, next); }); }, function verify(next) { TargetModel.find(function(err, list) { if (err) return done(err); // '1' should be skipped by replication expect(getIds(list)).to.eql(['2']); next(); }); }, ], done); }); it('applies "since" filter on source changes - promise variant', function(done) { async.series([ function createModelInSourceCp1(next) { SourceModel.create({id: '1'}, next); }, function checkpoint(next) { SourceModel.checkpoint(next); }, function createModelInSourceCp2(next) { SourceModel.create({id: '2'}, next); }, function replicateLastChangeOnly(next) { SourceModel.currentCheckpoint(function(err, cp) { if (err) return done(err); SourceModel.replicate(cp, TargetModel, {}) .then(function(next) { done(); }) .catch(err); }); }, function verify(next) { TargetModel.find(function(err, list) { if (err) return done(err); // '1' should be skipped by replication expect(getIds(list)).to.eql(['2']); next(); }); }, ], done); }); it('applies "since" filter on target changes', function(done) { // Because the "since" filter is just an optimization, // there isn't really any observable behaviour we could // check to assert correct implementation. var diffSince = []; spyAndStoreSinceArg(TargetModel, 'diff', diffSince); SourceModel.replicate(10, TargetModel, function(err) { if (err) return done(err); expect(diffSince).to.eql([10]); done(); }); }); it('applies "since" filter on target changes - promise variant', function(done) { // Because the "since" filter is just an optimization, // there isn't really any observable behaviour we could // check to assert correct implementation. var diffSince = []; spyAndStoreSinceArg(TargetModel, 'diff', diffSince); SourceModel.replicate(10, TargetModel, {}) .then(function() { expect(diffSince).to.eql([10]); done(); }) .catch(function(err) { done(err); }); }); it('uses different "since" value for source and target', function(done) { var sourceSince = []; var targetSince = []; spyAndStoreSinceArg(SourceModel, 'changes', sourceSince); spyAndStoreSinceArg(TargetModel, 'diff', targetSince); var since = {source: 1, target: 2}; SourceModel.replicate(since, TargetModel, function(err) { if (err) return done(err); expect(sourceSince).to.eql([1]); expect(targetSince).to.eql([2]); done(); }); }); it('uses different "since" value for source and target - promise variant', function(done) { var sourceSince = []; var targetSince = []; spyAndStoreSinceArg(SourceModel, 'changes', sourceSince); spyAndStoreSinceArg(TargetModel, 'diff', targetSince); var since = {source: 1, target: 2}; SourceModel.replicate(since, TargetModel, {}) .then(function() { expect(sourceSince).to.eql([1]); expect(targetSince).to.eql([2]); done(); }) .catch(function(err) { done(err); }); }); it('picks up changes made during replication', function(done) { setupRaceConditionInReplication(function(cb) { // simulate the situation when another model is created // while a replication run is in progress SourceModel.create({id: 'racer'}, cb); }); var lastCp; async.series([ function buildSomeDataToReplicate(next) { SourceModel.create({id: 'init'}, next); }, function getLastCp(next) { SourceModel.currentCheckpoint(function(err, cp) { if (err) return done(err); lastCp = cp; next(); }); }, function replicate(next) { SourceModel.replicate(TargetModel, next); }, function verifyAssumptions(next) { SourceModel.find(function(err, list) { expect(getIds(list), 'source ids') .to.eql(['init', 'racer']); TargetModel.find(function(err, list) { expect(getIds(list), 'target ids after first sync') .to.include.members(['init']); next(); }); }); }, function replicateAgain(next) { SourceModel.replicate(lastCp + 1, TargetModel, next); }, function verify(next) { TargetModel.find(function(err, list) { expect(getIds(list), 'target ids').to.eql(['init', 'racer']); next(); }); }, ], done); }); it('returns new current checkpoints to callback', function(done) { var sourceCp, targetCp; async.series([ bumpSourceCheckpoint, bumpTargetCheckpoint, bumpTargetCheckpoint, function replicate(cb) { expect(sourceCp).to.not.equal(targetCp); SourceModel.replicate( TargetModel, function(err, conflicts, newCheckpoints) { if (err) return cb(err); expect(conflicts, 'conflicts').to.eql([]); expect(newCheckpoints, 'currentCheckpoints').to.eql({ source: sourceCp + 1, target: targetCp + 1, }); cb(); }); }, ], done); function bumpSourceCheckpoint(cb) { SourceModel.checkpoint(function(err, inst) { if (err) return cb(err); sourceCp = inst.seq; cb(); }); } function bumpTargetCheckpoint(cb) { TargetModel.checkpoint(function(err, inst) { if (err) return cb(err); targetCp = inst.seq; cb(); }); } }); it('leaves current target checkpoint empty', function(done) { async.series([ function createTestData(next) { SourceModel.create({}, next); }, replicateExpectingSuccess(), function verify(next) { TargetModel.currentCheckpoint(function(err, cp) { if (err) return next(err); TargetModel.getChangeModel().find( {where: {checkpoint: {gte: cp}}}, function(err, changes) { if (err) return done(err); expect(changes).to.have.length(0); done(); }); }); }, ], done); }); describe('with 3rd-party changes', function() { it('detects UPDATE made during UPDATE', function(done) { async.series([ createModel(SourceModel, {id: '1'}), replicateExpectingSuccess(), function updateModel(next) { SourceModel.updateAll({id: '1'}, {name: 'source'}, next); }, function replicateWith3rdPartyModifyingData(next) { setupRaceConditionInReplication(function(cb) { var connector = TargetModel.dataSource.connector; if (connector.updateAttributes.length <= 4) { connector.updateAttributes( TargetModel.modelName, '1', {name: '3rd-party'}, cb); } else { // 2.x connectors require `options` connector.updateAttributes( TargetModel.modelName, '1', {name: '3rd-party'}, {}, // options cb); } }); SourceModel.replicate( TargetModel, function(err, conflicts, cps, updates) { if (err) return next(err); var conflictedIds = getPropValue(conflicts || [], 'modelId'); expect(conflictedIds).to.eql(['1']); // resolve the conflict using ours conflicts[0].resolve(next); }); }, replicateExpectingSuccess(), verifyInstanceWasReplicated(SourceModel, TargetModel, '1'), ], done); }); it('detects CREATE made during CREATE', function(done) { async.series([ // FIXME(bajtos) Remove the 'name' property once the implementation // of UPDATE is fixed to correctly remove properties createModel(SourceModel, {id: '1', name: 'source'}), function replicateWith3rdPartyModifyingData(next) { var connector = TargetModel.dataSource.connector; setupRaceConditionInReplication(function(cb) { if (connector.create.length <= 3) { connector.create( TargetModel.modelName, {id: '1', name: '3rd-party'}, cb); } else { // 2.x connectors require `options` connector.create( TargetModel.modelName, {id: '1', name: '3rd-party'}, {}, // options cb); } }); SourceModel.replicate( TargetModel, function(err, conflicts, cps, updates) { if (err) return next(err); var conflictedIds = getPropValue(conflicts || [], 'modelId'); expect(conflictedIds).to.eql(['1']); // resolve the conflict using ours conflicts[0].resolve(next); }); }, replicateExpectingSuccess(), verifyInstanceWasReplicated(SourceModel, TargetModel, '1'), ], done); }); it('detects UPDATE made during DELETE', function(done) { async.series([ createModel(SourceModel, {id: '1'}), replicateExpectingSuccess(), function deleteModel(next) { SourceModel.deleteById('1', next); }, function replicateWith3rdPartyModifyingData(next) { setupRaceConditionInReplication(function(cb) { var connector = TargetModel.dataSource.connector; if (connector.updateAttributes.length <= 4) { connector.updateAttributes( TargetModel.modelName, '1', {name: '3rd-party'}, cb); } else { // 2.x connectors require `options` connector.updateAttributes( TargetModel.modelName, '1', {name: '3rd-party'}, {}, // options cb); } }); SourceModel.replicate( TargetModel, function(err, conflicts, cps, updates) { if (err) return next(err); var conflictedIds = getPropValue(conflicts || [], 'modelId'); expect(conflictedIds).to.eql(['1']); // resolve the conflict using ours conflicts[0].resolve(next); }); }, replicateExpectingSuccess(), verifyInstanceWasReplicated(SourceModel, TargetModel, '1'), ], done); }); it('handles DELETE made during DELETE', function(done) { async.series([ createModel(SourceModel, {id: '1'}), replicateExpectingSuccess(), function deleteModel(next) { SourceModel.deleteById('1', next); }, function setup3rdPartyModifyingData(next) { var connector = TargetModel.dataSource.connector; setupRaceConditionInReplication(function(cb) { if (connector.destroy.length <= 3) { connector.destroy( TargetModel.modelName, '1', cb); } else { // 2.x connectors require `options` connector.destroy( TargetModel.modelName, '1', {}, // options cb); } }); next(); }, replicateExpectingSuccess(), verifyInstanceWasReplicated(SourceModel, TargetModel, '1'), ], done); }); }); }); describe('conflict detection - both updated', function() { beforeEach(function(done) { var SourceModel = this.SourceModel; var TargetModel = this.TargetModel; var test = this; test.createInitalData(createConflict); function createConflict(err, conflicts) { async.parallel([ function(cb) { SourceModel.findOne(function(err, inst) { if (err) return cb(err); inst.name = 'source update'; inst.save(cb); }); }, function(cb) { TargetModel.findOne(function(err, inst) { if (err) return cb(err); inst.name = 'target update'; inst.save(cb); }); }, ], function(err) { if (err) return done(err); SourceModel.replicate(TargetModel, function(err, conflicts) { if (err) return done(err); test.conflicts = conflicts; test.conflict = conflicts[0]; done(); }); }); } }); it('should detect a single conflict', function() { assert.equal(this.conflicts.length, 1); assert(this.conflict); }); it('type should be UPDATE', function(done) { this.conflict.type(function(err, type) { assert.equal(type, Change.UPDATE); done(); }); }); it('conflict.changes()', function(done) { var test = this; this.conflict.changes(function(err, sourceChange, targetChange) { assert.equal(typeof sourceChange.id, 'string'); assert.equal(typeof targetChange.id, 'string'); assert.equal(test.model.getId(), sourceChange.getModelId()); assert.equal(sourceChange.type(), Change.UPDATE); assert.equal(targetChange.type(), Change.UPDATE); done(); }); }); it('conflict.models()', function(done) { var test = this; this.conflict.models(function(err, source, target) { assert.deepEqual(source.toJSON(), { id: test.model.id, name: 'source update', }); assert.deepEqual(target.toJSON(), { id: test.model.id, name: 'target update', }); done(); }); }); }); describe('conflict detection - source deleted', function() { beforeEach(function(done) { var SourceModel = this.SourceModel; var TargetModel = this.TargetModel; var test = this; test.createInitalData(createConflict); function createConflict() { async.parallel([ function(cb) { SourceModel.findOne(function(err, inst) { if (err) return cb(err); test.model = inst; inst.remove(cb); }); }, function(cb) { TargetModel.findOne(function(err, inst) { if (err) return cb(err); inst.name = 'target update'; inst.save(cb); }); }, ], function(err) { if (err) return done(err); SourceModel.replicate(TargetModel, function(err, conflicts) { if (err) return done(err); test.conflicts = conflicts; test.conflict = conflicts[0]; done(); }); }); } }); it('should detect a single conflict', function() { assert.equal(this.conflicts.length, 1); assert(this.conflict); }); it('type should be DELETE', function(done) { this.conflict.type(function(err, type) { assert.equal(type, Change.DELETE); done(); }); }); it('conflict.changes()', function(done) { var test = this; this.conflict.changes(function(err, sourceChange, targetChange) { assert.equal(typeof sourceChange.id, 'string'); assert.equal(typeof targetChange.id, 'string'); assert.equal(test.model.getId(), sourceChange.getModelId()); assert.equal(sourceChange.type(), Change.DELETE); assert.equal(targetChange.type(), Change.UPDATE); done(); }); }); it('conflict.models()', function(done) { var test = this; this.conflict.models(function(err, source, target) { assert.equal(source, null); assert.deepEqual(target.toJSON(), { id: test.model.id, name: 'target update', }); done(); }); }); }); describe('conflict detection - target deleted', function() { beforeEach(function(done) { var SourceModel = this.SourceModel; var TargetModel = this.TargetModel; var test = this; test.createInitalData(createConflict); function createConflict() { async.parallel([ function(cb) { SourceModel.findOne(function(err, inst) { if (err) return cb(err); test.model = inst; inst.name = 'source update'; inst.save(cb); }); }, function(cb) { TargetModel.findOne(function(err, inst) { if (err) return cb(err); inst.remove(cb); }); }, ], function(err) { if (err) return done(err); SourceModel.replicate(TargetModel, function(err, conflicts) { if (err) return done(err); test.conflicts = conflicts; test.conflict = conflicts[0]; done(); }); }); } }); it('should detect a single conflict', function() { assert.equal(this.conflicts.length, 1); assert(this.conflict); }); it('type should be DELETE', function(done) { this.conflict.type(function(err, type) { assert.equal(type, Change.DELETE); done(); }); }); it('conflict.changes()', function(done) { var test = this; this.conflict.changes(function(err, sourceChange, targetChange) { assert.equal(typeof sourceChange.id, 'string'); assert.equal(typeof targetChange.id, 'string'); assert.equal(test.model.getId(), sourceChange.getModelId()); assert.equal(sourceChange.type(), Change.UPDATE); assert.equal(targetChange.type(), Change.DELETE); done(); }); }); it('conflict.models()', function(done) { var test = this; this.conflict.models(function(err, source, target) { assert.equal(target, null); assert.deepEqual(source.toJSON(), { id: test.model.id, name: 'source update', }); done(); }); }); }); describe('conflict detection - both deleted', function() { beforeEach(function(done) { var SourceModel = this.SourceModel; var TargetModel = this.TargetModel; var test = this; test.createInitalData(createConflict); function createConflict() { async.parallel([ function(cb) { SourceModel.findOne(function(err, inst) { if (err) return cb(err); test.model = inst; inst.remove(cb); }); }, function(cb) { TargetModel.findOne(function(err, inst) { if (err) return cb(err); inst.remove(cb); }); }, ], function(err) { if (err) return done(err); SourceModel.replicate(TargetModel, function(err, conflicts) { if (err) return done(err); test.conflicts = conflicts; test.conflict = conflicts[0]; done(); }); }); } }); it('should not detect a conflict', function() { assert.equal(this.conflicts.length, 0); assert(!this.conflict); }); }); describe('change detection', function() { it('detects "create"', function(done) { SourceModel.create({}, function(err, inst) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); it('detects "updateOrCreate"', function(done) { givenReplicatedInstance(function(err, created) { if (err) return done(err); var data = created.toObject(); created.name = 'updated'; SourceModel.updateOrCreate(created, function(err, inst) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "upsertWithWhere"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); SourceModel.upsertWithWhere( {name: inst.name}, {name: 'updated'}, function(err) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "findOrCreate"', function(done) { // make sure we bypass find+create and call the connector directly SourceModel.dataSource.connector.findOrCreate = function(model, query, data, callback) { if (this.all.length <= 3) { this.all(model, query, function(err, list) { if (err || (list && list[0])) return callback(err, list && list[0], false); this.create(model, data, function(err) { callback(err, data, true); }); }.bind(this)); } else { // 2.x connectors requires `options` this.all(model, query, {}, function(err, list) { if (err || (list && list[0])) return callback(err, list && list[0], false); this.create(model, data, {}, function(err) { callback(err, data, true); }); }.bind(this)); } }; SourceModel.findOrCreate( {where: {name: 'does-not-exist'}}, {name: 'created'}, function(err, inst) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); it('detects "deleteById"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); SourceModel.deleteById(inst.id, function(err) { assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "deleteAll"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); SourceModel.deleteAll({name: inst.name}, function(err) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "updateAll"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); SourceModel.updateAll( {name: inst.name}, {name: 'updated'}, function(err) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "prototype.save"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); inst.name = 'updated'; inst.save(function(err) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "prototype.updateAttributes"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); inst.updateAttributes({name: 'updated'}, function(err) { if (err) return done(err); assertChangeRecordedForId(inst.id, done); }); }); }); it('detects "prototype.delete"', function(done) { givenReplicatedInstance(function(err, inst) { if (err) return done(err); inst.delete(function(err) { assertChangeRecordedForId(inst.id, done); }); }); }); function givenReplicatedInstance(cb) { SourceModel.create({name: 'a-name'}, function(err, inst) { if (err) return cb(err); SourceModel.checkpoint(function(err) { if (err) return cb(err); cb(null, inst); }); }); } function assertChangeRecordedForId(id, cb) { SourceModel.getChangeModel().getCheckpointModel() .current(function(err, cp) { if (err) return cb(err); SourceModel.changes(cp - 1, {}, function(err, pendingChanges) { if (err) return cb(err); expect(pendingChanges, 'list of changes').to.have.length(1); var change = pendingChanges[0].toObject(); expect(change).to.have.property('checkpoint', cp); // sanity check expect(change).to.have.property('modelName', SourceModel.modelName); // NOTE(bajtos) Change.modelId is always String // regardless of the type of the changed model's id property expect(change).to.have.property('modelId', '' + id); cb(); }); }); } }); describe('complex setup', function() { var sourceInstance, sourceInstanceId, AnotherModel; beforeEach(function createReplicatedInstance(done) { async.series([ function createInstance(next) { SourceModel.create({id: 'test-instance'}, function(err, result) { sourceInstance = result; sourceInstanceId = result.id; next(err); }); }, replicateExpectingSuccess(), verifySourceWasReplicated(), ], done); }); beforeEach(function setupThirdModel() { AnotherModel = this.AnotherModel = PersistedModel.extend( 'AnotherModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true}); // NOTE(bajtos) At the moment, all models share the same Checkpoint // model. This causes the in-process replication to work differently // than client-server replication. // As a workaround, we manually setup unique Checkpoint for AnotherModel. var AnotherChange = AnotherModel.Change; AnotherChange.Checkpoint = loopback.Checkpoint.extend('AnotherCheckpoint'); AnotherChange.Checkpoint.attachTo(dataSource); AnotherModel.attachTo(dataSource); }); it('correctly replicates without checkpoint filter', function(done) { async.series([ updateSourceInstanceNameTo('updated'), replicateExpectingSuccess(), verifySourceWasReplicated(), function deleteInstance(next) { sourceInstance.remove(next); }, replicateExpectingSuccess(), function verifyTargetModelWasDeleted(next) { TargetModel.find(function(err, list) { if (err) return next(err); expect(getIds(list)).to.not.contain(sourceInstance.id); next(); }); }, ], done); }); it('replicates multiple updates within the same CP', function(done) { async.series([ replicateExpectingSuccess(), verifySourceWasReplicated(), updateSourceInstanceNameTo('updated'), updateSourceInstanceNameTo('again'), replicateExpectingSuccess(), verifySourceWasReplicated(), ], done); }); describe('clientA-server-clientB', function() { var ClientA, Server, ClientB; beforeEach(function() { ClientA = SourceModel; Server = TargetModel; ClientB = AnotherModel; // NOTE(bajtos) The tests should ideally pass without the since // filter too. Unfortunately that's not possible with the current // implementation that remembers only the last two changes made. useSinceFilter = true; }); it('replicates new models', function(done) { async.series([ // Note that ClientA->Server was already replicated during setup replicateExpectingSuccess(Server, ClientB), verifySourceWasReplicated(ClientB), ], done); }); it('propagates updates with no false conflicts', function(done) { async.series([ updateSourceInstanceNameTo('v2'), replicateExpectingSuccess(ClientA, Server), replicateExpectingSuccess(Server, ClientB), updateSourceInstanceNameTo('v3'), replicateExpectingSuccess(ClientA, Server), updateSourceInstanceNameTo('v4'), replicateExpectingSuccess(ClientA, Server), replicateExpectingSuccess(Server, ClientB), verifySourceWasReplicated(ClientB), ], done); }); it('propagates deletes with no false conflicts', function(done) { async.series([ deleteSourceInstance(), replicateExpectingSuccess(ClientA, Server), replicateExpectingSuccess(Server, ClientB), verifySourceWasReplicated(ClientB), ], done); }); describe('bidirectional sync', function() { beforeEach(function finishInitialSync(next) { // The fixture setup creates a new model instance and replicates // it from ClientA to Server. Since we are performing bidirectional // synchronization in this suite, we must complete the first sync, // otherwise some of the tests may fail. replicateExpectingSuccess(Server, ClientA)(next); }); it('propagates CREATE', function(done) { async.series([ sync(ClientA, Server), sync(ClientB, Server), ], done); }); it('propagates CREATE+UPDATE', function(done) { async.series([ // NOTE: ClientB has not fetched the new model instance yet updateSourceInstanceNameTo('v2'), sync(ClientA, Server), // ClientB fetches the created & updated instance from the server sync(ClientB, Server), ], done); }); it('propagates DELETE', function(done) { async.series([ // NOTE: ClientB has not fetched the new model instance yet updateSourceInstanceNameTo('v2'), sync(ClientA, Server), // ClientB fetches the created & updated instance from the server sync(ClientB, Server), ], done); }); it('does not report false conflicts', function(done) { async.series([ // client A makes some work updateSourceInstanceNameTo('v2'), sync(ClientA, Server), // ClientB fetches the change from the server sync(ClientB, Server), verifySourceWasReplicated(ClientB), // client B makes some work updateClientB('v5'), sync(Server, ClientB), updateClientB('v6'), sync(ClientB, Server), // client A fetches the changes sync(ClientA, Server), ], done); }); it('handles UPDATE conflict resolved using "ours"', function(done) { testUpdateConflictIsResolved( function resolveUsingOurs(conflict, cb) { conflict.resolveUsingSource(cb); }, done); }); it('handles UPDATE conflict resolved using "theirs"', function(done) { testUpdateConflictIsResolved( function resolveUsingTheirs(conflict, cb) { // We sync ClientA->Server first expect(conflict.SourceModel.modelName) .to.equal(ClientB.modelName); conflict.resolveUsingTarget(cb); }, done); }); it('handles UPDATE conflict resolved manually', function(done) { testUpdateConflictIsResolved( function resolveManually(conflict, cb) { conflict.resolveManually({name: 'manual'}, cb); }, done); }); it('handles DELETE conflict resolved using "ours"', function(done) { testDeleteConflictIsResolved( function resolveUsingOurs(conflict, cb) { conflict.resolveUsingSource(cb); }, done); }); it('handles DELETE conflict resolved using "theirs"', function(done) { testDeleteConflictIsResolved( function resolveUsingTheirs(conflict, cb) { // We sync ClientA->Server first expect(conflict.SourceModel.modelName) .to.equal(ClientB.modelName); conflict.resolveUsingTarget(cb); }, done); }); it('handles DELETE conflict resolved as manual delete', function(done) { testDeleteConflictIsResolved( function resolveManually(conflict, cb) { conflict.resolveManually(null, cb); }, done); }); it('handles DELETE conflict resolved manually', function(done) { testDeleteConflictIsResolved( function resolveManually(conflict, cb) { conflict.resolveManually({name: 'manual'}, cb); }, done); }); }); function testUpdateConflictIsResolved(resolver, cb) { async.series([ // sync the new model to ClientB sync(ClientB, Server), verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId), // ClientA makes a change updateSourceInstanceNameTo('a'), sync(ClientA, Server), // ClientB changes the same instance updateClientB('b'), function syncAndResolveConflict(next) { replicate(ClientB, Server, function(err, conflicts, cps) { if (err) return next(err); expect(conflicts).to.have.length(1); expect(conflicts[0].SourceModel.modelName) .to.equal(ClientB.modelName); debug('Resolving the conflict %j', conflicts[0]); resolver(conflicts[0], next); }); }, // repeat the last sync, it should pass now sync(ClientB, Server), // and sync back to ClientA too sync(ClientA, Server), verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId), ], cb); } function testDeleteConflictIsResolved(resolver, cb) { async.series([ // sync the new model to ClientB sync(ClientB, Server), verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId), // ClientA makes a change function deleteInstanceOnClientA(next) { ClientA.deleteById(sourceInstanceId, next); }, sync(ClientA, Server), // ClientB changes the same instance updateClientB('b'), function syncAndResolveConflict(next) { replicate(ClientB, Server, function(err, conflicts, cps) { if (err) return next(err); expect(conflicts).to.have.length(1); expect(conflicts[0].SourceModel.modelName) .to.equal(ClientB.modelName); debug('Resolving the conflict %j', conflicts[0]); resolver(conflicts[0], next); }); }, // repeat the last sync, it should pass now sync(ClientB, Server), // and sync back to ClientA too sync(ClientA, Server), verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId), ], cb); } function updateClientB(name) { return function updateInstanceB(next) { ClientB.findById(sourceInstanceId, function(err, instance) { if (err) return next(err); instance.name = name; instance.save(next); }); }; } function sync(client, server) { return function syncBothWays(next) { async.series([ // NOTE(bajtos) It's important to replicate from the client to the // server first, so that we can resolve any conflicts at the client replicateExpectingSuccess(client, server), replicateExpectingSuccess(server, client), ], next); }; } }); function updateSourceInstanceNameTo(value) { return function updateInstance(next) { debug('update source instance name to %j', value); sourceInstance.name = value; sourceInstance.save(next); }; } function deleteSourceInstance(value) { return function deleteInstance(next) { debug('delete source instance', value); sourceInstance.remove(function(err) { sourceInstance = null; next(err); }); }; } function verifySourceWasReplicated(target) { if (!target) target = TargetModel; return function verify(next) { target.findById(sourceInstanceId, function(err, targetInstance) { if (err) return next(err); expect(targetInstance && targetInstance.toObject()) .to.eql(sourceInstance && sourceInstance.toObject()); next(); }); }; } }); describe('ensure options object is set on context during bulkUpdate', function() { var syncPropertyExists = false; var OptionsSourceModel; beforeEach(function() { OptionsSourceModel = PersistedModel.extend( 'OptionsSourceModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true}); OptionsSourceModel.attachTo(dataSource); OptionsSourceModel.observe('before save', function updateTimestamp(ctx, next) { if (ctx.options.sync) { syncPropertyExists = true; } else { syncPropertyExists = false; } next(); }); }); it('bulkUpdate should call Model updates with the provided options object', function(done) { var testData = {name: 'Janie', surname: 'Doe'}; var updates = [ { data: null, change: null, type: 'create', }, ]; var options = { sync: true, }; async.waterfall([ function(callback) { TargetModel.create(testData, callback); }, function(data, callback) { updates[0].data = data; TargetModel.getChangeModel().find({where: {modelId: data.id}}, callback); }, function(data, callback) { updates[0].change = data; OptionsSourceModel.bulkUpdate(updates, options, callback); }], function(err, result) { if (err) return done(err); expect(syncPropertyExists).to.eql(true); done(); } ); }); }); describe('ensure bulkUpdate works with just 2 args', function() { it('bulkUpdate should successfully finish without options', function(done) { var testData = {name: 'Janie', surname: 'Doe'}; var updates = [{ data: null, change: null, type: 'create', }]; async.waterfall([ function(callback) { TargetModel.create(testData, callback); }, function(data, callback) { updates[0].data = data; TargetModel.getChangeModel().find({where: {modelId: data.id}}, callback); }, function(data, callback) { updates[0].change = data; SourceModel.bulkUpdate(updates, callback); }, ], function(err, result) { if (err) return done(err); done(); }); }); }); describe('Replication with chunking', function() { beforeEach(function() { var test = this; SourceModel = this.SourceModel = PersistedModel.extend( 'SourceModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true, replicationChunkSize: 1}); SourceModel.attachTo(dataSource); TargetModel = this.TargetModel = PersistedModel.extend( 'TargetModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true, replicationChunkSize: 1}); var TargetChange = TargetModel.Change; TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); TargetChange.Checkpoint.attachTo(dataSource); TargetModel.attachTo(dataSource); test.startingCheckpoint = -1; }); describe('Model.replicate(since, targetModel, options, callback)', function() { it('calls bulkUpdate multiple times', function(done) { var test = this; var options = {}; var calls = mockBulkUpdate(TargetModel); SourceModel.create([{name: 'foo'}, {name: 'bar'}], function(err) { if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options, function(err, conflicts) { if (err) return done(err); assertTargetModelEqualsSourceModel(conflicts, test.SourceModel, test.TargetModel, done); expect(calls.length).to.eql(2); }); }); }); }); }); describe('Replication without chunking', function() { beforeEach(function() { var test = this; SourceModel = this.SourceModel = PersistedModel.extend( 'SourceModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true}); SourceModel.attachTo(dataSource); TargetModel = this.TargetModel = PersistedModel.extend( 'TargetModel-' + tid, {id: {id: true, type: String, defaultFn: 'guid'}}, {trackChanges: true}); var TargetChange = TargetModel.Change; TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); TargetChange.Checkpoint.attachTo(dataSource); TargetModel.attachTo(dataSource); test.startingCheckpoint = -1; }); describe('Model.replicate(since, targetModel, options, callback)', function() { it('calls bulkUpdate only once', function(done) { var test = this; var options = {}; var calls = mockBulkUpdate(TargetModel); SourceModel.create([{name: 'foo'}, {name: 'bar'}], function(err) { if (err) return done(err); test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel, options, function(err, conflicts) { if (err) return done(err); assertTargetModelEqualsSourceModel(conflicts, test.SourceModel, test.TargetModel, done); expect(calls.length).to.eql(1); }); }); }); }); }); function mockBulkUpdate(modelToMock) { var calls = []; var originalBulkUpdateFunction = modelToMock.bulkUpdate; modelToMock.bulkUpdate = function(since, filter, callback) { calls.push('bulkUpdate'); originalBulkUpdateFunction.call(this, since, filter, callback); }; return calls; } var _since = {}; function replicate(source, target, since, next) { if (typeof since === 'function') { next = since; since = undefined; } var sinceIx = source.modelName + ':to:' + target.modelName; if (since === undefined) { since = useSinceFilter ? _since[sinceIx] || -1 : -1; } debug('replicate from %s to %s since %j', source.modelName, target.modelName, since); source.replicate(since, target, function(err, conflicts, cps) { if (err) return next(err); if (conflicts.length === 0) { _since[sinceIx] = cps; } next(err, conflicts, cps); }); } function createModel(Model, data) { return function create(next) { Model.create(data, next); }; } function replicateExpectingSuccess(source, target, since) { if (!source) source = SourceModel; if (!target) target = TargetModel; return function doReplicate(next) { replicate(source, target, since, function(err, conflicts, cps) { if (err) return next(err); if (conflicts.length) { return next(new Error('Unexpected conflicts\n' + conflicts.map(JSON.stringify).join('\n'))); } next(); }); }; } function setupRaceConditionInReplication(fn) { var bulkUpdate = TargetModel.bulkUpdate; TargetModel.bulkUpdate = function(data, options, cb) { // simulate the situation when a 3rd party modifies the database // while a replication run is in progress var self = this; fn(function(err) { if (err) return cb(err); bulkUpdate.call(self, data, options, cb); }); // apply the 3rd party modification only once TargetModel.bulkUpdate = bulkUpdate; }; } function verifyInstanceWasReplicated(source, target, id) { return function verify(next) { source.findById(id, function(err, expected) { if (err) return next(err); target.findById(id, function(err, actual) { if (err) return next(err); expect(actual && actual.toObject()) .to.eql(expected && expected.toObject()); debug('replicated instance: %j', actual); next(); }); }); }; } function spyAndStoreSinceArg(Model, methodName, store) { var orig = Model[methodName]; Model[methodName] = function(since) { store.push(since); orig.apply(this, arguments); }; } function getPropValue(obj, name) { return Array.isArray(obj) ? obj.map(function(it) { return getPropValue(it, name); }) : obj[name]; } function getIds(list) { return getPropValue(list, 'id'); } function assertTargetModelEqualsSourceModel(conflicts, sourceModel, targetModel, done) { var sourceData, targetData; assert(conflicts.length === 0); async.parallel([ function(cb) { sourceModel.find(function(err, result) { if (err) return cb(err); sourceData = result; cb(); }); }, function(cb) { targetModel.find(function(err, result) { if (err) return cb(err); targetData = result; cb(); }); }, ], function(err) { if (err) return done(err); assert.deepEqual(sourceData, targetData); done(); }); } }); describe('Replication / Change APIs with custom change properties', function() { this.timeout(10000); var dataSource, useSinceFilter, SourceModel, TargetModel, startingCheckpoint; var tid = 0; // per-test unique id used e.g. to build unique model names beforeEach(function() { tid++; useSinceFilter = false; var test = this; dataSource = this.dataSource = loopback.createDataSource({ connector: loopback.Memory, }); SourceModel = this.SourceModel = PersistedModel.extend( 'SourceModelWithCustomChangeProperties-' + tid, { id: {id: true, type: String, defaultFn: 'guid'}, customProperty: {type: 'string'}, }, { trackChanges: true, additionalChangeModelProperties: {customProperty: {type: 'string'}}, }); SourceModel.createChangeFilter = function(since, modelFilter) { const filter = this.base.createChangeFilter.apply(this, arguments); if (modelFilter && modelFilter.where && modelFilter.where.customProperty) filter.where.customProperty = modelFilter.where.customProperty; return filter; }; SourceModel.prototype.fillCustomChangeProperties = function(change, cb) { const customProperty = this.customProperty; const base = this.constructor.base; base.prototype.fillCustomChangeProperties.call(this, change, err => { if (err) return cb(err); change.customProperty = customProperty; cb(); }); }; SourceModel.attachTo(dataSource); TargetModel = this.TargetModel = PersistedModel.extend( 'TargetModelWithCustomChangeProperties-' + tid, { id: {id: true, type: String, defaultFn: 'guid'}, customProperty: {type: 'string'}, }, { trackChanges: true, additionalChangeModelProperties: {customProperty: {type: 'string'}}, }); var ChangeModelForTarget = TargetModel.Change; ChangeModelForTarget.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); ChangeModelForTarget.Checkpoint.attachTo(dataSource); TargetModel.attachTo(dataSource); startingCheckpoint = -1; }); describe('Model._defineChangeModel()', function() { it('defines change model with custom properties', function() { var changeModel = SourceModel.getChangeModel(); var changeModelProperties = changeModel.definition.properties; expect(changeModelProperties).to.have.property('customProperty'); }); }); describe('Model.changes(since, filter, callback)', function() { beforeEach(givenSomeSourceModelInstances); it('queries changes using customized filter', function(done) { var filterUsed = mockChangeFind(this.SourceModel); SourceModel.changes( startingCheckpoint, {where: {customProperty: '123'}}, function(err, changes) { if (err) return done(err); expect(filterUsed[0]).to.eql({ where: { checkpoint: {gte: -1}, modelName: SourceModel.modelName, customProperty: '123', }, }); done(); }); }); it('query returns the matching changes', function(done) { SourceModel.changes( startingCheckpoint, {where: {customProperty: '123'}}, function(err, changes) { expect(changes).to.have.length(1); expect(changes[0]).to.have.property('customProperty', '123'); done(); }); }); function givenSomeSourceModelInstances(done) { const data = [ {name: 'foo', customProperty: '123'}, {name: 'foo', customPropertyValue: '456'}, ]; this.SourceModel.create(data, done); } }); function mockChangeFind(Model) { var filterUsed = []; Model.getChangeModel().find = function(filter, cb) { filterUsed.push(filter); if (cb) { process.nextTick(cb); } }; return filterUsed; } });