Merge pull request #2959 from kobaska/add-multi-tenancy
Support multi-tenancy in change replication - allow apps to extend Change model with extra properties
This commit is contained in:
commit
0448184a6c
|
@ -184,16 +184,32 @@ module.exports = function(Change) {
|
||||||
|
|
||||||
cb = cb || utils.createPromiseCallback();
|
cb = cb || utils.createPromiseCallback();
|
||||||
|
|
||||||
change.currentRevision(function(err, rev) {
|
const model = this.getModelCtor();
|
||||||
|
const id = this.getModelId();
|
||||||
|
|
||||||
|
model.findById(id, function(err, inst) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
|
||||||
|
if (inst) {
|
||||||
|
inst.fillCustomChangeProperties(change, function() {
|
||||||
|
const rev = Change.revisionForInst(inst);
|
||||||
|
prepareAndDoRectify(rev);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
prepareAndDoRectify(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return cb.promise;
|
||||||
|
|
||||||
|
function prepareAndDoRectify(rev) {
|
||||||
// avoid setting rev and prev to the same value
|
// avoid setting rev and prev to the same value
|
||||||
if (currentRev === rev) {
|
if (currentRev === rev) {
|
||||||
change.debug('rev and prev are equal (not updating anything)');
|
change.debug('rev and prev are equal (not updating anything)');
|
||||||
return cb(null, change);
|
return cb(null, change);
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME(@bajtos) Allo callers to pass in the checkpoint value
|
// FIXME(@bajtos) Allow callers to pass in the checkpoint value
|
||||||
// (or even better - a memoized async function to get the cp value)
|
// (or even better - a memoized async function to get the cp value)
|
||||||
// That will enable `rectifyAll` to cache the checkpoint value
|
// That will enable `rectifyAll` to cache the checkpoint value
|
||||||
change.constructor.getCheckpointModel().current(
|
change.constructor.getCheckpointModel().current(
|
||||||
|
@ -202,8 +218,7 @@ module.exports = function(Change) {
|
||||||
doRectify(checkpoint, rev);
|
doRectify(checkpoint, rev);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
});
|
}
|
||||||
return cb.promise;
|
|
||||||
|
|
||||||
function doRectify(checkpoint, rev) {
|
function doRectify(checkpoint, rev) {
|
||||||
if (rev) {
|
if (rev) {
|
||||||
|
@ -228,7 +243,7 @@ module.exports = function(Change) {
|
||||||
if (currentRev) {
|
if (currentRev) {
|
||||||
change.prev = currentRev;
|
change.prev = currentRev;
|
||||||
} else if (!change.prev) {
|
} else if (!change.prev) {
|
||||||
change.debug('ERROR - could not determing prev');
|
change.debug('ERROR - could not determine prev');
|
||||||
change.prev = Change.UNKNOWN;
|
change.prev = Change.UNKNOWN;
|
||||||
}
|
}
|
||||||
change.debug('updated prev');
|
change.debug('updated prev');
|
||||||
|
|
|
@ -1052,6 +1052,7 @@ module.exports = function(registry) {
|
||||||
var idName = this.dataSource.idName(this.modelName);
|
var idName = this.dataSource.idName(this.modelName);
|
||||||
var Change = this.getChangeModel();
|
var Change = this.getChangeModel();
|
||||||
var model = this;
|
var model = this;
|
||||||
|
const changeFilter = this.createChangeFilter(since, filter);
|
||||||
|
|
||||||
filter = filter || {};
|
filter = filter || {};
|
||||||
filter.fields = {};
|
filter.fields = {};
|
||||||
|
@ -1059,10 +1060,7 @@ module.exports = function(registry) {
|
||||||
filter.fields[idName] = true;
|
filter.fields[idName] = true;
|
||||||
|
|
||||||
// TODO(ritch) this whole thing could be optimized a bit more
|
// TODO(ritch) this whole thing could be optimized a bit more
|
||||||
Change.find({where: {
|
Change.find(changeFilter, function(err, changes) {
|
||||||
checkpoint: {gte: since},
|
|
||||||
modelName: this.modelName,
|
|
||||||
}}, function(err, changes) {
|
|
||||||
if (err) return callback(err);
|
if (err) return callback(err);
|
||||||
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
|
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
|
||||||
var ids = changes.map(function(change) {
|
var ids = changes.map(function(change) {
|
||||||
|
@ -1759,11 +1757,12 @@ module.exports = function(registry) {
|
||||||
assert(BaseChangeModel,
|
assert(BaseChangeModel,
|
||||||
'Change model must be defined before enabling change replication');
|
'Change model must be defined before enabling change replication');
|
||||||
|
|
||||||
|
const additionalChangeModelProperties =
|
||||||
|
this.settings.additionalChangeModelProperties || {};
|
||||||
|
|
||||||
this.Change = BaseChangeModel.extend(this.modelName + '-change',
|
this.Change = BaseChangeModel.extend(this.modelName + '-change',
|
||||||
{},
|
additionalChangeModelProperties,
|
||||||
{
|
{trackModel: this}
|
||||||
trackModel: this,
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
|
|
||||||
if (this.dataSource) {
|
if (this.dataSource) {
|
||||||
|
@ -1928,6 +1927,77 @@ module.exports = function(registry) {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the filter for searching related changes.
|
||||||
|
*
|
||||||
|
* Models should override this function to copy properties
|
||||||
|
* from the model instance filter into the change search filter.
|
||||||
|
*
|
||||||
|
* ```js
|
||||||
|
* module.exports = (TargetModel, config) => {
|
||||||
|
* TargetModel.createChangeFilter = function(since, modelFilter) {
|
||||||
|
* const filter = this.base.createChangeFilter.apply(this, arguments);
|
||||||
|
* if (modelFilter && modelFilter.where && modelFilter.where.tenantId) {
|
||||||
|
* filter.where.tenantId = modelFilter.where.tenantId;
|
||||||
|
* }
|
||||||
|
* return filter;
|
||||||
|
* };
|
||||||
|
* };
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* @param {Number} since Return only changes since this checkpoint.
|
||||||
|
* @param {Object} modelFilter Filter describing which model instances to
|
||||||
|
* include in the list of changes.
|
||||||
|
* @returns {Object} The filter object to pass to `Change.find()`. Default:
|
||||||
|
* ```
|
||||||
|
* {where: {checkpoint: {gte: since}, modelName: this.modelName}}
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
PersistedModel.createChangeFilter = function(since, modelFilter) {
|
||||||
|
return {
|
||||||
|
where: {
|
||||||
|
checkpoint: {gte: since},
|
||||||
|
modelName: this.modelName,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add custom data to the Change instance.
|
||||||
|
*
|
||||||
|
* Models should override this function to duplicate model instance properties
|
||||||
|
* to the Change instance properties, typically to allow the changes() method
|
||||||
|
* to filter the changes using these duplicated properties directly while
|
||||||
|
* querying the Change model.
|
||||||
|
*
|
||||||
|
* ```js
|
||||||
|
* module.exports = (TargetModel, config) => {
|
||||||
|
* TargetModel.prototype.fillCustomChangeProperties = function(change, cb) {
|
||||||
|
* var inst = this;
|
||||||
|
* const base = this.constructor.base;
|
||||||
|
* base.prototype.fillCustomChangeProperties.call(this, change, err => {
|
||||||
|
* if (err) return cb(err);
|
||||||
|
*
|
||||||
|
* if (inst && inst.tenantId) {
|
||||||
|
* change.tenantId = inst.tenantId;
|
||||||
|
* } else {
|
||||||
|
* change.tenantId = null;
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* cb();
|
||||||
|
* });
|
||||||
|
* };
|
||||||
|
* };
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* @callback {Function} callback
|
||||||
|
* @param {Error} err Error object; see [Error object](http://loopback.io/doc/en/lb3/Error-object.html).
|
||||||
|
*/
|
||||||
|
PersistedModel.prototype.fillCustomChangeProperties = function(change, cb) {
|
||||||
|
// no-op by default
|
||||||
|
cb();
|
||||||
|
};
|
||||||
|
|
||||||
PersistedModel.setup();
|
PersistedModel.setup();
|
||||||
|
|
||||||
return PersistedModel;
|
return PersistedModel;
|
||||||
|
|
|
@ -9,9 +9,9 @@ var async = require('async');
|
||||||
var expect = require('./helpers/expect');
|
var expect = require('./helpers/expect');
|
||||||
var loopback = require('../');
|
var loopback = require('../');
|
||||||
|
|
||||||
var Change, TestModel;
|
|
||||||
|
|
||||||
describe('Change', function() {
|
describe('Change', function() {
|
||||||
|
let Change, TestModel;
|
||||||
|
|
||||||
beforeEach(function() {
|
beforeEach(function() {
|
||||||
var memory = loopback.createDataSource({
|
var memory = loopback.createDataSource({
|
||||||
connector: loopback.Memory,
|
connector: loopback.Memory,
|
||||||
|
@ -321,7 +321,6 @@ describe('Change', function() {
|
||||||
change.rectify()
|
change.rectify()
|
||||||
.then(function(ch) {
|
.then(function(ch) {
|
||||||
assert.equal(ch.rev, test.revisionForModel);
|
assert.equal(ch.rev, test.revisionForModel);
|
||||||
|
|
||||||
done();
|
done();
|
||||||
})
|
})
|
||||||
.catch(done);
|
.catch(done);
|
||||||
|
@ -609,3 +608,68 @@ describe('Change', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('Change with with custom properties', function() {
|
||||||
|
let Change, TestModel;
|
||||||
|
|
||||||
|
beforeEach(function() {
|
||||||
|
let memory = loopback.createDataSource({
|
||||||
|
connector: loopback.Memory,
|
||||||
|
});
|
||||||
|
|
||||||
|
TestModel = loopback.PersistedModel.extend('ChangeTestModelWithTenant',
|
||||||
|
{
|
||||||
|
id: {id: true, type: 'string', defaultFn: 'guid'},
|
||||||
|
tenantId: 'string',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trackChanges: true,
|
||||||
|
additionalChangeModelProperties: {tenantId: 'string'},
|
||||||
|
});
|
||||||
|
this.modelName = TestModel.modelName;
|
||||||
|
|
||||||
|
TestModel.prototype.fillCustomChangeProperties = function(change, cb) {
|
||||||
|
var inst = this;
|
||||||
|
|
||||||
|
if (inst && inst.tenantId) {
|
||||||
|
change.tenantId = inst.tenantId;
|
||||||
|
} else {
|
||||||
|
change.tenantId = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
cb();
|
||||||
|
};
|
||||||
|
|
||||||
|
TestModel.attachTo(memory);
|
||||||
|
TestModel._defineChangeModel();
|
||||||
|
Change = TestModel.getChangeModel();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('change.rectify', function() {
|
||||||
|
const TENANT_ID = '123';
|
||||||
|
let change;
|
||||||
|
|
||||||
|
beforeEach(givenChangeInstance);
|
||||||
|
|
||||||
|
it('stores the custom property in the Change instance', function() {
|
||||||
|
return change.rectify().then(function(ch) {
|
||||||
|
expect(ch.toObject()).to.have.property('tenantId', TENANT_ID);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
function givenChangeInstance() {
|
||||||
|
const data = {
|
||||||
|
foo: 'bar',
|
||||||
|
tenantId: TENANT_ID,
|
||||||
|
};
|
||||||
|
|
||||||
|
return TestModel.create(data)
|
||||||
|
.then(function(model) {
|
||||||
|
const modelName = TestModel.modelName;
|
||||||
|
return Change.findOrCreateChange(modelName, model.id);
|
||||||
|
}).then(function(ch) {
|
||||||
|
change = ch;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
|
@ -1905,3 +1905,131 @@ describe('Replication / Change APIs', function() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('Replication / Change APIs with custom change properties', function() {
|
||||||
|
this.timeout(10000);
|
||||||
|
var dataSource, useSinceFilter, SourceModel, TargetModel, startingCheckpoint;
|
||||||
|
var tid = 0; // per-test unique id used e.g. to build unique model names
|
||||||
|
|
||||||
|
beforeEach(function() {
|
||||||
|
tid++;
|
||||||
|
useSinceFilter = false;
|
||||||
|
var test = this;
|
||||||
|
|
||||||
|
dataSource = this.dataSource = loopback.createDataSource({
|
||||||
|
connector: loopback.Memory,
|
||||||
|
});
|
||||||
|
SourceModel = this.SourceModel = PersistedModel.extend(
|
||||||
|
'SourceModelWithCustomChangeProperties-' + tid,
|
||||||
|
{
|
||||||
|
id: {id: true, type: String, defaultFn: 'guid'},
|
||||||
|
customProperty: {type: 'string'},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trackChanges: true,
|
||||||
|
additionalChangeModelProperties: {customProperty: {type: 'string'}},
|
||||||
|
});
|
||||||
|
|
||||||
|
SourceModel.createChangeFilter = function(since, modelFilter) {
|
||||||
|
const filter = this.base.createChangeFilter.apply(this, arguments);
|
||||||
|
if (modelFilter && modelFilter.where && modelFilter.where.customProperty)
|
||||||
|
filter.where.customProperty = modelFilter.where.customProperty;
|
||||||
|
return filter;
|
||||||
|
};
|
||||||
|
|
||||||
|
SourceModel.prototype.fillCustomChangeProperties = function(change, cb) {
|
||||||
|
const customProperty = this.customProperty;
|
||||||
|
const base = this.constructor.base;
|
||||||
|
base.prototype.fillCustomChangeProperties.call(this, change, err => {
|
||||||
|
if (err) return cb(err);
|
||||||
|
change.customProperty = customProperty;
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
SourceModel.attachTo(dataSource);
|
||||||
|
|
||||||
|
TargetModel = this.TargetModel = PersistedModel.extend(
|
||||||
|
'TargetModelWithCustomChangeProperties-' + tid,
|
||||||
|
{
|
||||||
|
id: {id: true, type: String, defaultFn: 'guid'},
|
||||||
|
customProperty: {type: 'string'},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trackChanges: true,
|
||||||
|
additionalChangeModelProperties: {customProperty: {type: 'string'}},
|
||||||
|
});
|
||||||
|
|
||||||
|
var ChangeModelForTarget = TargetModel.Change;
|
||||||
|
ChangeModelForTarget.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
|
||||||
|
ChangeModelForTarget.Checkpoint.attachTo(dataSource);
|
||||||
|
|
||||||
|
TargetModel.attachTo(dataSource);
|
||||||
|
|
||||||
|
startingCheckpoint = -1;
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('Model._defineChangeModel()', function() {
|
||||||
|
it('defines change model with custom properties', function() {
|
||||||
|
var changeModel = SourceModel.getChangeModel();
|
||||||
|
var changeModelProperties = changeModel.definition.properties;
|
||||||
|
|
||||||
|
expect(changeModelProperties).to.have.property('customProperty');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('Model.changes(since, filter, callback)', function() {
|
||||||
|
beforeEach(givenSomeSourceModelInstances);
|
||||||
|
|
||||||
|
it('queries changes using customized filter', function(done) {
|
||||||
|
var filterUsed = mockChangeFind(this.SourceModel);
|
||||||
|
|
||||||
|
SourceModel.changes(
|
||||||
|
startingCheckpoint,
|
||||||
|
{where: {customProperty: '123'}},
|
||||||
|
function(err, changes) {
|
||||||
|
if (err) return done(err);
|
||||||
|
expect(filterUsed[0]).to.eql({
|
||||||
|
where: {
|
||||||
|
checkpoint: {gte: -1},
|
||||||
|
modelName: SourceModel.modelName,
|
||||||
|
customProperty: '123',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('query returns the matching changes', function(done) {
|
||||||
|
SourceModel.changes(
|
||||||
|
startingCheckpoint,
|
||||||
|
{where: {customProperty: '123'}},
|
||||||
|
function(err, changes) {
|
||||||
|
expect(changes).to.have.length(1);
|
||||||
|
expect(changes[0]).to.have.property('customProperty', '123');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
function givenSomeSourceModelInstances(done) {
|
||||||
|
const data = [
|
||||||
|
{name: 'foo', customProperty: '123'},
|
||||||
|
{name: 'foo', customPropertyValue: '456'},
|
||||||
|
];
|
||||||
|
this.SourceModel.create(data, done);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
function mockChangeFind(Model) {
|
||||||
|
var filterUsed = [];
|
||||||
|
|
||||||
|
Model.getChangeModel().find = function(filter, cb) {
|
||||||
|
filterUsed.push(filter);
|
||||||
|
if (cb) {
|
||||||
|
process.nextTick(cb);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return filterUsed;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
Loading…
Reference in New Issue