Add mutli tenancy to replication
This commit is contained in:
parent
d06190dae6
commit
42cbd233e9
|
@ -186,27 +186,51 @@ module.exports = function(Change) {
|
||||||
|
|
||||||
cb = cb || utils.createPromiseCallback();
|
cb = cb || utils.createPromiseCallback();
|
||||||
|
|
||||||
change.currentRevision(function(err, rev) {
|
var model = this.getModelCtor();
|
||||||
|
var id = this.getModelId();
|
||||||
|
|
||||||
|
model.findById(id, function(err, inst) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
|
||||||
// avoid setting rev and prev to the same value
|
change.tenant = getTenant(model, inst);
|
||||||
if (currentRev === rev) {
|
|
||||||
change.debug('rev and prev are equal (not updating anything)');
|
|
||||||
return cb(null, change);
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME(@bajtos) Allo callers to pass in the checkpoint value
|
change.currentRevision(inst, function(err, rev) {
|
||||||
// (or even better - a memoized async function to get the cp value)
|
if (err) return cb(err);
|
||||||
// That will enable `rectifyAll` to cache the checkpoint value
|
|
||||||
change.constructor.getCheckpointModel().current(
|
// avoid setting rev and prev to the same value
|
||||||
function(err, checkpoint) {
|
if (currentRev === rev) {
|
||||||
if (err) return cb(err);
|
change.debug('rev and prev are equal (not updating anything)');
|
||||||
doRectify(checkpoint, rev);
|
return cb(null, change);
|
||||||
}
|
}
|
||||||
);
|
|
||||||
|
// FIXME(@bajtos) Allow callers to pass in the checkpoint value
|
||||||
|
// (or even better - a memoized async function to get the cp value)
|
||||||
|
// That will enable `rectifyAll` to cache the checkpoint value
|
||||||
|
change.constructor.getCheckpointModel().current(
|
||||||
|
function(err, checkpoint) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
doRectify(checkpoint, rev);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
return cb.promise;
|
return cb.promise;
|
||||||
|
|
||||||
|
function getTenant(model, inst) {
|
||||||
|
var tenant = null;
|
||||||
|
var tenantProperty;
|
||||||
|
|
||||||
|
if (model && model.settings && model.settings.tenantProperty) {
|
||||||
|
tenantProperty = model.settings.tenantProperty;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tenantProperty && inst && inst[tenantProperty]) {
|
||||||
|
tenant = inst[tenantProperty];
|
||||||
|
}
|
||||||
|
|
||||||
|
return tenant;
|
||||||
|
}
|
||||||
|
|
||||||
function doRectify(checkpoint, rev) {
|
function doRectify(checkpoint, rev) {
|
||||||
if (rev) {
|
if (rev) {
|
||||||
if (currentRev === rev) {
|
if (currentRev === rev) {
|
||||||
|
@ -230,7 +254,7 @@ module.exports = function(Change) {
|
||||||
if (currentRev) {
|
if (currentRev) {
|
||||||
change.prev = currentRev;
|
change.prev = currentRev;
|
||||||
} else if (!change.prev) {
|
} else if (!change.prev) {
|
||||||
change.debug('ERROR - could not determing prev');
|
change.debug('ERROR - could not determining prev');
|
||||||
change.prev = Change.UNKNOWN;
|
change.prev = Change.UNKNOWN;
|
||||||
}
|
}
|
||||||
change.debug('updated prev');
|
change.debug('updated prev');
|
||||||
|
@ -254,23 +278,35 @@ module.exports = function(Change) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a change's current revision based on current data.
|
* Get a change's current revision based on current data.
|
||||||
|
* @param {Object} instance Optional instance object to get the revision string for, if not provided it will be queried
|
||||||
* @callback {Function} callback
|
* @callback {Function} callback
|
||||||
* @param {Error} err
|
* @param {Error} err
|
||||||
* @param {String} rev The current revision
|
* @param {String} rev The current revision
|
||||||
*/
|
*/
|
||||||
|
|
||||||
Change.prototype.currentRevision = function(cb) {
|
Change.prototype.currentRevision = function(instance, cb) {
|
||||||
|
// if only one argument was provided, then set that as the callback
|
||||||
|
if (typeof instance === 'function') {
|
||||||
|
cb = instance;
|
||||||
|
instance = null;
|
||||||
|
}
|
||||||
|
|
||||||
cb = cb || utils.createPromiseCallback();
|
cb = cb || utils.createPromiseCallback();
|
||||||
var model = this.getModelCtor();
|
|
||||||
var id = this.getModelId();
|
if (instance) {
|
||||||
model.findById(id, function(err, inst) {
|
cb(null, Change.revisionForInst(instance));
|
||||||
if (err) return cb(err);
|
} else {
|
||||||
if (inst) {
|
var model = this.getModelCtor();
|
||||||
cb(null, Change.revisionForInst(inst));
|
var id = this.getModelId();
|
||||||
} else {
|
model.findById(id, function(err, inst) {
|
||||||
cb(null, null);
|
if (err) return cb(err);
|
||||||
}
|
if (inst) {
|
||||||
});
|
cb(null, Change.revisionForInst(inst));
|
||||||
|
} else {
|
||||||
|
cb(null, null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
return cb.promise;
|
return cb.promise;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,10 @@
|
||||||
},
|
},
|
||||||
"modelId": {
|
"modelId": {
|
||||||
"type": "string"
|
"type": "string"
|
||||||
|
},
|
||||||
|
"tenant": {
|
||||||
|
"type": "string",
|
||||||
|
"default": null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -996,12 +996,13 @@ module.exports = function(registry) {
|
||||||
* to reduce the number of results returned.
|
* to reduce the number of results returned.
|
||||||
* @param {Number} since Return only changes since this checkpoint.
|
* @param {Number} since Return only changes since this checkpoint.
|
||||||
* @param {Object} filter Include only changes that match this filter, the same as for [#persistedmodel-find](find()).
|
* @param {Object} filter Include only changes that match this filter, the same as for [#persistedmodel-find](find()).
|
||||||
|
* @param {String} tenant Include only changes for this tenant
|
||||||
* @callback {Function} callback Callback function called with `(err, changes)` arguments. Required.
|
* @callback {Function} callback Callback function called with `(err, changes)` arguments. Required.
|
||||||
* @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object).
|
* @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object).
|
||||||
* @param {Array} changes An array of [Change](#change) objects.
|
* @param {Array} changes An array of [Change](#change) objects.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
PersistedModel.changes = function(since, filter, callback) {
|
PersistedModel.changes = function(since, filter, tenant, callback) {
|
||||||
if (typeof since === 'function') {
|
if (typeof since === 'function') {
|
||||||
filter = {};
|
filter = {};
|
||||||
callback = since;
|
callback = since;
|
||||||
|
@ -1012,10 +1013,24 @@ module.exports = function(registry) {
|
||||||
since = -1;
|
since = -1;
|
||||||
filter = {};
|
filter = {};
|
||||||
}
|
}
|
||||||
|
if (typeof tenant === 'function') {
|
||||||
|
callback = tenant;
|
||||||
|
tenant = null;
|
||||||
|
}
|
||||||
|
|
||||||
var idName = this.dataSource.idName(this.modelName);
|
var idName = this.dataSource.idName(this.modelName);
|
||||||
var Change = this.getChangeModel();
|
var Change = this.getChangeModel();
|
||||||
var model = this;
|
var model = this;
|
||||||
|
var changeFilter = {
|
||||||
|
where: {
|
||||||
|
checkpoint: {gte: since},
|
||||||
|
modelName: this.modelName
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (tenant) {
|
||||||
|
changeFilter.where.tenant = tenant;
|
||||||
|
}
|
||||||
|
|
||||||
filter = filter || {};
|
filter = filter || {};
|
||||||
filter.fields = {};
|
filter.fields = {};
|
||||||
|
@ -1023,10 +1038,7 @@ module.exports = function(registry) {
|
||||||
filter.fields[idName] = true;
|
filter.fields[idName] = true;
|
||||||
|
|
||||||
// TODO(ritch) this whole thing could be optimized a bit more
|
// TODO(ritch) this whole thing could be optimized a bit more
|
||||||
Change.find({ where: {
|
Change.find(changeFilter, function(err, changes) {
|
||||||
checkpoint: { gte: since },
|
|
||||||
modelName: this.modelName
|
|
||||||
}}, function(err, changes) {
|
|
||||||
if (err) return callback(err);
|
if (err) return callback(err);
|
||||||
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
|
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
|
||||||
var ids = changes.map(function(change) {
|
var ids = changes.map(function(change) {
|
||||||
|
@ -1172,7 +1184,7 @@ module.exports = function(registry) {
|
||||||
async.waterfall(tasks, done);
|
async.waterfall(tasks, done);
|
||||||
|
|
||||||
function getSourceChanges(cb) {
|
function getSourceChanges(cb) {
|
||||||
sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb);
|
sourceModel.changes(since.source, options.filter, options.tenant, debug.enabled ? log : cb);
|
||||||
|
|
||||||
function log(err, result) {
|
function log(err, result) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
|
|
@ -6,10 +6,10 @@
|
||||||
var async = require('async');
|
var async = require('async');
|
||||||
var expect = require('chai').expect;
|
var expect = require('chai').expect;
|
||||||
|
|
||||||
var Change;
|
|
||||||
var TestModel;
|
|
||||||
|
|
||||||
describe('Change', function() {
|
describe('Change', function() {
|
||||||
|
var Change;
|
||||||
|
var TestModel;
|
||||||
|
|
||||||
beforeEach(function() {
|
beforeEach(function() {
|
||||||
var memory = loopback.createDataSource({
|
var memory = loopback.createDataSource({
|
||||||
connector: loopback.Memory
|
connector: loopback.Memory
|
||||||
|
@ -233,6 +233,14 @@ describe('Change', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should create a new change with tenant set as null', function(done) {
|
||||||
|
var test = this;
|
||||||
|
change.rectify(function(err, ch) {
|
||||||
|
assert.equal(ch.tenant, null);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
// This test is a low-level equivalent of the test in replication.test.js
|
// This test is a low-level equivalent of the test in replication.test.js
|
||||||
// called "replicates multiple updates within the same CP"
|
// called "replicates multiple updates within the same CP"
|
||||||
it('should merge updates within the same checkpoint', function(done) {
|
it('should merge updates within the same checkpoint', function(done) {
|
||||||
|
@ -320,7 +328,7 @@ describe('Change', function() {
|
||||||
change.rectify()
|
change.rectify()
|
||||||
.then(function(ch) {
|
.then(function(ch) {
|
||||||
assert.equal(ch.rev, test.revisionForModel);
|
assert.equal(ch.rev, test.revisionForModel);
|
||||||
|
assert.equal(ch.tenant, null);
|
||||||
done();
|
done();
|
||||||
})
|
})
|
||||||
.catch(done);
|
.catch(done);
|
||||||
|
@ -343,6 +351,22 @@ describe('Change', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('change.currentRevision(instance, callback)', function() {
|
||||||
|
it('should get the correct revision', function(done) {
|
||||||
|
var test = this;
|
||||||
|
var change = new Change({
|
||||||
|
modelName: this.modelName,
|
||||||
|
modelId: this.modelId
|
||||||
|
});
|
||||||
|
|
||||||
|
change.currentRevision(this.model, function(err, rev) {
|
||||||
|
assert.equal(rev, test.revisionForModel);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('change.currentRevision - promise variant', function() {
|
describe('change.currentRevision - promise variant', function() {
|
||||||
it('should get the correct revision', function(done) {
|
it('should get the correct revision', function(done) {
|
||||||
var test = this;
|
var test = this;
|
||||||
|
@ -544,6 +568,7 @@ describe('Change', function() {
|
||||||
modelName: updateRecord.modelName,
|
modelName: updateRecord.modelName,
|
||||||
prev: 'foo', // this is the current local revision
|
prev: 'foo', // this is the current local revision
|
||||||
rev: 'foo-new',
|
rev: 'foo-new',
|
||||||
|
tenant: null,
|
||||||
});
|
});
|
||||||
|
|
||||||
done();
|
done();
|
||||||
|
@ -573,6 +598,7 @@ describe('Change', function() {
|
||||||
modelName: updateRecord.modelName,
|
modelName: updateRecord.modelName,
|
||||||
prev: 'foo', // this is the current local revision
|
prev: 'foo', // this is the current local revision
|
||||||
rev: 'foo-new',
|
rev: 'foo-new',
|
||||||
|
tenant: null,
|
||||||
});
|
});
|
||||||
|
|
||||||
done();
|
done();
|
||||||
|
@ -601,6 +627,7 @@ describe('Change', function() {
|
||||||
modelName: updateRecord.modelName,
|
modelName: updateRecord.modelName,
|
||||||
prev: null, // this is the current local revision
|
prev: null, // this is the current local revision
|
||||||
rev: 'new-rev',
|
rev: 'new-rev',
|
||||||
|
tenant: null,
|
||||||
});
|
});
|
||||||
|
|
||||||
done();
|
done();
|
||||||
|
@ -608,3 +635,62 @@ describe('Change', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('Change with multi tenancy', function() {
|
||||||
|
var ChangeWithTenant;
|
||||||
|
var TestModelWithTenant;
|
||||||
|
|
||||||
|
beforeEach(function() {
|
||||||
|
var memory = loopback.createDataSource({
|
||||||
|
connector: loopback.Memory
|
||||||
|
});
|
||||||
|
TestModelWithTenant = loopback.PersistedModel.extend('ChangeTestModel',
|
||||||
|
{
|
||||||
|
id: {id: true, type: 'string', defaultFn: 'guid'}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trackChanges: true,
|
||||||
|
tenantProperty: 'tenantId'
|
||||||
|
});
|
||||||
|
this.modelName = TestModelWithTenant.modelName;
|
||||||
|
TestModelWithTenant.attachTo(memory);
|
||||||
|
ChangeWithTenant = TestModelWithTenant.getChangeModel();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('change.rectify - tenant variant', function() {
|
||||||
|
var change;
|
||||||
|
var tenantId = '123';
|
||||||
|
|
||||||
|
beforeEach(function(done) {
|
||||||
|
ChangeWithTenant = TestModelWithTenant.getChangeModel();
|
||||||
|
var test = this;
|
||||||
|
test.data = {
|
||||||
|
foo: 'bar',
|
||||||
|
tenantId: tenantId
|
||||||
|
};
|
||||||
|
|
||||||
|
TestModelWithTenant.create(test.data, function(err, model) {
|
||||||
|
if (err) return done(err);
|
||||||
|
test.revisionForModel = ChangeWithTenant.revisionForInst(model);
|
||||||
|
|
||||||
|
ChangeWithTenant.findOrCreateChange(TestModelWithTenant.modelName, model.id)
|
||||||
|
.then(function(ch) {
|
||||||
|
change = ch;
|
||||||
|
done();
|
||||||
|
}).catch(done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should create a new change with the correct tenant and revision', function(done) {
|
||||||
|
var test = this;
|
||||||
|
change.rectify()
|
||||||
|
.then(function(ch) {
|
||||||
|
assert.equal(ch.rev, test.revisionForModel);
|
||||||
|
assert.equal(ch.tenant, tenantId);
|
||||||
|
|
||||||
|
done();
|
||||||
|
})
|
||||||
|
.catch(done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
|
@ -1642,3 +1642,71 @@ describe('Replication / Change APIs', function() {
|
||||||
return getPropValue(list, 'id');
|
return getPropValue(list, 'id');
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('Replication / Change APIs with multi tenancy', function() {
|
||||||
|
this.timeout(10000);
|
||||||
|
var dataSource, SourceModelWithTenant, TargetModelWithTenant;
|
||||||
|
var useSinceFilter;
|
||||||
|
var tid = 0; // per-test unique id used e.g. to build unique model names
|
||||||
|
|
||||||
|
beforeEach(function() {
|
||||||
|
tid++;
|
||||||
|
useSinceFilter = false;
|
||||||
|
var test = this;
|
||||||
|
|
||||||
|
dataSource = this.dataSource = loopback.createDataSource({
|
||||||
|
connector: loopback.Memory
|
||||||
|
});
|
||||||
|
SourceModelWithTenant = this.SourceModelWithTenant = PersistedModel.extend(
|
||||||
|
'SourceModelWithTenant-' + tid,
|
||||||
|
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
||||||
|
{ trackChanges: true, tenantProperty: 'tenantId' });
|
||||||
|
|
||||||
|
SourceModelWithTenant.attachTo(dataSource);
|
||||||
|
|
||||||
|
TargetModelWithTenant = this.TargetModelWithTenant = PersistedModel.extend(
|
||||||
|
'TargetModelWithTenant-' + tid,
|
||||||
|
{ id: { id: true, type: String, defaultFn: 'guid' } },
|
||||||
|
{ trackChanges: true, tenantProperty: 'tenantId' });
|
||||||
|
|
||||||
|
var TargetWithTenantChange = TargetModelWithTenant.Change;
|
||||||
|
TargetWithTenantChange.Checkpoint = loopback.Checkpoint.extend('TargetWithTenantCheckpoint');
|
||||||
|
TargetWithTenantChange.Checkpoint.attachTo(dataSource);
|
||||||
|
|
||||||
|
TargetModelWithTenant.attachTo(dataSource);
|
||||||
|
|
||||||
|
test.startingCheckpoint = -1;
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('Model.changes(since, filter, tenant, callback)', function() {
|
||||||
|
it('Get changes since the given checkpoint for the given tenant', function(done) {
|
||||||
|
var test = this;
|
||||||
|
this.SourceModelWithTenant.create([{name: 'foo', tenantId: '123'}, {name: 'foo', tenantId: '456'}], function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
|
||||||
|
setTimeout(function() {
|
||||||
|
test.SourceModelWithTenant.changes(test.startingCheckpoint, {}, '123', function(err, changes) {
|
||||||
|
assert.equal(changes.length, 1);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
}, 1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('excludes changes from older checkpoints', function(done) {
|
||||||
|
var FUTURE_CHECKPOINT = 999;
|
||||||
|
|
||||||
|
SourceModelWithTenant.create({ name: 'foo', tenantId: '123' }, function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
SourceModelWithTenant.changes(FUTURE_CHECKPOINT, {}, '123', function(err, changes) {
|
||||||
|
if (err) return done(err);
|
||||||
|
|
||||||
|
expect(changes).to.be.empty; //jshint ignore:line
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
Loading…
Reference in New Issue