// Listing metadata: 1253 lines, 39 KiB, JavaScript
var async = require('async');
|
|
var loopback = require('../');
|
|
var Change = loopback.Change;
|
|
var defineModelTestsWithDataSource = require('./util/model-tests');
|
|
var PersistedModel = loopback.PersistedModel;
|
|
var expect = require('chai').expect;
|
|
var debug = require('debug')('test');
|
|
|
|
describe('Replication / Change APIs', function() {
|
|
var dataSource, SourceModel, TargetModel;
|
|
var useSinceFilter;
|
|
var tid = 0; // per-test unique id used e.g. to build unique model names
|
|
|
|
// Per-test fixture. Creates a fresh memory-backed data source and a pair of
// uniquely named, change-tracked models, then exposes them both on the mocha
// context (`this`) and through the suite-level variables.
beforeEach(function() {
  var ctx = this;

  // Every model gets its own copy of the property definition.
  function guidIdProperties() {
    return { id: { id: true, type: String, defaultFn: 'guid' } };
  }

  tid++;
  useSinceFilter = false;

  dataSource = loopback.createDataSource({
    connector: loopback.Memory
  });
  ctx.dataSource = dataSource;

  SourceModel = PersistedModel.extend(
    'SourceModel-' + tid,
    guidIdProperties(),
    { trackChanges: true });
  ctx.SourceModel = SourceModel;
  SourceModel.attachTo(dataSource);

  TargetModel = PersistedModel.extend(
    'TargetModel-' + tid,
    guidIdProperties(),
    { trackChanges: true });
  ctx.TargetModel = TargetModel;

  // NOTE(bajtos) At the moment, all models share the same Checkpoint
  // model. This causes the in-process replication to work differently
  // than client-server replication.
  // As a workaround, we manually setup unique Checkpoint for TargetModel.
  var targetChangeModel = TargetModel.Change;
  targetChangeModel.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
  targetChangeModel.Checkpoint.attachTo(dataSource);

  TargetModel.attachTo(dataSource);

  ctx.startingCheckpoint = -1;

  // Creates one source record (remembered as `this.model`) and replicates it
  // to the target; `cb` receives whatever `replicate` reports.
  ctx.createInitalData = function(cb) {
    SourceModel.create({name: 'foo'}, function(err, created) {
      if (err) return cb(err);
      ctx.model = created;
      SourceModel.replicate(TargetModel, cb);
    });
  };
});
|
|
|
|
describe('Model.changes(since, filter, callback)', function() {
  // One create must show up as exactly one change since the starting
  // checkpoint.
  it('Get changes since the given checkpoint', function(done) {
    var test = this;
    this.SourceModel.create({name: 'foo'}, function(err) {
      if (err) return done(err);
      // NOTE(review): the 1 ms delay presumably lets change tracking
      // settle before querying - confirm it is still required.
      setTimeout(function() {
        test.SourceModel.changes(test.startingCheckpoint, {}, function(err, changes) {
          // Report a failing query as a test error instead of crashing on
          // `changes.length` of undefined.
          if (err) return done(err);
          assert.equal(changes.length, 1);
          done();
        });
      }, 1);
    });
  });

  // Asking for changes since a checkpoint that was never reached must yield
  // nothing.
  it('excludes changes from older checkpoints', function(done) {
    var FUTURE_CHECKPOINT = 999;

    SourceModel.create({ name: 'foo' }, function(err) {
      if (err) return done(err);
      SourceModel.changes(FUTURE_CHECKPOINT, {}, function(err, changes) {
        if (err) return done(err);
        /*jshint -W030 */
        expect(changes).to.be.empty;
        done();
      });
    });
  });
});
|
|
|
|
describe('Model.replicate(since, targetModel, options, callback)', function() {
|
|
// Replicates one freshly created record and checks that, with no conflicts
// reported, both models hold deep-equal data afterwards.
it('Replicate data using the target model', function(done) {
  var ctx = this;
  var rows = {};

  // Builds a parallel task storing everything found in `Model` under `key`.
  function collect(Model, key) {
    return function(cb) {
      Model.find(function(err, result) {
        if (err) return cb(err);
        rows[key] = result;
        cb();
      });
    };
  }

  function compareStores(err, conflicts) {
    if (err) return done(err);
    assert(conflicts.length === 0);

    async.parallel([
      collect(ctx.SourceModel, 'source'),
      collect(ctx.TargetModel, 'target')
    ], function(err) {
      if (err) return done(err);
      assert.deepEqual(rows.source, rows.target);
      done();
    });
  }

  ctx.SourceModel.create({name: 'foo'}, function(err) {
    if (err) return done(err);
    ctx.SourceModel.replicate(
      ctx.startingCheckpoint, ctx.TargetModel, {}, compareStores);
  });
});
|
|
|
|
// Creates one record per checkpoint and replicates only from the current
// checkpoint on, so the record from the earlier checkpoint must be skipped.
it('applies "since" filter on source changes', function(done) {
  async.series([
    function createModelInSourceCp1(next) {
      SourceModel.create({ id: '1' }, next);
    },
    function checkpoint(next) {
      SourceModel.checkpoint(next);
    },
    function createModelInSourceCp2(next) {
      SourceModel.create({ id: '2' }, next);
    },
    function replicateLastChangeOnly(next) {
      SourceModel.currentCheckpoint(function(err, cp) {
        // Route failures through the series callback like every other step.
        if (err) return next(err);
        SourceModel.replicate(cp, TargetModel, next);
      });
    },
    function verify(next) {
      TargetModel.find(function(err, list) {
        if (err) return next(err);
        // '1' should be skipped by replication
        expect(getIds(list)).to.eql(['2']);
        next();
      });
    }
  ], done);
});
|
|
|
|
it('applies "since" filter on target changes', function(done) {
  // Because the "since" filter is just an optimization,
  // there isn't really any observable behaviour we could
  // check to assert correct implementation.
  // Instead, record the `since` value that reaches TargetModel.diff.
  var recordedSince = [];
  spyAndStoreSinceArg(TargetModel, 'diff', recordedSince);

  function verifySinceWasForwarded(err) {
    if (err) return done(err);
    expect(recordedSince).to.eql([10]);
    done();
  }

  SourceModel.replicate(10, TargetModel, verifySinceWasForwarded);
});
|
|
|
|
// Passing `{ source, target }` as `since` must hand each side its own value:
// `source` to SourceModel.changes and `target` to TargetModel.diff.
it('uses different "since" value for source and target', function(done) {
  var seen = { source: [], target: [] };

  spyAndStoreSinceArg(SourceModel, 'changes', seen.source);
  spyAndStoreSinceArg(TargetModel, 'diff', seen.target);

  SourceModel.replicate({ source: 1, target: 2 }, TargetModel, function(err) {
    if (err) return done(err);
    expect(seen.source).to.eql([1]);
    expect(seen.target).to.eql([2]);
    done();
  });
});
|
|
|
|
// A record created while a replication run is in progress must be delivered
// by a follow-up run that starts right after the previously seen checkpoint.
it('picks up changes made during replication', function(done) {
  setupRaceConditionInReplication(function(cb) {
    // simulate the situation when another model is created
    // while a replication run is in progress
    SourceModel.create({ id: 'racer' }, cb);
  });

  var lastCp;
  async.series([
    function buildSomeDataToReplicate(next) {
      SourceModel.create({ id: 'init' }, next);
    },
    function getLastCp(next) {
      SourceModel.currentCheckpoint(function(err, cp) {
        if (err) return next(err);
        lastCp = cp;
        next();
      });
    },
    function replicate(next) {
      SourceModel.replicate(TargetModel, next);
    },
    function verifyAssumptions(next) {
      SourceModel.find(function(err, list) {
        // Without this check a failed query surfaces as a TypeError on the
        // undefined `list` rather than as the real error.
        if (err) return next(err);
        expect(getIds(list), 'source ids')
          .to.eql(['init', 'racer']);

        TargetModel.find(function(err, list) {
          if (err) return next(err);
          expect(getIds(list), 'target ids after first sync')
            .to.include.members(['init']);
          next();
        });
      });
    },
    function replicateAgain(next) {
      SourceModel.replicate(lastCp + 1, TargetModel, next);
    },
    function verify(next) {
      TargetModel.find(function(err, list) {
        if (err) return next(err);
        expect(getIds(list), 'target ids').to.eql(['init', 'racer']);
        next();
      });
    }
  ], done);
});
|
|
|
|
// Bumps the source checkpoint once and the target checkpoint twice so the two
// sequences differ, then checks that `replicate` reports both incremented by
// one.
it('returns new current checkpoints to callback', function(done) {
  var latestSeq = {};

  // Builds a series step that creates a checkpoint on `Model` and remembers
  // its sequence number under `side`.
  function bumpCheckpoint(Model, side) {
    return function(cb) {
      Model.checkpoint(function(err, inst) {
        if (err) return cb(err);
        latestSeq[side] = inst.seq;
        cb();
      });
    };
  }

  function replicateAndVerify(cb) {
    expect(latestSeq.source).to.not.equal(latestSeq.target);

    SourceModel.replicate(
      TargetModel,
      function(err, conflicts, newCheckpoints) {
        if (err) return cb(err);
        expect(conflicts, 'conflicts').to.eql([]);
        expect(newCheckpoints, 'currentCheckpoints').to.eql({
          source: latestSeq.source + 1,
          target: latestSeq.target + 1
        });
        cb();
      });
  }

  async.series([
    bumpCheckpoint(SourceModel, 'source'),
    bumpCheckpoint(TargetModel, 'target'),
    bumpCheckpoint(TargetModel, 'target'),
    replicateAndVerify
  ], done);
});
|
|
|
|
// After a successful replication, no target change record may sit at or
// after the target's current checkpoint.
it('leaves current target checkpoint empty', function(done) {
  async.series([
    function createTestData(next) {
      SourceModel.create({}, next);
    },
    replicateExpectingSuccess(),
    function verify(next) {
      TargetModel.currentCheckpoint(function(err, cp) {
        if (err) return next(err);
        TargetModel.getChangeModel().find(
          { where: { checkpoint: { gte: cp } } },
          function(err, changes) {
            // Finish through `next` so the series, not this step, completes
            // the test.
            if (err) return next(err);
            expect(changes).to.have.length(0);
            next();
          });
      });
    }
  ], done);
});
|
|
|
|
// Each test lets a "3rd party" touch the target through the connector (which
// bypasses the model API) while a replication run is in flight.
describe('with 3rd-party changes', function() {
  // Arms the race: the target record '1' gets renamed via the connector.
  function raceRenameTargetRecord() {
    setupRaceConditionInReplication(function(cb) {
      TargetModel.dataSource.connector.updateAttributes(
        TargetModel.modelName,
        '1',
        { name: '3rd-party' },
        cb);
    });
  }

  // Runs one replication that must report a conflict for id '1' only, then
  // resolves that conflict.
  function replicateAndResolveConflictOnId1(next) {
    SourceModel.replicate(
      TargetModel,
      function(err, conflicts) {
        if (err) return next(err);

        var conflictedIds = getPropValue(conflicts || [], 'modelId');
        expect(conflictedIds).to.eql(['1']);

        // resolve the conflict using ours
        conflicts[0].resolve(next);
      });
  }

  it('detects UPDATE made during UPDATE', function(done) {
    async.series([
      createModel(SourceModel, { id: '1' }),
      replicateExpectingSuccess(),
      function updateModel(next) {
        SourceModel.updateAll({ id: '1' }, { name: 'source' }, next);
      },
      function replicateWith3rdPartyModifyingData(next) {
        raceRenameTargetRecord();
        replicateAndResolveConflictOnId1(next);
      },

      replicateExpectingSuccess(),
      verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
    ], done);
  });

  it('detects CREATE made during CREATE', function(done) {
    async.series([
      // FIXME(bajtos) Remove the 'name' property once the implementation
      // of UPDATE is fixed to correctly remove properties
      createModel(SourceModel, { id: '1', name: 'source' }),
      function replicateWith3rdPartyModifyingData(next) {
        setupRaceConditionInReplication(function(cb) {
          TargetModel.dataSource.connector.create(
            TargetModel.modelName,
            { id: '1', name: '3rd-party' },
            cb);
        });

        replicateAndResolveConflictOnId1(next);
      },

      replicateExpectingSuccess(),
      verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
    ], done);
  });

  it('detects UPDATE made during DELETE', function(done) {
    async.series([
      createModel(SourceModel, { id: '1' }),
      replicateExpectingSuccess(),
      function deleteModel(next) {
        SourceModel.deleteById('1', next);
      },
      function replicateWith3rdPartyModifyingData(next) {
        raceRenameTargetRecord();
        replicateAndResolveConflictOnId1(next);
      },

      replicateExpectingSuccess(),
      verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
    ], done);
  });

  it('handles DELETE made during DELETE', function(done) {
    async.series([
      createModel(SourceModel, { id: '1' }),
      replicateExpectingSuccess(),
      function deleteModel(next) {
        SourceModel.deleteById('1', next);
      },
      function setup3rdPartyModifyingData(next) {
        // Only arms the race; the replication itself is the next step.
        setupRaceConditionInReplication(function(cb) {
          TargetModel.dataSource.connector.destroy(
            TargetModel.modelName,
            '1',
            cb);
        });
        next();
      },
      replicateExpectingSuccess(),
      verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
    ], done);
  });
});
|
|
});
|
|
|
|
// Source and target both rename the same replicated record; the next
// replication must report exactly one UPDATE conflict.
describe('conflict detection - both updated', function() {
  beforeEach(function(done) {
    var SourceModel = this.SourceModel;
    var TargetModel = this.TargetModel;
    var test = this;

    test.createInitalData(createConflict);

    function createConflict(err) {
      // Abort the fixture when the initial create/replicate failed.
      if (err) return done(err);
      async.parallel([
        function(cb) {
          SourceModel.findOne(function(err, inst) {
            if (err) return cb(err);
            inst.name = 'source update';
            inst.save(cb);
          });
        },
        function(cb) {
          TargetModel.findOne(function(err, inst) {
            if (err) return cb(err);
            inst.name = 'target update';
            inst.save(cb);
          });
        }
      ], function(err) {
        if (err) return done(err);
        SourceModel.replicate(TargetModel, function(err, conflicts) {
          if (err) return done(err);
          test.conflicts = conflicts;
          test.conflict = conflicts[0];
          done();
        });
      });
    }
  });

  it('should detect a single conflict', function() {
    assert.equal(this.conflicts.length, 1);
    assert(this.conflict);
  });

  it('type should be UPDATE', function(done) {
    this.conflict.type(function(err, type) {
      if (err) return done(err);
      assert.equal(type, Change.UPDATE);
      done();
    });
  });

  it('conflict.changes()', function(done) {
    var test = this;
    this.conflict.changes(function(err, sourceChange, targetChange) {
      if (err) return done(err);
      assert.equal(typeof sourceChange.id, 'string');
      assert.equal(typeof targetChange.id, 'string');
      assert.equal(test.model.getId(), sourceChange.getModelId());
      assert.equal(sourceChange.type(), Change.UPDATE);
      assert.equal(targetChange.type(), Change.UPDATE);
      done();
    });
  });

  it('conflict.models()', function(done) {
    var test = this;
    this.conflict.models(function(err, source, target) {
      if (err) return done(err);
      assert.deepEqual(source.toJSON(), {
        id: test.model.id,
        name: 'source update'
      });
      assert.deepEqual(target.toJSON(), {
        id: test.model.id,
        name: 'target update'
      });
      done();
    });
  });
});
|
|
|
|
// The source removes the replicated record while the target renames it; the
// next replication must report exactly one DELETE conflict.
describe('conflict detection - source deleted', function() {
  beforeEach(function(done) {
    var SourceModel = this.SourceModel;
    var TargetModel = this.TargetModel;
    var test = this;

    test.createInitalData(createConflict);

    function createConflict(err) {
      // Abort the fixture when the initial create/replicate failed.
      if (err) return done(err);
      async.parallel([
        function(cb) {
          SourceModel.findOne(function(err, inst) {
            if (err) return cb(err);
            test.model = inst;
            inst.remove(cb);
          });
        },
        function(cb) {
          TargetModel.findOne(function(err, inst) {
            if (err) return cb(err);
            inst.name = 'target update';
            inst.save(cb);
          });
        }
      ], function(err) {
        if (err) return done(err);
        SourceModel.replicate(TargetModel, function(err, conflicts) {
          if (err) return done(err);
          test.conflicts = conflicts;
          test.conflict = conflicts[0];
          done();
        });
      });
    }
  });

  it('should detect a single conflict', function() {
    assert.equal(this.conflicts.length, 1);
    assert(this.conflict);
  });

  it('type should be DELETE', function(done) {
    this.conflict.type(function(err, type) {
      if (err) return done(err);
      assert.equal(type, Change.DELETE);
      done();
    });
  });

  it('conflict.changes()', function(done) {
    var test = this;
    this.conflict.changes(function(err, sourceChange, targetChange) {
      if (err) return done(err);
      assert.equal(typeof sourceChange.id, 'string');
      assert.equal(typeof targetChange.id, 'string');
      assert.equal(test.model.getId(), sourceChange.getModelId());
      assert.equal(sourceChange.type(), Change.DELETE);
      assert.equal(targetChange.type(), Change.UPDATE);
      done();
    });
  });

  it('conflict.models()', function(done) {
    var test = this;
    this.conflict.models(function(err, source, target) {
      if (err) return done(err);
      // The source side was deleted, so only the target instance exists.
      assert.equal(source, null);
      assert.deepEqual(target.toJSON(), {
        id: test.model.id,
        name: 'target update'
      });
      done();
    });
  });
});
|
|
|
|
// The source renames the replicated record while the target removes it; the
// next replication must report exactly one DELETE conflict.
describe('conflict detection - target deleted', function() {
  beforeEach(function(done) {
    var SourceModel = this.SourceModel;
    var TargetModel = this.TargetModel;
    var test = this;

    test.createInitalData(createConflict);

    function createConflict(err) {
      // Abort the fixture when the initial create/replicate failed.
      if (err) return done(err);
      async.parallel([
        function(cb) {
          SourceModel.findOne(function(err, inst) {
            if (err) return cb(err);
            test.model = inst;
            inst.name = 'source update';
            inst.save(cb);
          });
        },
        function(cb) {
          TargetModel.findOne(function(err, inst) {
            if (err) return cb(err);
            inst.remove(cb);
          });
        }
      ], function(err) {
        if (err) return done(err);
        SourceModel.replicate(TargetModel, function(err, conflicts) {
          if (err) return done(err);
          test.conflicts = conflicts;
          test.conflict = conflicts[0];
          done();
        });
      });
    }
  });

  it('should detect a single conflict', function() {
    assert.equal(this.conflicts.length, 1);
    assert(this.conflict);
  });

  it('type should be DELETE', function(done) {
    this.conflict.type(function(err, type) {
      if (err) return done(err);
      assert.equal(type, Change.DELETE);
      done();
    });
  });

  it('conflict.changes()', function(done) {
    var test = this;
    this.conflict.changes(function(err, sourceChange, targetChange) {
      if (err) return done(err);
      assert.equal(typeof sourceChange.id, 'string');
      assert.equal(typeof targetChange.id, 'string');
      assert.equal(test.model.getId(), sourceChange.getModelId());
      assert.equal(sourceChange.type(), Change.UPDATE);
      assert.equal(targetChange.type(), Change.DELETE);
      done();
    });
  });

  it('conflict.models()', function(done) {
    var test = this;
    this.conflict.models(function(err, source, target) {
      if (err) return done(err);
      // The target side was deleted, so only the source instance exists.
      assert.equal(target, null);
      assert.deepEqual(source.toJSON(), {
        id: test.model.id,
        name: 'source update'
      });
      done();
    });
  });
});
|
|
|
|
// Both sides remove the same replicated record; the next replication must
// report no conflict.
describe('conflict detection - both deleted', function() {
  beforeEach(function(done) {
    var SourceModel = this.SourceModel;
    var TargetModel = this.TargetModel;
    var test = this;

    test.createInitalData(createConflict);

    function createConflict(err) {
      // Abort the fixture when the initial create/replicate failed.
      if (err) return done(err);
      async.parallel([
        function(cb) {
          SourceModel.findOne(function(err, inst) {
            if (err) return cb(err);
            test.model = inst;
            inst.remove(cb);
          });
        },
        function(cb) {
          TargetModel.findOne(function(err, inst) {
            if (err) return cb(err);
            inst.remove(cb);
          });
        }
      ], function(err) {
        if (err) return done(err);
        SourceModel.replicate(TargetModel, function(err, conflicts) {
          if (err) return done(err);
          test.conflicts = conflicts;
          test.conflict = conflicts[0];
          done();
        });
      });
    }
  });

  it('should not detect a conflict', function() {
    assert.equal(this.conflicts.length, 0);
    assert(!this.conflict);
  });
});
|
|
|
|
// Every write path of a change-tracked model must leave exactly one change
// record for the touched instance in the current checkpoint.
describe('change detection', function() {
  it('detects "create"', function(done) {
    SourceModel.create({}, function(err, inst) {
      if (err) return done(err);
      assertChangeRecordedForId(inst.id, done);
    });
  });

  it('detects "updateOrCreate"', function(done) {
    givenReplicatedInstance(function(err, created) {
      if (err) return done(err);
      created.name = 'updated';
      SourceModel.updateOrCreate(created, function(err, inst) {
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });
  });

  it('detects "findOrCreate"', function(done) {
    // make sure we bypass find+create and call the connector directly
    SourceModel.dataSource.connector.findOrCreate =
      function(model, query, data, callback) {
        this.all(model, query, function(err, list) {
          if (err || (list && list[0]))
            return callback(err, list && list[0], false);
          this.create(model, data, function(err) {
            callback(err, data, true);
          });
        }.bind(this));
      };

    SourceModel.findOrCreate(
      { where: { name: 'does-not-exist' } },
      { name: 'created' },
      function(err, inst) {
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
  });

  it('detects "deleteById"', function(done) {
    givenReplicatedInstance(function(err, inst) {
      if (err) return done(err);
      SourceModel.deleteById(inst.id, function(err) {
        // A failed delete must fail the test, not be checked for changes.
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });
  });

  it('detects "deleteAll"', function(done) {
    givenReplicatedInstance(function(err, inst) {
      if (err) return done(err);
      SourceModel.deleteAll({ name: inst.name }, function(err) {
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });
  });

  it('detects "updateAll"', function(done) {
    givenReplicatedInstance(function(err, inst) {
      if (err) return done(err);
      SourceModel.updateAll(
        { name: inst.name },
        { name: 'updated' },
        function(err) {
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
    });
  });

  it('detects "prototype.save"', function(done) {
    givenReplicatedInstance(function(err, inst) {
      if (err) return done(err);
      inst.name = 'updated';
      inst.save(function(err) {
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });
  });

  it('detects "prototype.updateAttributes"', function(done) {
    givenReplicatedInstance(function(err, inst) {
      if (err) return done(err);
      inst.updateAttributes({ name: 'updated' }, function(err) {
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });
  });

  it('detects "prototype.delete"', function(done) {
    givenReplicatedInstance(function(err, inst) {
      if (err) return done(err);
      inst.delete(function(err) {
        // A failed delete must fail the test, not be checked for changes.
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });
  });

  // Creates a source instance and then a new checkpoint, so that whatever
  // the test does next is recorded in a fresh checkpoint.
  function givenReplicatedInstance(cb) {
    SourceModel.create({ name: 'a-name' }, function(err, inst) {
      if (err) return cb(err);
      SourceModel.checkpoint(function(err) {
        if (err) return cb(err);
        cb(null, inst);
      });
    });
  }

  // Asserts that exactly one change exists since the previous checkpoint and
  // that it belongs to SourceModel instance `id` in the current checkpoint.
  function assertChangeRecordedForId(id, cb) {
    SourceModel.getChangeModel().getCheckpointModel()
      .current(function(err, cp) {
        if (err) return cb(err);
        SourceModel.changes(cp - 1, {}, function(err, pendingChanges) {
          if (err) return cb(err);
          expect(pendingChanges, 'list of changes').to.have.length(1);
          var change = pendingChanges[0].toObject();
          expect(change).to.have.property('checkpoint', cp); // sanity check
          expect(change).to.have.property('modelName', SourceModel.modelName);
          // NOTE(bajtos) Change.modelId is always String
          // regardless of the type of the changed model's id property
          expect(change).to.have.property('modelId', '' + id);
          cb();
        });
      });
  }
});
|
|
|
|
describe('complex setup', function() {
|
|
var sourceInstance, sourceInstanceId, AnotherModel;
|
|
|
|
// Fixture: creates the 'test-instance' record in SourceModel, replicates it
// to TargetModel and verifies that the copy arrived.
beforeEach(function createReplicatedInstance(done) {
  async.series([
    function createInstance(next) {
      SourceModel.create({ id: 'test-instance' }, function(err, result) {
        // Check the error before touching `result`; reading `result.id`
        // first would throw when `result` is undefined.
        if (err) return next(err);
        sourceInstance = result;
        sourceInstanceId = result.id;
        next();
      });
    },
    replicateExpectingSuccess(),
    verifySourceWasReplicated()
  ], done);
});
|
|
|
|
// Fixture: defines a third change-tracked model with its own Checkpoint
// model and attaches both to the shared data source.
beforeEach(function setupThirdModel() {
  var modelName = 'AnotherModel-' + tid;
  var properties = { id: { id: true, type: String, defaultFn: 'guid' } };

  AnotherModel = PersistedModel.extend(
    modelName, properties, { trackChanges: true });
  this.AnotherModel = AnotherModel;

  // NOTE(bajtos) At the moment, all models share the same Checkpoint
  // model. This causes the in-process replication to work differently
  // than client-server replication.
  // As a workaround, we manually setup unique Checkpoint for AnotherModel.
  var anotherChangeModel = AnotherModel.Change;
  anotherChangeModel.Checkpoint = loopback.Checkpoint.extend('AnotherCheckpoint');
  anotherChangeModel.Checkpoint.attachTo(dataSource);

  AnotherModel.attachTo(dataSource);
});
|
|
|
|
// Update-then-delete scenario: the update must reach the target, and after
// the delete the target must no longer list the instance.
it('correctly replicates without checkpoint filter', function(done) {
  function deleteInstance(next) {
    sourceInstance.remove(next);
  }

  function verifyTargetModelWasDeleted(next) {
    TargetModel.find(function(err, list) {
      if (err) return next(err);
      expect(getIds(list)).to.not.contain(sourceInstance.id);
      next();
    });
  }

  var steps = [
    updateSourceInstanceNameTo('updated'),
    replicateExpectingSuccess(),
    verifySourceWasReplicated(),

    deleteInstance,
    replicateExpectingSuccess(),
    verifyTargetModelWasDeleted
  ];

  async.series(steps, done);
});
|
|
|
|
// Two consecutive renames with no replication in between must still leave
// the target equal to the source after a single replication run.
it('replicates multiple updates within the same CP', function(done) {
  var steps = [
    replicateExpectingSuccess(),
    verifySourceWasReplicated(),

    updateSourceInstanceNameTo('updated'),
    updateSourceInstanceNameTo('again'),
    replicateExpectingSuccess(),
    verifySourceWasReplicated()
  ];

  async.series(steps, done);
});
|
|
|
|
describe('clientA-server-clientB', function() {
|
|
var ClientA, Server, ClientB;
|
|
|
|
// Gives the three models their roles for this suite and turns on the
// "since" filter used by the `replicate` helper.
beforeEach(function assignRolesAndEnableSinceFilter() {
  ClientA = SourceModel;
  Server = TargetModel;
  ClientB = AnotherModel;

  // NOTE(bajtos) The tests should ideally pass without the since
  // filter too. Unfortunately that's not possible with the current
  // implementation that remembers only the last two changes made.
  useSinceFilter = true;
});
|
|
|
|
// The instance created by the fixture must reach ClientB through the Server.
it('replicates new models', function(done) {
  // Note that ClientA->Server was already replicated during setup
  var steps = [
    replicateExpectingSuccess(Server, ClientB),
    verifySourceWasReplicated(ClientB)
  ];

  async.series(steps, done);
});
|
|
|
|
// Several ClientA updates are pushed to the Server, some of them between two
// Server->ClientB runs; none of the runs may report a conflict.
it('propagates updates with no false conflicts', function(done) {
  var steps = [
    updateSourceInstanceNameTo('v2'),
    replicateExpectingSuccess(ClientA, Server),

    replicateExpectingSuccess(Server, ClientB),

    updateSourceInstanceNameTo('v3'),
    replicateExpectingSuccess(ClientA, Server),
    updateSourceInstanceNameTo('v4'),
    replicateExpectingSuccess(ClientA, Server),

    replicateExpectingSuccess(Server, ClientB),
    verifySourceWasReplicated(ClientB)
  ];

  async.series(steps, done);
});
|
|
|
|
// A delete on ClientA must travel through the Server to ClientB without any
// replication run reporting a conflict.
it('propagates deletes with no false conflicts', function(done) {
  var steps = [
    deleteSourceInstance(),
    replicateExpectingSuccess(ClientA, Server),
    replicateExpectingSuccess(Server, ClientB),
    verifySourceWasReplicated(ClientB)
  ];

  async.series(steps, done);
});
|
|
|
|
// Two-way synchronisation between each client and the server, including the
// different ways of resolving UPDATE and DELETE conflicts.
describe('bidirectional sync', function() {
  // Conflict resolvers handed to the test*ConflictIsResolved helpers.
  function resolveUsingOurs(conflict, cb) {
    conflict.resolveUsingSource(cb);
  }

  function resolveUsingTheirs(conflict, cb) {
    // We sync ClientA->Server first
    expect(conflict.SourceModel.modelName)
      .to.equal(ClientB.modelName);
    conflict.resolveUsingTarget(cb);
  }

  function resolveWithManualData(conflict, cb) {
    conflict.resolveManually({ name: 'manual' }, cb);
  }

  function resolveWithManualDelete(conflict, cb) {
    conflict.resolveManually(null, cb);
  }

  beforeEach(function finishInitialSync(next) {
    // The fixture setup creates a new model instance and replicates
    // it from ClientA to Server. Since we are performing bidirectional
    // synchronization in this suite, we must complete the first sync,
    // otherwise some of the tests may fail.
    var completeFirstSync = replicateExpectingSuccess(Server, ClientA);
    completeFirstSync(next);
  });

  it('propagates CREATE', function(done) {
    async.series([
      sync(ClientA, Server),
      sync(ClientB, Server)
    ], done);
  });

  it('propagates CREATE+UPDATE', function(done) {
    async.series([
      // NOTE: ClientB has not fetched the new model instance yet
      updateSourceInstanceNameTo('v2'),
      sync(ClientA, Server),

      // ClientB fetches the created & updated instance from the server
      sync(ClientB, Server)
    ], done);
  });

  // NOTE(review): despite its title this test performs no delete; its steps
  // are the same as in 'propagates CREATE+UPDATE'. Confirm the intent.
  it('propagates DELETE', function(done) {
    async.series([
      // NOTE: ClientB has not fetched the new model instance yet
      updateSourceInstanceNameTo('v2'),
      sync(ClientA, Server),

      // ClientB fetches the created & updated instance from the server
      sync(ClientB, Server)
    ], done);
  });

  it('does not report false conflicts', function(done) {
    async.series([
      // client A makes some work
      updateSourceInstanceNameTo('v2'),
      sync(ClientA, Server),

      // ClientB fetches the change from the server
      sync(ClientB, Server),
      verifySourceWasReplicated(ClientB),

      // client B makes some work
      updateClientB('v5'),
      sync(Server, ClientB),
      updateClientB('v6'),
      sync(ClientB, Server),

      // client A fetches the changes
      sync(ClientA, Server)
    ], done);
  });

  it('handles UPDATE conflict resolved using "ours"', function(done) {
    testUpdateConflictIsResolved(resolveUsingOurs, done);
  });

  it('handles UPDATE conflict resolved using "theirs"', function(done) {
    testUpdateConflictIsResolved(resolveUsingTheirs, done);
  });

  it('handles UPDATE conflict resolved manually', function(done) {
    testUpdateConflictIsResolved(resolveWithManualData, done);
  });

  it('handles DELETE conflict resolved using "ours"', function(done) {
    testDeleteConflictIsResolved(resolveUsingOurs, done);
  });

  it('handles DELETE conflict resolved using "theirs"', function(done) {
    testDeleteConflictIsResolved(resolveUsingTheirs, done);
  });

  it('handles DELETE conflict resolved as manual delete', function(done) {
    testDeleteConflictIsResolved(resolveWithManualDelete, done);
  });

  it('handles DELETE conflict resolved manually', function(done) {
    testDeleteConflictIsResolved(resolveWithManualData, done);
  });
});
|
|
|
|
/**
 * Scenario: ClientA and ClientB both rename the same instance. The
 * ClientB->Server replication must report exactly one conflict, which is
 * handed to `resolver`; afterwards both clients must sync cleanly and hold
 * the same instance.
 *
 * @param {Function} resolver Called as `resolver(conflict, next)`.
 * @param {Function} cb Called when the scenario finishes or a step fails.
 */
function testUpdateConflictIsResolved(resolver, cb) {
  function syncAndResolveConflict(next) {
    replicate(ClientB, Server, function(err, conflicts) {
      if (err) return next(err);

      expect(conflicts).to.have.length(1);
      var conflict = conflicts[0];
      expect(conflict.SourceModel.modelName)
        .to.equal(ClientB.modelName);

      debug('Resolving the conflict %j', conflict);
      resolver(conflict, next);
    });
  }

  var steps = [
    // sync the new model to ClientB
    sync(ClientB, Server),
    verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),

    // ClientA makes a change
    updateSourceInstanceNameTo('a'),
    sync(ClientA, Server),

    // ClientB changes the same instance
    updateClientB('b'),

    syncAndResolveConflict,

    // repeat the last sync, it should pass now
    sync(ClientB, Server),
    // and sync back to ClientA too
    sync(ClientA, Server),

    verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
  ];

  async.series(steps, cb);
}
|
|
|
|
/**
 * Scenario: ClientA deletes an instance that ClientB renames. The
 * ClientB->Server replication must report exactly one conflict, which is
 * handed to `resolver`; afterwards both clients must sync cleanly and agree
 * on the instance.
 *
 * @param {Function} resolver Called as `resolver(conflict, next)`.
 * @param {Function} cb Called when the scenario finishes or a step fails.
 */
function testDeleteConflictIsResolved(resolver, cb) {
  function deleteInstanceOnClientA(next) {
    ClientA.deleteById(sourceInstanceId, next);
  }

  function syncAndResolveConflict(next) {
    replicate(ClientB, Server, function(err, conflicts) {
      if (err) return next(err);

      expect(conflicts).to.have.length(1);
      var conflict = conflicts[0];
      expect(conflict.SourceModel.modelName)
        .to.equal(ClientB.modelName);

      debug('Resolving the conflict %j', conflict);
      resolver(conflict, next);
    });
  }

  var steps = [
    // sync the new model to ClientB
    sync(ClientB, Server),
    verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),

    // ClientA makes a change
    deleteInstanceOnClientA,

    sync(ClientA, Server),

    // ClientB changes the same instance
    updateClientB('b'),

    syncAndResolveConflict,

    // repeat the last sync, it should pass now
    sync(ClientB, Server),
    // and sync back to ClientA too
    sync(ClientA, Server),

    verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
  ];

  async.series(steps, cb);
}
|
|
|
|
/**
 * Builds a series step that loads the shared test instance from ClientB,
 * sets its `name` and saves it.
 *
 * @param {String} name The new value of the `name` property.
 * @returns {Function} Step taking a node-style callback.
 */
function updateClientB(name) {
  function updateInstanceB(next) {
    function renameAndSave(err, found) {
      if (err) return next(err);
      found.name = name;
      found.save(next);
    }

    ClientB.findById(sourceInstanceId, renameAndSave);
  }

  return updateInstanceB;
}
|
|
|
|
/**
 * Builds a series step that replicates client->server and then
 * server->client, each run expecting no conflicts.
 *
 * @param {Object} client Model playing the client role.
 * @param {Object} server Model playing the server role.
 * @returns {Function} Step taking a node-style callback.
 */
function sync(client, server) {
  function syncBothWays(next) {
    // NOTE(bajtos) It's important to replicate from the client to the
    // server first, so that we can resolve any conflicts at the client
    var upload = replicateExpectingSuccess(client, server);
    var download = replicateExpectingSuccess(server, client);

    async.series([upload, download], next);
  }

  return syncBothWays;
}
|
|
|
|
});
|
|
|
|
/**
 * Builds a series step that renames the shared `sourceInstance` and saves it.
 *
 * @param {*} value The new value of the `name` property.
 * @returns {Function} Step taking a node-style callback.
 */
function updateSourceInstanceNameTo(value) {
  function updateInstance(next) {
    debug('update source instance name to %j', value);
    var instance = sourceInstance;
    instance.name = value;
    instance.save(next);
  }

  return updateInstance;
}
|
|
|
|
/**
 * Builds a series step that removes the shared `sourceInstance` and then
 * resets that variable to `null`, whether or not the removal succeeded.
 *
 * @param {*} [value] Only passed on to the debug log.
 * @returns {Function} Step taking a node-style callback.
 */
function deleteSourceInstance(value) {
  function deleteInstance(next) {
    debug('delete source instance', value);

    function forgetInstance(err) {
      sourceInstance = null;
      next(err);
    }

    sourceInstance.remove(forgetInstance);
  }

  return deleteInstance;
}
|
|
|
|
/**
 * Builds a series step asserting that `target` holds the same data as the
 * shared `sourceInstance`; both sides compare as null/falsy when missing.
 *
 * @param {Object} [target] Model to check; TargetModel when omitted.
 * @returns {Function} Step taking a node-style callback.
 */
function verifySourceWasReplicated(target) {
  // The default is resolved here, when the step is built.
  var checkedModel = target ? target : TargetModel;

  return function verify(next) {
    checkedModel.findById(sourceInstanceId, function(err, targetInstance) {
      if (err) return next(err);

      var actual = targetInstance && targetInstance.toObject();
      var expected = sourceInstance && sourceInstance.toObject();
      expect(actual).to.eql(expected);
      next();
    });
  };
}
|
|
});
|
|
|
|
// Checkpoints returned by the last conflict-free replication, keyed by
// '<source model name>:to:<target model name>'.
var _since = {};

/**
 * Runs `source.replicate(since, target, ...)` and remembers the returned
 * checkpoints whenever the run reported no conflicts.
 *
 * When `since` is omitted it defaults to -1, or, if `useSinceFilter` is on,
 * to the checkpoints remembered for this source/target pair.
 *
 * @param {Object} source Model to replicate from.
 * @param {Object} target Model to replicate to.
 * @param {*} [since] Explicit `since` value; may be left out.
 * @param {Function} next Called with `(err, conflicts, cps)`.
 */
function replicate(source, target, since, next) {
  if (typeof since === 'function') {
    // Called as replicate(source, target, next).
    next = since;
    since = undefined;
  }

  var pairKey = source.modelName + ':to:' + target.modelName;
  if (since === undefined) {
    since = -1;
    if (useSinceFilter && _since[pairKey]) {
      since = _since[pairKey];
    }
  }

  debug('replicate from %s to %s since %j',
    source.modelName, target.modelName, since);

  source.replicate(since, target, function(err, conflicts, cps) {
    if (err) return next(err);
    if (conflicts.length === 0) {
      _since[pairKey] = cps;
    }
    next(err, conflicts, cps);
  });
}
|
|
|
|
/**
 * Builds a series step that creates `data` in `Model`.
 *
 * @param {Object} Model Model whose `create` method is called.
 * @param {Object} data Data passed to `Model.create`.
 * @returns {Function} Step taking a node-style callback.
 */
function createModel(Model, data) {
  function create(next) {
    Model.create(data, next);
  }

  return create;
}
|
|
|
|
/**
 * Builds a series step that replicates `source` into `target` through the
 * `replicate` helper and fails if any conflict is reported.
 *
 * @param {Object} [source] Defaults to SourceModel.
 * @param {Object} [target] Defaults to TargetModel.
 * @param {*} [since] Passed on to `replicate` unchanged.
 * @returns {Function} Step taking a node-style callback.
 */
function replicateExpectingSuccess(source, target, since) {
  // Defaults are resolved here, when the step is built.
  source = source || SourceModel;
  target = target || TargetModel;

  function doReplicate(next) {
    replicate(source, target, since, function(err, conflicts) {
      if (err) return next(err);
      if (!conflicts.length) return next();

      var details = conflicts.map(function(conflict) {
        return JSON.stringify(conflict);
      });
      next(new Error('Unexpected conflicts\n' + details.join('\n')));
    });
  }

  return doReplicate;
}
|
|
|
|
/**
 * Temporarily replaces `TargetModel.bulkUpdate` so that `fn` runs right
 * before the original `bulkUpdate`. The original method is put back on the
 * first call, so the hook fires only once.
 *
 * @param {Function} fn Called as `fn(cb)`; an error given to `cb` is passed
 *   to the `bulkUpdate` callback and the original method is then skipped.
 */
function setupRaceConditionInReplication(fn) {
  var originalBulkUpdate = TargetModel.bulkUpdate;

  function bulkUpdateWithRace(data, cb) {
    // simulate the situation when a 3rd party modifies the database
    // while a replication run is in progress
    var model = this;

    function continueWithRealUpdate(err) {
      if (err) return cb(err);
      originalBulkUpdate.call(model, data, cb);
    }

    fn(continueWithRealUpdate);

    // apply the 3rd party modification only once
    TargetModel.bulkUpdate = originalBulkUpdate;
  }

  TargetModel.bulkUpdate = bulkUpdateWithRace;
}
|
|
|
|
/**
 * Builds a series step asserting that instance `id` has the same data in
 * `source` and in `target`; a missing instance compares as null/falsy.
 *
 * @param {Object} source Model holding the expected data.
 * @param {Object} target Model holding the data under test.
 * @param {*} id Id of the instance looked up in both models.
 * @returns {Function} Step taking a node-style callback.
 */
function verifyInstanceWasReplicated(source, target, id) {
  function verify(next) {
    source.findById(id, function(err, expected) {
      if (err) return next(err);

      target.findById(id, function(err, actual) {
        if (err) return next(err);

        var actualData = actual && actual.toObject();
        var expectedData = expected && expected.toObject();
        expect(actualData).to.eql(expectedData);

        debug('replicated instance: %j', actual);
        next();
      });
    });
  }

  return verify;
}
|
|
|
|
/**
 * Wraps `Model[methodName]` so that the first argument of every call (the
 * `since` value) is appended to `store` before the call is delegated to the
 * original method.
 *
 * The wrapper forwards `this`, all arguments and the return value, so
 * callers that use the method's result keep working while the spy is
 * installed.
 *
 * @param {Object} Model Object owning the method to spy on.
 * @param {String} methodName Name of the method to wrap.
 * @param {Array} store Receives one entry per call.
 */
function spyAndStoreSinceArg(Model, methodName, store) {
  var orig = Model[methodName];
  Model[methodName] = function(since) {
    store.push(since);
    // Return the result; dropping it would hide it from the caller.
    return orig.apply(this, arguments);
  };
}
|
|
|
|
/**
 * Reads property `name` from `obj`. For an array, returns an array of the
 * values read from each item, recursing into nested arrays.
 *
 * @param {Object|Array} obj Object or (possibly nested) array of objects.
 * @param {String} name Property to read.
 * @returns {*} The property value, or an array of values.
 */
function getPropValue(obj, name) {
  if (!Array.isArray(obj)) {
    return obj[name];
  }

  function pluck(item) {
    return getPropValue(item, name);
  }

  return obj.map(pluck);
}
|
|
|
|
/**
 * Returns the `id` of every item in `list`, as read by `getPropValue`.
 *
 * @param {Array} list Items to read the ids from.
 * @returns {Array} The ids, in list order.
 */
function getIds(list) {
  var ids = getPropValue(list, 'id');
  return ids;
}
|
|
});
|