Add mutli tenancy to replication

This commit is contained in:
Kogulan Baskaran 2016-11-15 17:45:39 +11:00
parent d06190dae6
commit 939cf5ae5c
5 changed files with 242 additions and 36 deletions

View File

@ -186,27 +186,51 @@ module.exports = function(Change) {
cb = cb || utils.createPromiseCallback();
change.currentRevision(function(err, rev) {
var model = this.getModelCtor();
var id = this.getModelId();
model.findById(id, function(err, inst) {
if (err) return cb(err);
// avoid setting rev and prev to the same value
if (currentRev === rev) {
change.debug('rev and prev are equal (not updating anything)');
return cb(null, change);
}
change.tenant = getTenant(model, inst);
// FIXME(@bajtos) Allo callers to pass in the checkpoint value
// (or even better - a memoized async function to get the cp value)
// That will enable `rectifyAll` to cache the checkpoint value
change.constructor.getCheckpointModel().current(
function(err, checkpoint) {
if (err) return cb(err);
doRectify(checkpoint, rev);
change.currentRevision(inst, function(err, rev) {
if (err) return cb(err);
// avoid setting rev and prev to the same value
if (currentRev === rev) {
change.debug('rev and prev are equal (not updating anything)');
return cb(null, change);
}
);
// FIXME(@bajtos) Allow callers to pass in the checkpoint value
// (or even better - a memoized async function to get the cp value)
// That will enable `rectifyAll` to cache the checkpoint value
change.constructor.getCheckpointModel().current(
function(err, checkpoint) {
if (err) return cb(err);
doRectify(checkpoint, rev);
}
);
});
});
return cb.promise;
function getTenant(model, inst) {
var tenant = null;
var tenantProperty;
if (model && model.settings && model.settings.tenantProperty) {
tenantProperty = model.settings.tenantProperty;
}
if (tenantProperty && inst && inst[tenantProperty]) {
tenant = inst[tenantProperty];
}
return tenant;
}
function doRectify(checkpoint, rev) {
if (rev) {
if (currentRev === rev) {
@ -230,7 +254,7 @@ module.exports = function(Change) {
if (currentRev) {
change.prev = currentRev;
} else if (!change.prev) {
change.debug('ERROR - could not determing prev');
change.debug('ERROR - could not determining prev');
change.prev = Change.UNKNOWN;
}
change.debug('updated prev');
@ -254,23 +278,35 @@ module.exports = function(Change) {
/**
* Get a change's current revision based on current data.
* @param {Object} instance Optional instance object to get the revision string for, if not provided it will be queried
* @callback {Function} callback
* @param {Error} err
* @param {String} rev The current revision
*/
Change.prototype.currentRevision = function(cb) {
Change.prototype.currentRevision = function(instance, cb) {
// if only one argument was provided, then set that as the callback
if (typeof instance === 'function') {
cb = instance;
instance = null;
}
cb = cb || utils.createPromiseCallback();
var model = this.getModelCtor();
var id = this.getModelId();
model.findById(id, function(err, inst) {
if (err) return cb(err);
if (inst) {
cb(null, Change.revisionForInst(inst));
} else {
cb(null, null);
}
});
if (instance) {
cb(null, Change.revisionForInst(instance));
} else {
var model = this.getModelCtor();
var id = this.getModelId();
model.findById(id, function(err, inst) {
if (err) return cb(err);
if (inst) {
cb(null, Change.revisionForInst(inst));
} else {
cb(null, null);
}
});
}
return cb.promise;
};

View File

@ -20,6 +20,10 @@
},
"modelId": {
"type": "string"
},
"tenant": {
"type": "string",
"default": null
}
}
}

View File

@ -996,12 +996,13 @@ module.exports = function(registry) {
* to reduce the number of results returned.
* @param {Number} since Return only changes since this checkpoint.
* @param {Object} filter Include only changes that match this filter, the same as for [#persistedmodel-find](find()).
* @param {String} tenant Include only changes for this tenant
* @callback {Function} callback Callback function called with `(err, changes)` arguments. Required.
* @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object).
* @param {Array} changes An array of [Change](#change) objects.
*/
PersistedModel.changes = function(since, filter, callback) {
PersistedModel.changes = function(since, filter, tenant, callback) {
if (typeof since === 'function') {
filter = {};
callback = since;
@ -1012,10 +1013,24 @@ module.exports = function(registry) {
since = -1;
filter = {};
}
if (typeof tenant === 'function') {
callback = tenant;
tenant = null;
}
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var model = this;
var changeFilter = {
where: {
checkpoint: {gte: since},
modelName: this.modelName
}
};
if (tenant) {
changeFilter.where.tenant = tenant;
}
filter = filter || {};
filter.fields = {};
@ -1023,10 +1038,7 @@ module.exports = function(registry) {
filter.fields[idName] = true;
// TODO(ritch) this whole thing could be optimized a bit more
Change.find({ where: {
checkpoint: { gte: since },
modelName: this.modelName
}}, function(err, changes) {
Change.find(changeFilter, function(err, changes) {
if (err) return callback(err);
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
var ids = changes.map(function(change) {
@ -1172,7 +1184,7 @@ module.exports = function(registry) {
async.waterfall(tasks, done);
function getSourceChanges(cb) {
sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb);
sourceModel.changes(since.source, options.filter, options.tenant, debug.enabled ? log : cb);
function log(err, result) {
if (err) return cb(err);

View File

@ -6,10 +6,10 @@
var async = require('async');
var expect = require('chai').expect;
var Change;
var TestModel;
describe('Change', function() {
var Change;
var TestModel;
beforeEach(function() {
var memory = loopback.createDataSource({
connector: loopback.Memory
@ -233,6 +233,14 @@ describe('Change', function() {
});
});
it('should create a new change with tenant set as null', function(done) {
var test = this;
change.rectify(function(err, ch) {
assert.equal(ch.tenant, null);
done();
});
});
// This test is a low-level equivalent of the test in replication.test.js
// called "replicates multiple updates within the same CP"
it('should merge updates within the same checkpoint', function(done) {
@ -320,7 +328,7 @@ describe('Change', function() {
change.rectify()
.then(function(ch) {
assert.equal(ch.rev, test.revisionForModel);
assert.equal(ch.tenant, null);
done();
})
.catch(done);
@ -343,6 +351,22 @@ describe('Change', function() {
});
});
describe('change.currentRevision(instance, callback)', function() {
it('should get the correct revision', function(done) {
var test = this;
var change = new Change({
modelName: this.modelName,
modelId: this.modelId
});
change.currentRevision(this.model, function(err, rev) {
assert.equal(rev, test.revisionForModel);
done();
});
});
});
describe('change.currentRevision - promise variant', function() {
it('should get the correct revision', function(done) {
var test = this;
@ -544,6 +568,7 @@ describe('Change', function() {
modelName: updateRecord.modelName,
prev: 'foo', // this is the current local revision
rev: 'foo-new',
tenant: null,
});
done();
@ -573,6 +598,7 @@ describe('Change', function() {
modelName: updateRecord.modelName,
prev: 'foo', // this is the current local revision
rev: 'foo-new',
tenant: null,
});
done();
@ -601,6 +627,7 @@ describe('Change', function() {
modelName: updateRecord.modelName,
prev: null, // this is the current local revision
rev: 'new-rev',
tenant: null,
});
done();
@ -608,3 +635,62 @@ describe('Change', function() {
});
});
});
describe('Change with multi tenancy', function() {
var ChangeWithTenant;
var TestModelWithTenant;
beforeEach(function() {
var memory = loopback.createDataSource({
connector: loopback.Memory
});
TestModelWithTenant = loopback.PersistedModel.extend('ChangeTestModel',
{
id: {id: true, type: 'string', defaultFn: 'guid'}
},
{
trackChanges: true,
tenantProperty: 'tenantId'
});
this.modelName = TestModelWithTenant.modelName;
TestModelWithTenant.attachTo(memory);
ChangeWithTenant = TestModelWithTenant.getChangeModel();
});
describe('change.rectify - tenant variant', function() {
var change;
var tenantId = '123';
beforeEach(function(done) {
ChangeWithTenant = TestModelWithTenant.getChangeModel();
var test = this;
test.data = {
foo: 'bar',
tenantId: tenantId
};
TestModelWithTenant.create(test.data, function(err, model) {
if (err) return done(err);
test.revisionForModel = ChangeWithTenant.revisionForInst(model);
ChangeWithTenant.findOrCreateChange(TestModelWithTenant.modelName, model.id)
.then(function(ch) {
change = ch;
done();
}).catch(done);
});
});
it('should create a new change with the correct tenant and revision', function(done) {
var test = this;
change.rectify()
.then(function(ch) {
assert.equal(ch.rev, test.revisionForModel);
assert.equal(ch.tenant, tenantId);
done();
})
.catch(done);
});
});
});

View File

@ -1642,3 +1642,71 @@ describe('Replication / Change APIs', function() {
return getPropValue(list, 'id');
}
});
describe('Replication / Change APIs with multi tenancy', function() {
this.timeout(10000);
var dataSource, SourceModelWithTenant, TargetModelWithTenant;
var useSinceFilter;
var tid = 0; // per-test unique id used e.g. to build unique model names
beforeEach(function() {
tid++;
useSinceFilter = false;
var test = this;
dataSource = this.dataSource = loopback.createDataSource({
connector: loopback.Memory
});
SourceModelWithTenant = this.SourceModelWithTenant = PersistedModel.extend(
'SourceModelWithTenant-' + tid,
{ id: { id: true, type: String, defaultFn: 'guid' } },
{ trackChanges: true, tenantProperty: 'tenantId' });
SourceModelWithTenant.attachTo(dataSource);
TargetModelWithTenant = this.TargetModelWithTenant = PersistedModel.extend(
'TargetModelWithTenant-' + tid,
{ id: { id: true, type: String, defaultFn: 'guid' } },
{ trackChanges: true, tenantProperty: 'tenantId' });
var TargetWithTenantChange = TargetModelWithTenant.Change;
TargetWithTenantChange.Checkpoint = loopback.Checkpoint.extend('TargetWithTenantCheckpoint');
TargetWithTenantChange.Checkpoint.attachTo(dataSource);
TargetModelWithTenant.attachTo(dataSource);
test.startingCheckpoint = -1;
});
describe('Model.changes(since, filter, tenant, callback)', function() {
it('Get changes since the given checkpoint for the given tenant', function(done) {
var test = this;
this.SourceModelWithTenant.create([{name: 'foo', tenantId: '123'}, {name: 'foo', tenantId: '456'}], function(err) {
if (err) return done(err);
setTimeout(function() {
test.SourceModelWithTenant.changes(test.startingCheckpoint, {}, '123', function(err, changes) {
assert.equal(changes.length, 1);
done();
});
}, 1);
});
});
it('excludes changes from older checkpoints', function(done) {
var FUTURE_CHECKPOINT = 999;
SourceModelWithTenant.create({ name: 'foo', tenantId: '123' }, function(err) {
if (err) return done(err);
SourceModelWithTenant.changes(FUTURE_CHECKPOINT, {}, '123', function(err, changes) {
if (err) return done(err);
expect(changes).to.be.empty; //jshint ignore:line
done();
});
});
});
});
});