diff --git a/common/models/change.js b/common/models/change.js index cf47082d..fcc9c72a 100644 --- a/common/models/change.js +++ b/common/models/change.js @@ -186,27 +186,51 @@ module.exports = function(Change) { cb = cb || utils.createPromiseCallback(); - change.currentRevision(function(err, rev) { + var model = this.getModelCtor(); + var id = this.getModelId(); + + model.findById(id, function(err, inst) { if (err) return cb(err); - // avoid setting rev and prev to the same value - if (currentRev === rev) { - change.debug('rev and prev are equal (not updating anything)'); - return cb(null, change); - } + change.tenant = getTenant(model, inst); - // FIXME(@bajtos) Allo callers to pass in the checkpoint value - // (or even better - a memoized async function to get the cp value) - // That will enable `rectifyAll` to cache the checkpoint value - change.constructor.getCheckpointModel().current( - function(err, checkpoint) { - if (err) return cb(err); - doRectify(checkpoint, rev); + change.currentRevision(inst, function(err, rev) { + if (err) return cb(err); + + // avoid setting rev and prev to the same value + if (currentRev === rev) { + change.debug('rev and prev are equal (not updating anything)'); + return cb(null, change); } - ); + + // FIXME(@bajtos) Allow callers to pass in the checkpoint value + // (or even better - a memoized async function to get the cp value) + // That will enable `rectifyAll` to cache the checkpoint value + change.constructor.getCheckpointModel().current( + function(err, checkpoint) { + if (err) return cb(err); + doRectify(checkpoint, rev); + } + ); + }); }); return cb.promise; + function getTenant(model, inst) { + var tenant = null; + var tenantProperty; + + if (model && model.settings && model.settings.tenantProperty) { + tenantProperty = model.settings.tenantProperty; + } + + if (tenantProperty && inst && inst[tenantProperty]) { + tenant = inst[tenantProperty]; + } + + return tenant; + } + function doRectify(checkpoint, rev) { if (rev) { if (currentRev === rev) { @@ -230,7 +254,7 @@ module.exports = function(Change) { if (currentRev) { change.prev = currentRev; } else if (!change.prev) { - change.debug('ERROR - could not determing prev'); + change.debug('ERROR - could not determining prev'); change.prev = Change.UNKNOWN; } change.debug('updated prev'); @@ -254,23 +278,35 @@ module.exports = function(Change) { /** * Get a change's current revision based on current data. + * @param {Object} instance Optional instance object to get the revision string for, if not provided it will be queried * @callback {Function} callback * @param {Error} err * @param {String} rev The current revision */ - Change.prototype.currentRevision = function(cb) { + Change.prototype.currentRevision = function(instance, cb) { + // if only one argument was provided, then set that as the callback + if (typeof instance === 'function') { + cb = instance; + instance = null; + } + cb = cb || utils.createPromiseCallback(); - var model = this.getModelCtor(); - var id = this.getModelId(); - model.findById(id, function(err, inst) { - if (err) return cb(err); - if (inst) { - cb(null, Change.revisionForInst(inst)); - } else { - cb(null, null); - } - }); + + if (instance) { + cb(null, Change.revisionForInst(instance)); + } else { + var model = this.getModelCtor(); + var id = this.getModelId(); + model.findById(id, function(err, inst) { + if (err) return cb(err); + if (inst) { + cb(null, Change.revisionForInst(inst)); + } else { + cb(null, null); + } + }); + } return cb.promise; }; diff --git a/common/models/change.json b/common/models/change.json index b968703a..2e8ce88a 100644 --- a/common/models/change.json +++ b/common/models/change.json @@ -20,6 +20,10 @@ }, "modelId": { "type": "string" + }, + "tenant": { + "type": "string", + "default": null } } } diff --git a/lib/persisted-model.js b/lib/persisted-model.js index ccbe2596..2cafed93 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -996,12 +996,13 @@ module.exports = function(registry) { * to reduce the number of results returned. * @param {Number} since Return only changes since this checkpoint. * @param {Object} filter Include only changes that match this filter, the same as for [#persistedmodel-find](find()). + * @param {String} tenant Include only changes for this tenant * @callback {Function} callback Callback function called with `(err, changes)` arguments. Required. * @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object). * @param {Array} changes An array of [Change](#change) objects. */ - PersistedModel.changes = function(since, filter, callback) { + PersistedModel.changes = function(since, filter, tenant, callback) { if (typeof since === 'function') { filter = {}; callback = since; @@ -1012,10 +1013,24 @@ module.exports = function(registry) { since = -1; filter = {}; } + if (typeof tenant === 'function') { + callback = tenant; + tenant = null; + } var idName = this.dataSource.idName(this.modelName); var Change = this.getChangeModel(); var model = this; + var changeFilter = { + where: { + checkpoint: {gte: since}, + modelName: this.modelName + } + }; + + if (tenant) { + changeFilter.where.tenant = tenant; + } filter = filter || {}; filter.fields = {}; @@ -1023,10 +1038,7 @@ module.exports = function(registry) { filter.fields[idName] = true; // TODO(ritch) this whole thing could be optimized a bit more - Change.find({ where: { - checkpoint: { gte: since }, - modelName: this.modelName - }}, function(err, changes) { + Change.find(changeFilter, function(err, changes) { if (err) return callback(err); if (!Array.isArray(changes) || changes.length === 0) return callback(null, []); var ids = changes.map(function(change) { @@ -1172,7 +1184,7 @@ module.exports = function(registry) { async.waterfall(tasks, done); function getSourceChanges(cb) { - sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb); + sourceModel.changes(since.source, options.filter, options.tenant, debug.enabled ? log : cb); function log(err, result) { if (err) return cb(err); diff --git a/test/change.test.js b/test/change.test.js index 220d8055..5ada9f45 100644 --- a/test/change.test.js +++ b/test/change.test.js @@ -6,10 +6,10 @@ var async = require('async'); var expect = require('chai').expect; -var Change; -var TestModel; - describe('Change', function() { + var Change; + var TestModel; + beforeEach(function() { var memory = loopback.createDataSource({ connector: loopback.Memory @@ -233,6 +233,14 @@ describe('Change', function() { }); }); + it('should create a new change with tenant set as null', function(done) { + var test = this; + change.rectify(function(err, ch) { + assert.equal(ch.tenant, null); + done(); + }); + }); + // This test is a low-level equivalent of the test in replication.test.js // called "replicates multiple updates within the same CP" it('should merge updates within the same checkpoint', function(done) { @@ -320,7 +328,7 @@ describe('Change', function() { change.rectify() .then(function(ch) { assert.equal(ch.rev, test.revisionForModel); - + assert.equal(ch.tenant, null); done(); }) .catch(done); @@ -343,6 +351,22 @@ describe('Change', function() { }); }); + describe('change.currentRevision(instance, callback)', function() { + it('should get the correct revision', function(done) { + var test = this; + var change = new Change({ + modelName: this.modelName, + modelId: this.modelId + }); + + change.currentRevision(this.model, function(err, rev) { + assert.equal(rev, test.revisionForModel); + + done(); + }); + }); + }); + describe('change.currentRevision - promise variant', function() { it('should get the correct revision', function(done) { var test = this; @@ -544,6 +568,7 @@ describe('Change', function() { modelName: updateRecord.modelName, prev: 'foo', // this is the current local revision rev: 'foo-new', + tenant: null, }); done(); @@ -573,6 +598,7 @@ describe('Change', function() { modelName: updateRecord.modelName, prev: 'foo', // this is the current local revision rev: 'foo-new', + tenant: null, }); done(); @@ -601,6 +627,7 @@ describe('Change', function() { modelName: updateRecord.modelName, prev: null, // this is the current local revision rev: 'new-rev', + tenant: null, }); done(); @@ -608,3 +635,62 @@ describe('Change', function() { }); }); }); + +describe('Change with multi tenancy', function() { + var ChangeWithTenant; + var TestModelWithTenant; + + beforeEach(function() { + var memory = loopback.createDataSource({ + connector: loopback.Memory + }); + TestModelWithTenant = loopback.PersistedModel.extend('ChangeTestModel', + { + id: {id: true, type: 'string', defaultFn: 'guid'} + }, + { + trackChanges: true, + tenantProperty: 'tenantId' + }); + this.modelName = TestModelWithTenant.modelName; + TestModelWithTenant.attachTo(memory); + ChangeWithTenant = TestModelWithTenant.getChangeModel(); + }); + + describe('change.rectify - tenant variant', function() { + var change; + var tenantId = '123'; + + beforeEach(function(done) { + ChangeWithTenant = TestModelWithTenant.getChangeModel(); + var test = this; + test.data = { + foo: 'bar', + tenantId: tenantId + }; + + TestModelWithTenant.create(test.data, function(err, model) { + if (err) return done(err); + test.revisionForModel = ChangeWithTenant.revisionForInst(model); + + ChangeWithTenant.findOrCreateChange(TestModelWithTenant.modelName, model.id) + .then(function(ch) { + change = ch; + done(); + }).catch(done); + }); + }); + + it('should create a new change with the correct tenant and revision', function(done) { + var test = this; + change.rectify() + .then(function(ch) { + assert.equal(ch.rev, test.revisionForModel); + assert.equal(ch.tenant, tenantId); + + done(); + }) + .catch(done); + }); + }); +}); diff --git a/test/replication.test.js b/test/replication.test.js index 3a9585b0..e9c8fb09 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -1642,3 +1642,71 @@ describe('Replication / Change APIs', function() { return getPropValue(list, 'id'); } }); + +describe('Replication / Change APIs with multi tenancy', function() { + this.timeout(10000); + var dataSource, SourceModelWithTenant, TargetModelWithTenant; + var useSinceFilter; + var tid = 0; // per-test unique id used e.g. to build unique model names + + beforeEach(function() { + tid++; + useSinceFilter = false; + var test = this; + + dataSource = this.dataSource = loopback.createDataSource({ + connector: loopback.Memory + }); + SourceModelWithTenant = this.SourceModelWithTenant = PersistedModel.extend( + 'SourceModelWithTenant-' + tid, + { id: { id: true, type: String, defaultFn: 'guid' } }, + { trackChanges: true, tenantProperty: 'tenantId' }); + + SourceModelWithTenant.attachTo(dataSource); + + TargetModelWithTenant = this.TargetModelWithTenant = PersistedModel.extend( + 'TargetModelWithTenant-' + tid, + { id: { id: true, type: String, defaultFn: 'guid' } }, + { trackChanges: true, tenantProperty: 'tenantId' }); + + var TargetWithTenantChange = TargetModelWithTenant.Change; + TargetWithTenantChange.Checkpoint = loopback.Checkpoint.extend('TargetWithTenantCheckpoint'); + TargetWithTenantChange.Checkpoint.attachTo(dataSource); + + TargetModelWithTenant.attachTo(dataSource); + + test.startingCheckpoint = -1; + }); + + describe('Model.changes(since, filter, tenant, callback)', function() { + it('Get changes since the given checkpoint for the given tenant', function(done) { + var test = this; + this.SourceModelWithTenant.create([{name: 'foo', tenantId: '123'}, {name: 'foo', tenantId: '456'}], function(err) { + if (err) return done(err); + + setTimeout(function() { + test.SourceModelWithTenant.changes(test.startingCheckpoint, {}, '123', function(err, changes) { + assert.equal(changes.length, 1); + + done(); + }); + }, 1); + }); + }); + + it('excludes changes from older checkpoints', function(done) { + var FUTURE_CHECKPOINT = 999; + + SourceModelWithTenant.create({ name: 'foo', tenantId: '123' }, function(err) { + if (err) return done(err); + SourceModelWithTenant.changes(FUTURE_CHECKPOINT, {}, '123', function(err, changes) { + if (err) return done(err); + + expect(changes).to.be.empty; //jshint ignore:line + + done(); + }); + }); + }); + }); +});