Merge 205559b9e1
into 359a6a5762
This commit is contained in:
commit
23df23069b
|
@ -1396,220 +1396,14 @@ module.exports = function(registry) {
|
|||
/**
|
||||
* Apply an update list.
|
||||
*
|
||||
* **Note: this is not atomic**
|
||||
*
|
||||
* @param {Array} updates An updates list, usually from [createUpdates()](#persistedmodel-createupdates).
|
||||
* @param {Object} [options] An optional options object to pass to underlying data-access calls.
|
||||
* @param {Function} callback Callback function.
|
||||
*/
|
||||
|
||||
PersistedModel.bulkUpdate = function(updates, options, callback) {
|
||||
var tasks = [];
|
||||
var Model = this;
|
||||
var Change = this.getChangeModel();
|
||||
var conflicts = [];
|
||||
|
||||
var lastArg = arguments[arguments.length - 1];
|
||||
|
||||
if (typeof lastArg === 'function' && arguments.length > 1) {
|
||||
callback = lastArg;
|
||||
}
|
||||
|
||||
if (typeof options === 'function') {
|
||||
options = {};
|
||||
}
|
||||
|
||||
options = options || {};
|
||||
|
||||
buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
|
||||
if (err) return callback(err);
|
||||
|
||||
updates.forEach(function(update) {
|
||||
var id = update.change.modelId;
|
||||
var current = currentMap[id];
|
||||
switch (update.type) {
|
||||
case Change.UPDATE:
|
||||
tasks.push(function(cb) {
|
||||
applyUpdate(Model, id, current, update.data, update.change, conflicts, options, cb);
|
||||
});
|
||||
break;
|
||||
|
||||
case Change.CREATE:
|
||||
tasks.push(function(cb) {
|
||||
applyCreate(Model, id, current, update.data, update.change, conflicts, options, cb);
|
||||
});
|
||||
break;
|
||||
case Change.DELETE:
|
||||
tasks.push(function(cb) {
|
||||
applyDelete(Model, id, current, update.change, conflicts, options, cb);
|
||||
});
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
async.parallel(tasks, function(err) {
|
||||
if (err) return callback(err);
|
||||
if (conflicts.length) {
|
||||
err = new Error(g.f('Conflict'));
|
||||
err.statusCode = 409;
|
||||
err.details = {conflicts: conflicts};
|
||||
return callback(err);
|
||||
}
|
||||
callback();
|
||||
});
|
||||
});
|
||||
throwNotAttached(this.modelName, 'bulkUpdate');
|
||||
};
|
||||
|
||||
function buildLookupOfAffectedModelData(Model, updates, callback) {
|
||||
var idName = Model.dataSource.idName(Model.modelName);
|
||||
var affectedIds = updates.map(function(u) { return u.change.modelId; });
|
||||
var whereAffected = {};
|
||||
whereAffected[idName] = {inq: affectedIds};
|
||||
Model.find({where: whereAffected}, function(err, affectedList) {
|
||||
if (err) return callback(err);
|
||||
var dataLookup = {};
|
||||
affectedList.forEach(function(it) {
|
||||
dataLookup[it[idName]] = it;
|
||||
});
|
||||
callback(null, dataLookup);
|
||||
});
|
||||
}
|
||||
|
||||
function applyUpdate(Model, id, current, data, change, conflicts, options, cb) {
|
||||
var Change = Model.getChangeModel();
|
||||
var rev = current ? Change.revisionForInst(current) : null;
|
||||
|
||||
if (rev !== change.prev) {
|
||||
debug('Detected non-rectified change of %s %j',
|
||||
Model.modelName, id);
|
||||
debug('\tExpected revision: %s', change.rev);
|
||||
debug('\tActual revision: %s', rev);
|
||||
conflicts.push(change);
|
||||
return Change.rectifyModelChanges(Model.modelName, [id], cb);
|
||||
}
|
||||
|
||||
// TODO(bajtos) modify `data` so that it instructs
|
||||
// the connector to remove any properties included in "inst"
|
||||
// but not included in `data`
|
||||
// See https://github.com/strongloop/loopback/issues/1215
|
||||
|
||||
Model.updateAll(current.toObject(), data, options, function(err, result) {
|
||||
if (err) return cb(err);
|
||||
|
||||
var count = result && result.count;
|
||||
switch (count) {
|
||||
case 1:
|
||||
// The happy path, exactly one record was updated
|
||||
return cb();
|
||||
|
||||
case 0:
|
||||
debug('UpdateAll detected non-rectified change of %s %j',
|
||||
Model.modelName, id);
|
||||
conflicts.push(change);
|
||||
// NOTE(bajtos) updateAll triggers change rectification
|
||||
// for all model instances, even when no records were updated,
|
||||
// thus we don't need to rectify explicitly ourselves
|
||||
return cb();
|
||||
|
||||
case undefined:
|
||||
case null:
|
||||
return cb(new Error(
|
||||
g.f('Cannot apply bulk updates, ' +
|
||||
'the connector does not correctly report ' +
|
||||
'the number of updated records.')));
|
||||
|
||||
default:
|
||||
debug('%s.updateAll modified unexpected number of instances: %j',
|
||||
Model.modelName, count);
|
||||
return cb(new Error(
|
||||
g.f('Bulk update failed, the connector has modified unexpected ' +
|
||||
'number of records: %s', JSON.stringify(count))));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function applyCreate(Model, id, current, data, change, conflicts, options, cb) {
|
||||
Model.create(data, options, function(createErr) {
|
||||
if (!createErr) return cb();
|
||||
|
||||
// We don't have a reliable way how to detect the situation
|
||||
// where he model was not create because of a duplicate id
|
||||
// The workaround is to query the DB to check if the model already exists
|
||||
Model.findById(id, function(findErr, inst) {
|
||||
if (findErr || !inst) {
|
||||
// There isn't any instance with the same id, thus there isn't
|
||||
// any conflict and we just report back the original error.
|
||||
return cb(createErr);
|
||||
}
|
||||
|
||||
return conflict();
|
||||
});
|
||||
});
|
||||
|
||||
function conflict() {
|
||||
// The instance already exists - report a conflict
|
||||
debug('Detected non-rectified new instance of %s %j',
|
||||
Model.modelName, id);
|
||||
conflicts.push(change);
|
||||
|
||||
var Change = Model.getChangeModel();
|
||||
return Change.rectifyModelChanges(Model.modelName, [id], cb);
|
||||
}
|
||||
}
|
||||
|
||||
function applyDelete(Model, id, current, change, conflicts, options, cb) {
|
||||
if (!current) {
|
||||
// The instance was either already deleted or not created at all,
|
||||
// we are done.
|
||||
return cb();
|
||||
}
|
||||
|
||||
var Change = Model.getChangeModel();
|
||||
var rev = Change.revisionForInst(current);
|
||||
if (rev !== change.prev) {
|
||||
debug('Detected non-rectified change of %s %j',
|
||||
Model.modelName, id);
|
||||
debug('\tExpected revision: %s', change.rev);
|
||||
debug('\tActual revision: %s', rev);
|
||||
conflicts.push(change);
|
||||
return Change.rectifyModelChanges(Model.modelName, [id], cb);
|
||||
}
|
||||
|
||||
Model.deleteAll(current.toObject(), options, function(err, result) {
|
||||
if (err) return cb(err);
|
||||
|
||||
var count = result && result.count;
|
||||
switch (count) {
|
||||
case 1:
|
||||
// The happy path, exactly one record was updated
|
||||
return cb();
|
||||
|
||||
case 0:
|
||||
debug('DeleteAll detected non-rectified change of %s %j',
|
||||
Model.modelName, id);
|
||||
conflicts.push(change);
|
||||
// NOTE(bajtos) deleteAll triggers change rectification
|
||||
// for all model instances, even when no records were updated,
|
||||
// thus we don't need to rectify explicitly ourselves
|
||||
return cb();
|
||||
|
||||
case undefined:
|
||||
case null:
|
||||
return cb(new Error(
|
||||
g.f('Cannot apply bulk updates, ' +
|
||||
'the connector does not correctly report ' +
|
||||
'the number of deleted records.')));
|
||||
|
||||
default:
|
||||
debug('%s.deleteAll modified unexpected number of instances: %j',
|
||||
Model.modelName, count);
|
||||
return cb(new Error(
|
||||
g.f('Bulk update failed, the connector has deleted unexpected ' +
|
||||
'number of records: %s', JSON.stringify(count))));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the `Change` model.
|
||||
* Throws an error if the change model is not correctly setup.
|
||||
|
|
Loading…
Reference in New Issue