Merge pull request #2940 from kobaska/add-optional-options-to-bulkupdate

Add options to bulkUpdate
This commit is contained in:
Miroslav Bajtoš 2016-11-15 17:34:06 +01:00 committed by GitHub
commit c0e96ffa12
2 changed files with 121 additions and 14 deletions

View File

@ -1075,7 +1075,7 @@ module.exports = function(registry) {
* *
* @param {Number} [since] Since this checkpoint * @param {Number} [since] Since this checkpoint
* @param {Model} targetModel Target this model class * @param {Model} targetModel Target this model class
* @param {Object} [options] * @param {Object} [options] An optional options object to pass to underlying data-access calls.
* @param {Object} [options.filter] Replicate models that match this filter * @param {Object} [options.filter] Replicate models that match this filter
* @callback {Function} [callback] Callback function called with `(err, conflicts)` arguments. * @callback {Function} [callback] Callback function called with `(err, conflicts)` arguments.
* @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object). * @param {Error} err Error object; see [Error object](http://docs.strongloop.com/display/LB/Error+object).
@ -1100,6 +1100,10 @@ module.exports = function(registry) {
since = { source: since, target: since }; since = { source: since, target: since };
} }
if (typeof options === 'function') {
options = {};
}
options = options || {}; options = options || {};
var sourceModel = this; var sourceModel = this;
@ -1214,7 +1218,7 @@ module.exports = function(registry) {
function bulkUpdate(_updates, cb) { function bulkUpdate(_updates, cb) {
debug('\tstarting bulk update'); debug('\tstarting bulk update');
updates = _updates; updates = _updates;
targetModel.bulkUpdate(updates, function(err) { targetModel.bulkUpdate(updates, options, function(err) {
var conflicts = err && err.details && err.details.conflicts; var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) { if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts; diff.conflicts = conflicts;
@ -1328,15 +1332,28 @@ module.exports = function(registry) {
* **Note: this is not atomic** * **Note: this is not atomic**
* *
* @param {Array} updates An updates list, usually from [createUpdates()](#persistedmodel-createupdates). * @param {Array} updates An updates list, usually from [createUpdates()](#persistedmodel-createupdates).
* @param {Object} [options] An optional options object to pass to underlying data-access calls.
* @param {Function} callback Callback function. * @param {Function} callback Callback function.
*/ */
PersistedModel.bulkUpdate = function(updates, callback) { PersistedModel.bulkUpdate = function(updates, options, callback) {
var tasks = []; var tasks = [];
var Model = this; var Model = this;
var Change = this.getChangeModel(); var Change = this.getChangeModel();
var conflicts = []; var conflicts = [];
var lastArg = arguments[arguments.length - 1];
if (typeof lastArg === 'function' && arguments.length > 1) {
callback = lastArg;
}
if (typeof options === 'function') {
options = {};
}
options = options || {};
buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) { buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
if (err) return callback(err); if (err) return callback(err);
@ -1346,18 +1363,18 @@ module.exports = function(registry) {
switch (update.type) { switch (update.type) {
case Change.UPDATE: case Change.UPDATE:
tasks.push(function(cb) { tasks.push(function(cb) {
applyUpdate(Model, id, current, update.data, update.change, conflicts, cb); applyUpdate(Model, id, current, update.data, update.change, conflicts, options, cb);
}); });
break; break;
case Change.CREATE: case Change.CREATE:
tasks.push(function(cb) { tasks.push(function(cb) {
applyCreate(Model, id, current, update.data, update.change, conflicts, cb); applyCreate(Model, id, current, update.data, update.change, conflicts, options, cb);
}); });
break; break;
case Change.DELETE: case Change.DELETE:
tasks.push(function(cb) { tasks.push(function(cb) {
applyDelete(Model, id, current, update.change, conflicts, cb); applyDelete(Model, id, current, update.change, conflicts, options, cb);
}); });
break; break;
} }
@ -1391,7 +1408,7 @@ module.exports = function(registry) {
}); });
} }
function applyUpdate(Model, id, current, data, change, conflicts, cb) { function applyUpdate(Model, id, current, data, change, conflicts, options, cb) {
var Change = Model.getChangeModel(); var Change = Model.getChangeModel();
var rev = current ? Change.revisionForInst(current) : null; var rev = current ? Change.revisionForInst(current) : null;
@ -1409,7 +1426,7 @@ module.exports = function(registry) {
// but not included in `data` // but not included in `data`
// See https://github.com/strongloop/loopback/issues/1215 // See https://github.com/strongloop/loopback/issues/1215
Model.updateAll(current.toObject(), data, function(err, result) { Model.updateAll(current.toObject(), data, options, function(err, result) {
if (err) return cb(err); if (err) return cb(err);
var count = result && result.count; var count = result && result.count;
@ -1444,8 +1461,8 @@ module.exports = function(registry) {
}); });
} }
function applyCreate(Model, id, current, data, change, conflicts, cb) { function applyCreate(Model, id, current, data, change, conflicts, options, cb) {
Model.create(data, function(createErr) { Model.create(data, options, function(createErr) {
if (!createErr) return cb(); if (!createErr) return cb();
// We don't have a reliable way how to detect the situation // We don't have a reliable way how to detect the situation
@ -1473,7 +1490,7 @@ module.exports = function(registry) {
} }
} }
function applyDelete(Model, id, current, change, conflicts, cb) { function applyDelete(Model, id, current, change, conflicts, options, cb) {
if (!current) { if (!current) {
// The instance was either already deleted or not created at all, // The instance was either already deleted or not created at all,
// we are done. // we are done.
@ -1491,7 +1508,7 @@ module.exports = function(registry) {
return Change.rectifyModelChanges(Model.modelName, [id], cb); return Change.rectifyModelChanges(Model.modelName, [id], cb);
} }
Model.deleteAll(current.toObject(), function(err, result) { Model.deleteAll(current.toObject(), options, function(err, result) {
if (err) return cb(err); if (err) return cb(err);
var count = result && result.count; var count = result && result.count;

View File

@ -1537,6 +1537,96 @@ describe('Replication / Change APIs', function() {
} }
}); });
describe('ensure options object is set on context during bulkUpdate', function() {
var syncPropertyExists = false;
var OptionsSourceModel;
beforeEach(function() {
OptionsSourceModel = PersistedModel.extend(
'OptionsSourceModel-' + tid,
{ id: { id: true, type: String, defaultFn: 'guid' } },
{ trackChanges: true });
OptionsSourceModel.attachTo(dataSource);
OptionsSourceModel.observe('before save', function updateTimestamp(ctx, next) {
if (ctx.options.sync) {
syncPropertyExists = true;
} else {
syncPropertyExists = false;
}
next();
});
});
it('bulkUpdate should call Model updates with the provided options object', function(done) {
var testData = {name: 'Janie', surname: 'Doe'};
var updates = [
{
data: null,
change: null,
type: 'create'
}
];
var options = {
sync: true
};
async.waterfall([
function(callback) {
TargetModel.create(testData, callback);
},
function(data, callback) {
updates[0].data = data;
TargetModel.getChangeModel().find({where: {modelId: data.id}}, callback);
},
function(data, callback) {
updates[0].change = data;
OptionsSourceModel.bulkUpdate(updates, options, callback);
}],
function(err, result) {
if (err) return done(err);
expect(syncPropertyExists).to.eql(true);
done();
}
);
});
});
describe('ensure bulkUpdate works with just 2 args', function() {
it('bulkUpdate should successfully finish without options', function(done) {
var testData = {name: 'Janie', surname: 'Doe'};
var updates = [
{
data: null,
change: null,
type: 'create'
}
];
async.waterfall([
function(callback) {
TargetModel.create(testData, callback);
},
function(data, callback) {
updates[0].data = data;
TargetModel.getChangeModel().find({where: {modelId: data.id}}, callback);
},
function(data, callback) {
updates[0].change = data;
SourceModel.bulkUpdate(updates, callback);
}
], function(err, result) {
if (err) return done(err);
done();
}
);
});
});
var _since = {}; var _since = {};
function replicate(source, target, since, next) { function replicate(source, target, since, next) {
if (typeof since === 'function') { if (typeof since === 'function') {
@ -1591,14 +1681,14 @@ describe('Replication / Change APIs', function() {
function setupRaceConditionInReplication(fn) { function setupRaceConditionInReplication(fn) {
var bulkUpdate = TargetModel.bulkUpdate; var bulkUpdate = TargetModel.bulkUpdate;
TargetModel.bulkUpdate = function(data, cb) { TargetModel.bulkUpdate = function(data, options, cb) {
// simulate the situation when a 3rd party modifies the database // simulate the situation when a 3rd party modifies the database
// while a replication run is in progress // while a replication run is in progress
var self = this; var self = this;
fn(function(err) { fn(function(err) {
if (err) return cb(err); if (err) return cb(err);
bulkUpdate.call(self, data, cb); bulkUpdate.call(self, data, options, cb);
}); });
// apply the 3rd party modification only once // apply the 3rd party modification only once