Add options to bulkUpdate

This commit is contained in:
Kogulan Baskaran 2016-11-15 13:02:23 +11:00 committed by Miroslav Bajtoš
parent 54ee8d8bb5
commit b4f1b2f02c
2 changed files with 118 additions and 14 deletions

View File

@ -1073,7 +1073,7 @@ module.exports = function(registry) {
*
* @param {Number} [since] Since this checkpoint
* @param {Model} targetModel Target this model class
* @param {Object} [options]
* @param {Object} [options] An optional options object to pass to underlying data-access calls.
* @param {Object} [options.filter] Replicate models that match this filter
* @callback {Function} [callback] Callback function called with `(err, conflicts)` arguments.
* @param {Error} err Error object; see [Error object](http://loopback.io/doc/en/lb2/Error-object.html).
@ -1100,6 +1100,10 @@ module.exports = function(registry) {
since = { source: since, target: since };
}
if (typeof options === 'function') {
options = {};
}
options = options || {};
var sourceModel = this;
@ -1212,7 +1216,7 @@ module.exports = function(registry) {
function bulkUpdate(_updates, cb) {
debug('\tstarting bulk update');
updates = _updates;
targetModel.bulkUpdate(updates, function(err) {
targetModel.bulkUpdate(updates, options, function(err) {
var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts;
@ -1326,15 +1330,28 @@ module.exports = function(registry) {
* **Note: this is not atomic**
*
* @param {Array} updates An updates list, usually from [createUpdates()](#persistedmodel-createupdates).
* @param {Object} [options] An optional options object to pass to underlying data-access calls.
* @param {Function} callback Callback function.
*/
PersistedModel.bulkUpdate = function(updates, callback) {
PersistedModel.bulkUpdate = function(updates, options, callback) {
var tasks = [];
var Model = this;
var Change = this.getChangeModel();
var conflicts = [];
var lastArg = arguments[arguments.length - 1];
if (typeof lastArg === 'function' && arguments.length > 1) {
callback = lastArg;
}
if (typeof options === 'function') {
options = {};
}
options = options || {};
buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
if (err) return callback(err);
@ -1344,18 +1361,18 @@ module.exports = function(registry) {
switch (update.type) {
case Change.UPDATE:
tasks.push(function(cb) {
applyUpdate(Model, id, current, update.data, update.change, conflicts, cb);
applyUpdate(Model, id, current, update.data, update.change, conflicts, options, cb);
});
break;
case Change.CREATE:
tasks.push(function(cb) {
applyCreate(Model, id, current, update.data, update.change, conflicts, cb);
applyCreate(Model, id, current, update.data, update.change, conflicts, options, cb);
});
break;
case Change.DELETE:
tasks.push(function(cb) {
applyDelete(Model, id, current, update.change, conflicts, cb);
applyDelete(Model, id, current, update.change, conflicts, options, cb);
});
break;
}
@ -1389,7 +1406,7 @@ module.exports = function(registry) {
});
}
function applyUpdate(Model, id, current, data, change, conflicts, cb) {
function applyUpdate(Model, id, current, data, change, conflicts, options, cb) {
var Change = Model.getChangeModel();
var rev = current ? Change.revisionForInst(current) : null;
@ -1407,7 +1424,7 @@ module.exports = function(registry) {
// but not included in `data`
// See https://github.com/strongloop/loopback/issues/1215
Model.updateAll(current.toObject(), data, function(err, result) {
Model.updateAll(current.toObject(), data, options, function(err, result) {
if (err) return cb(err);
var count = result && result.count;
@ -1442,8 +1459,8 @@ module.exports = function(registry) {
});
}
function applyCreate(Model, id, current, data, change, conflicts, cb) {
Model.create(data, function(createErr) {
function applyCreate(Model, id, current, data, change, conflicts, options, cb) {
Model.create(data, options, function(createErr) {
if (!createErr) return cb();
// We don't have a reliable way how to detect the situation
@ -1471,7 +1488,7 @@ module.exports = function(registry) {
}
}
function applyDelete(Model, id, current, change, conflicts, cb) {
function applyDelete(Model, id, current, change, conflicts, options, cb) {
if (!current) {
// The instance was either already deleted or not created at all,
// we are done.
@ -1489,7 +1506,7 @@ module.exports = function(registry) {
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}
Model.deleteAll(current.toObject(), function(err, result) {
Model.deleteAll(current.toObject(), options, function(err, result) {
if (err) return cb(err);
var count = result && result.count;

View File

@ -1535,6 +1535,93 @@ describe('Replication / Change APIs', function() {
}
});
describe('ensure options object is set on context during bulkUpdate', function() {
var syncPropertyExists = false;
var OptionsSourceModel;
beforeEach(function() {
OptionsSourceModel = PersistedModel.extend(
'OptionsSourceModel-' + tid,
{ id: { id: true, type: String, defaultFn: 'guid' }},
{ trackChanges: true });
OptionsSourceModel.attachTo(dataSource);
OptionsSourceModel.observe('before save', function updateTimestamp(ctx, next) {
if (ctx.options.sync) {
syncPropertyExists = true;
} else {
syncPropertyExists = false;
}
next();
});
});
it('bulkUpdate should call Model updates with the provided options object', function(done) {
var testData = { name: 'Janie', surname: 'Doe' };
var updates = [
{
data: null,
change: null,
type: 'create',
},
];
var options = {
sync: true,
};
async.waterfall([
function(callback) {
TargetModel.create(testData, callback);
},
function(data, callback) {
updates[0].data = data;
TargetModel.getChangeModel().find({ where: { modelId: data.id }}, callback);
},
function(data, callback) {
updates[0].change = data;
OptionsSourceModel.bulkUpdate(updates, options, callback);
}],
function(err, result) {
if (err) return done(err);
expect(syncPropertyExists).to.eql(true);
done();
}
);
});
});
describe('ensure bulkUpdate works with just 2 args', function() {
it('bulkUpdate should successfully finish without options', function(done) {
var testData = { name: 'Janie', surname: 'Doe' };
var updates = [{
data: null,
change: null,
type: 'create',
}];
async.waterfall([
function(callback) {
TargetModel.create(testData, callback);
},
function(data, callback) {
updates[0].data = data;
TargetModel.getChangeModel().find({ where: { modelId: data.id }}, callback);
},
function(data, callback) {
updates[0].change = data;
SourceModel.bulkUpdate(updates, callback);
},
], function(err, result) {
if (err) return done(err);
done();
});
});
});
var _since = {};
function replicate(source, target, since, next) {
if (typeof since === 'function') {
@ -1589,14 +1676,14 @@ describe('Replication / Change APIs', function() {
function setupRaceConditionInReplication(fn) {
var bulkUpdate = TargetModel.bulkUpdate;
TargetModel.bulkUpdate = function(data, cb) {
TargetModel.bulkUpdate = function(data, options, cb) {
// simulate the situation when a 3rd party modifies the database
// while a replication run is in progress
var self = this;
fn(function(err) {
if (err) return cb(err);
bulkUpdate.call(self, data, cb);
bulkUpdate.call(self, data, options, cb);
});
// apply the 3rd party modification only once