From bc923bd7813a2123ae87fe957626ab073af1ddcc Mon Sep 17 00:00:00 2001 From: Kogulan Baskaran Date: Tue, 15 Nov 2016 13:02:23 +1100 Subject: [PATCH] Add options to bulkUpdate --- lib/persisted-model.js | 41 +++++++++++++----- test/replication.test.js | 94 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 14 deletions(-) diff --git a/lib/persisted-model.js b/lib/persisted-model.js index ccbe2596..36cb11b6 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -1075,7 +1075,7 @@ module.exports = function(registry) { * * @param {Number} [since] Since this checkpoint * @param {Model} targetModel Target this model class - * @param {Object} [options] + * @param {Object} [options] An optional options object to pass to underlying data-access calls. * @param {Object} [options.filter] Replicate models that match this filter * @callback {Function} [callback] Callback function called with `(err, conflicts)` arguments. * @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object). @@ -1100,6 +1100,10 @@ module.exports = function(registry) { since = { source: since, target: since }; } + if (typeof options === 'function') { + options = {}; + } + options = options || {}; var sourceModel = this; @@ -1214,7 +1218,7 @@ module.exports = function(registry) { function bulkUpdate(_updates, cb) { debug('\tstarting bulk update'); updates = _updates; - targetModel.bulkUpdate(updates, function(err) { + targetModel.bulkUpdate(updates, options, function(err) { var conflicts = err && err.details && err.details.conflicts; if (conflicts && err.statusCode == 409) { diff.conflicts = conflicts; @@ -1328,15 +1332,28 @@ module.exports = function(registry) { * **Note: this is not atomic** * * @param {Array} updates An updates list, usually from [createUpdates()](#persistedmodel-createupdates). + * @param {Object} [options] An optional options object to pass to underlying data-access calls. * @param {Function} callback Callback function. */ - PersistedModel.bulkUpdate = function(updates, callback) { + PersistedModel.bulkUpdate = function(updates, options, callback) { var tasks = []; var Model = this; var Change = this.getChangeModel(); var conflicts = []; + var lastArg = arguments[arguments.length - 1]; + + if (typeof lastArg === 'function' && arguments.length > 1) { + callback = lastArg; + } + + if (typeof options === 'function') { + options = {}; + } + + options = options || {}; + buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) { if (err) return callback(err); @@ -1346,18 +1363,18 @@ module.exports = function(registry) { switch (update.type) { case Change.UPDATE: tasks.push(function(cb) { - applyUpdate(Model, id, current, update.data, update.change, conflicts, cb); + applyUpdate(Model, id, current, update.data, update.change, conflicts, options, cb); }); break; case Change.CREATE: tasks.push(function(cb) { - applyCreate(Model, id, current, update.data, update.change, conflicts, cb); + applyCreate(Model, id, current, update.data, update.change, conflicts, options, cb); }); break; case Change.DELETE: tasks.push(function(cb) { - applyDelete(Model, id, current, update.change, conflicts, cb); + applyDelete(Model, id, current, update.change, conflicts, options, cb); }); break; } @@ -1391,7 +1408,7 @@ module.exports = function(registry) { }); } - function applyUpdate(Model, id, current, data, change, conflicts, cb) { + function applyUpdate(Model, id, current, data, change, conflicts, options, cb) { var Change = Model.getChangeModel(); var rev = current ? Change.revisionForInst(current) : null; @@ -1409,7 +1426,7 @@ module.exports = function(registry) { // but not included in `data` // See https://github.com/strongloop/loopback/issues/1215 - Model.updateAll(current.toObject(), data, function(err, result) { + Model.updateAll(current.toObject(), data, options, function(err, result) { if (err) return cb(err); var count = result && result.count; @@ -1444,8 +1461,8 @@ module.exports = function(registry) { }); } - function applyCreate(Model, id, current, data, change, conflicts, cb) { - Model.create(data, function(createErr) { + function applyCreate(Model, id, current, data, change, conflicts, options, cb) { + Model.create(data, options, function(createErr) { if (!createErr) return cb(); // We don't have a reliable way how to detect the situation @@ -1473,7 +1490,7 @@ module.exports = function(registry) { } } - function applyDelete(Model, id, current, change, conflicts, cb) { + function applyDelete(Model, id, current, change, conflicts, options, cb) { if (!current) { // The instance was either already deleted or not created at all, // we are done. @@ -1491,7 +1508,7 @@ module.exports = function(registry) { return Change.rectifyModelChanges(Model.modelName, [id], cb); } - Model.deleteAll(current.toObject(), function(err, result) { + Model.deleteAll(current.toObject(), options, function(err, result) { if (err) return cb(err); var count = result && result.count; diff --git a/test/replication.test.js b/test/replication.test.js index 3a9585b0..d559fd3c 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -1537,6 +1537,96 @@ describe('Replication / Change APIs', function() { } }); + describe('ensure options object is set on context during bulkUpdate', function() { + var syncPropertyExists = false; + var OptionsSourceModel; + + beforeEach(function() { + OptionsSourceModel = PersistedModel.extend( + 'OptionsSourceModel-' + tid, + { id: { id: true, type: String, defaultFn: 'guid' } }, + { trackChanges: true }); + + OptionsSourceModel.attachTo(dataSource); + + OptionsSourceModel.observe('before save', function updateTimestamp(ctx, next) { + if (ctx.options.sync) { + syncPropertyExists = true; + } else { + syncPropertyExists = false; + } + next(); + }); + }); + + it('bulkUpdate should call Model updates with the provided options object', function(done) { + var testData = {name: 'Janie', surname: 'Doe'}; + var updates = [ + { + data: null, + change: null, + type: 'create' + } + ]; + + var options = { + sync: true + }; + + async.waterfall([ + function(callback) { + TargetModel.create(testData, callback); + }, + function(data, callback) { + updates[0].data = data; + TargetModel.getChangeModel().find({where: {modelId: data.id}}, callback); + }, + function(data, callback) { + updates[0].change = data; + OptionsSourceModel.bulkUpdate(updates, options, callback); + }], + function(err, result) { + if (err) return done(err); + + expect(syncPropertyExists).to.eql(true); + + done(); + } + ); + }); + }); + + describe('ensure bulkUpdate works with just 2 args', function() { + it('bulkUpdate should successfully finish without options', function(done) { + var testData = {name: 'Janie', surname: 'Doe'}; + var updates = [ + { + data: null, + change: null, + type: 'create' + } + ]; + + async.waterfall([ + function(callback) { + TargetModel.create(testData, callback); + }, + function(data, callback) { + updates[0].data = data; + TargetModel.getChangeModel().find({where: {modelId: data.id}}, callback); + }, + function(data, callback) { + updates[0].change = data; + SourceModel.bulkUpdate(updates, callback); + } + ], function(err, result) { + if (err) return done(err); + done(); + } + ); + }); + }); + var _since = {}; function replicate(source, target, since, next) { if (typeof since === 'function') { @@ -1591,14 +1681,14 @@ describe('Replication / Change APIs', function() { function setupRaceConditionInReplication(fn) { var bulkUpdate = TargetModel.bulkUpdate; - TargetModel.bulkUpdate = function(data, cb) { + TargetModel.bulkUpdate = function(data, options, cb) { // simulate the situation when a 3rd party modifies the database // while a replication run is in progress var self = this; fn(function(err) { if (err) return cb(err); - bulkUpdate.call(self, data, cb); + bulkUpdate.call(self, data, options, cb); }); // apply the 3rd party modification only once