Promisify 'PersistedModel - replication'

This commit is contained in:
Pradnya Baviskar 2015-07-20 17:30:05 +05:30 committed by Miroslav Bajtoš
parent 9d776d7c18
commit 64a1dbadc8
2 changed files with 121 additions and 26 deletions

View File

@ -8,6 +8,7 @@ var async = require('async');
var deprecated = require('depd')('loopback'); var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model'); var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough; var PassThrough = require('stream').PassThrough;
var utils = require('./utils');
module.exports = function(registry) { module.exports = function(registry) {
var Model = registry.getModel('Model'); var Model = registry.getModel('Model');
@ -916,9 +917,7 @@ module.exports = function(registry) {
options = options || {}; options = options || {};
var sourceModel = this; var sourceModel = this;
callback = callback || function defaultReplicationCallback(err) { callback = callback || utils.createPromiseCallback();
if (err) throw err;
};
debug('replicating %s since %s to %s since %s', debug('replicating %s since %s to %s since %s',
sourceModel.modelName, sourceModel.modelName,
@ -944,6 +943,7 @@ module.exports = function(registry) {
var MAX_ATTEMPTS = 3; var MAX_ATTEMPTS = 3;
run(1, since); run(1, since);
return callback.promise;
function run(attempt, since) { function run(attempt, since) {
debug('\titeration #%s', attempt); debug('\titeration #%s', attempt);

View File

@ -82,28 +82,23 @@ describe('Replication / Change APIs', function() {
}); });
describe('Model.replicate(since, targetModel, options, callback)', function() { describe('Model.replicate(since, targetModel, options, callback)', function() {
it('Replicate data using the target model', function(done) {
var test = this; function assertTargetModelEqualsSourceModel(conflicts, sourceModel,
var options = {}; targetModel, done) {
var sourceData; var sourceData;
var targetData; var targetData;
this.SourceModel.create({name: 'foo'}, function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assert(conflicts.length === 0); assert(conflicts.length === 0);
async.parallel([ async.parallel([
function(cb) { function(cb) {
test.SourceModel.find(function(err, result) { sourceModel.find(function(err, result) {
if (err) return cb(err); if (err) return cb(err);
sourceData = result; sourceData = result;
cb(); cb();
}); });
}, },
function(cb) { function(cb) {
test.TargetModel.find(function(err, result) { targetModel.find(function(err, result) {
if (err) return cb(err); if (err) return cb(err);
targetData = result; targetData = result;
cb(); cb();
@ -115,6 +110,38 @@ describe('Replication / Change APIs', function() {
assert.deepEqual(sourceData, targetData); assert.deepEqual(sourceData, targetData);
done(); done();
}); });
}
it('Replicate data using the target model', function(done) {
var test = this;
var options = {};
this.SourceModel.create({name: 'foo'}, function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assertTargetModelEqualsSourceModel(conflicts, test.SourceModel,
test.TargetModel, done);
});
});
});
it('Replicate data using the target model - promise variant', function(done) {
var test = this;
var options = {};
this.SourceModel.create({name: 'foo'}, function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options)
.then(function(conflicts) {
assertTargetModelEqualsSourceModel(conflicts, test.SourceModel,
test.TargetModel, done);
})
.catch(function(err) {
done(err);
}); });
}); });
}); });
@ -147,6 +174,38 @@ describe('Replication / Change APIs', function() {
], done); ], done);
}); });
it('applies "since" filter on source changes - promise variant', function(done) {
async.series([
function createModelInSourceCp1(next) {
SourceModel.create({ id: '1' }, next);
},
function checkpoint(next) {
SourceModel.checkpoint(next);
},
function createModelInSourceCp2(next) {
SourceModel.create({ id: '2' }, next);
},
function replicateLastChangeOnly(next) {
SourceModel.currentCheckpoint(function(err, cp) {
if (err) return done(err);
SourceModel.replicate(cp, TargetModel, {})
.then(function(next) {
done();
})
.catch(err);
});
},
function verify(next) {
TargetModel.find(function(err, list) {
if (err) return done(err);
// '1' should be skipped by replication
expect(getIds(list)).to.eql(['2']);
next();
});
}
], done);
});
it('applies "since" filter on target changes', function(done) { it('applies "since" filter on target changes', function(done) {
// Because the "since" filter is just an optimization, // Because the "since" filter is just an optimization,
// there isn't really any observable behaviour we could // there isn't really any observable behaviour we could
@ -161,6 +220,23 @@ describe('Replication / Change APIs', function() {
}); });
}); });
it('applies "since" filter on target changes - promise variant', function(done) {
// Because the "since" filter is just an optimization,
// there isn't really any observable behaviour we could
// check to assert correct implementation.
var diffSince = [];
spyAndStoreSinceArg(TargetModel, 'diff', diffSince);
SourceModel.replicate(10, TargetModel, {})
.then(function() {
expect(diffSince).to.eql([10]);
done();
})
.catch(function(err) {
done(err);
});
});
it('uses different "since" value for source and target', function(done) { it('uses different "since" value for source and target', function(done) {
var sourceSince = []; var sourceSince = [];
var targetSince = []; var targetSince = [];
@ -177,6 +253,25 @@ describe('Replication / Change APIs', function() {
}); });
}); });
it('uses different "since" value for source and target - promise variant', function(done) {
var sourceSince = [];
var targetSince = [];
spyAndStoreSinceArg(SourceModel, 'changes', sourceSince);
spyAndStoreSinceArg(TargetModel, 'diff', targetSince);
var since = { source: 1, target: 2 };
SourceModel.replicate(since, TargetModel, {})
.then(function() {
expect(sourceSince).to.eql([1]);
expect(targetSince).to.eql([2]);
done();
})
.catch(function(err) {
done(err);
});
});
it('picks up changes made during replication', function(done) { it('picks up changes made during replication', function(done) {
setupRaceConditionInReplication(function(cb) { setupRaceConditionInReplication(function(cb) {
// simulate the situation when another model is created // simulate the situation when another model is created