Merge pull request #1472 from lehni/feature/better-transactions
Add a better way to handle transactions by binding them to models
This commit is contained in:
commit
12c3e3aadb
157
lib/dao.js
157
lib/dao.js
|
@ -130,6 +130,45 @@ function errorModelNotFound(idValue) {
|
|||
return error;
|
||||
}
|
||||
|
||||
function invokeConnectorMethod(connector, method, Model, args, options, cb) {
|
||||
var dataSource = Model.getDataSource();
|
||||
// If the DataSource is a transaction and no transaction object is provide in
|
||||
// the options yet, add it to the options, see: DataSource#transaction()
|
||||
var opts = dataSource.isTransaction && !options.transaction ? Object.assign(
|
||||
options, {transaction: dataSource.currentTransaction}) : options;
|
||||
var optionsSupported = connector[method].length >= args.length + 3;
|
||||
var transaction = opts.transaction;
|
||||
if (transaction) {
|
||||
if (!optionsSupported) {
|
||||
return process.nextTick(function() {
|
||||
cb(new Error(g.f(
|
||||
'The connector does not support {{method}} within a transaction', method)));
|
||||
});
|
||||
}
|
||||
// transaction isn't always a Transaction instance. Some tests provide a
|
||||
// string to test if options get passed through, so check for ensureActive:
|
||||
if (transaction.ensureActive && !transaction.ensureActive(cb)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
var modelName = Model.modelName;
|
||||
var fullArgs;
|
||||
if (!optionsSupported && method === 'count') {
|
||||
// NOTE: The old count() signature is irregular, with `where` coming last:
|
||||
// [modelName, cb, where]
|
||||
var where = args[0];
|
||||
fullArgs = [modelName, cb, where];
|
||||
} else {
|
||||
// Standard signature: [modelName, ...args, (opts, ) cb]
|
||||
fullArgs = [modelName].concat(args);
|
||||
if (optionsSupported) {
|
||||
fullArgs.push(opts);
|
||||
}
|
||||
fullArgs.push(cb);
|
||||
}
|
||||
connector[method].apply(connector, fullArgs);
|
||||
}
|
||||
|
||||
DataAccessObject._forDB = function(data) {
|
||||
if (!(this.getDataSource().isRelational && this.getDataSource().isRelational())) {
|
||||
return data;
|
||||
|
@ -365,7 +404,6 @@ DataAccessObject.create = function(data, options, cb) {
|
|||
obj.trigger('create', function(createDone) {
|
||||
obj.trigger('save', function(saveDone) {
|
||||
var _idName = idName(Model);
|
||||
var modelName = Model.modelName;
|
||||
var val = removeUndefined(obj.toObject(true));
|
||||
function createCallback(err, id, rev) {
|
||||
if (id) {
|
||||
|
@ -431,12 +469,8 @@ DataAccessObject.create = function(data, options, cb) {
|
|||
};
|
||||
Model.notifyObserversOf('persist', context, function(err) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (connector.create.length === 4) {
|
||||
connector.create(modelName, obj.constructor._forDB(context.data), options, createCallback);
|
||||
} else {
|
||||
connector.create(modelName, obj.constructor._forDB(context.data), createCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'create', Model, [obj.constructor._forDB(context.data)],
|
||||
options, createCallback);
|
||||
});
|
||||
}, obj, cb);
|
||||
}, obj, cb);
|
||||
|
@ -579,8 +613,6 @@ DataAccessObject.upsert = function(data, options, cb) {
|
|||
Model.applyProperties(update, inst);
|
||||
Model = Model.lookupModel(update);
|
||||
|
||||
var connector = self.getConnector();
|
||||
|
||||
if (doValidate === false) {
|
||||
callConnector();
|
||||
} else {
|
||||
|
@ -611,11 +643,7 @@ DataAccessObject.upsert = function(data, options, cb) {
|
|||
};
|
||||
Model.notifyObserversOf('persist', context, function(err) {
|
||||
if (err) return done(err);
|
||||
if (connector.updateOrCreate.length === 4) {
|
||||
connector.updateOrCreate(Model.modelName, update, options, done);
|
||||
} else {
|
||||
connector.updateOrCreate(Model.modelName, update, done);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'updateOrCreate', Model, [update], options, done);
|
||||
});
|
||||
}
|
||||
function done(err, data, info) {
|
||||
|
@ -728,7 +756,6 @@ DataAccessObject.upsertWithWhere = function(where, data, options, cb) {
|
|||
var self = this;
|
||||
var Model = this;
|
||||
var connector = Model.getConnector();
|
||||
var modelName = Model.modelName;
|
||||
var query = {where: where};
|
||||
var context = {
|
||||
Model: Model,
|
||||
|
@ -795,7 +822,7 @@ DataAccessObject.upsertWithWhere = function(where, data, options, cb) {
|
|||
};
|
||||
Model.notifyObserversOf('persist', context, function(err) {
|
||||
if (err) return done(err);
|
||||
connector.upsertWithWhere(modelName, ctx.where, update, options, done);
|
||||
invokeConnectorMethod(connector, 'upsertWithWhere', Model, [ctx.where, update], options, done);
|
||||
});
|
||||
}
|
||||
function done(err, data, info) {
|
||||
|
@ -944,8 +971,6 @@ DataAccessObject.replaceOrCreate = function replaceOrCreate(data, options, cb) {
|
|||
Model.applyProperties(update, inst);
|
||||
Model = Model.lookupModel(update);
|
||||
|
||||
var connector = self.getConnector();
|
||||
|
||||
if (options.validate === false) {
|
||||
return callConnector();
|
||||
}
|
||||
|
@ -972,7 +997,7 @@ DataAccessObject.replaceOrCreate = function replaceOrCreate(data, options, cb) {
|
|||
};
|
||||
Model.notifyObserversOf('persist', context, function(err) {
|
||||
if (err) return done(err);
|
||||
connector.replaceOrCreate(Model.modelName, context.data, options, done);
|
||||
invokeConnectorMethod(connector, 'replaceOrCreate', Model, [context.data], options, done);
|
||||
});
|
||||
}
|
||||
function done(err, data, info) {
|
||||
|
@ -1097,7 +1122,6 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, options, cb)
|
|||
var connector = Model.getConnector();
|
||||
|
||||
function _findOrCreate(query, data, currentInstance) {
|
||||
var modelName = self.modelName;
|
||||
function findOrCreateCallback(err, data, created) {
|
||||
if (err) return cb(err);
|
||||
var context = {
|
||||
|
@ -1155,12 +1179,8 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, options, cb)
|
|||
|
||||
Model.notifyObserversOf('persist', context, function(err) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (connector.findOrCreate.length === 5) {
|
||||
connector.findOrCreate(modelName, query, self._forDB(context.data), options, findOrCreateCallback);
|
||||
} else {
|
||||
connector.findOrCreate(modelName, query, self._forDB(context.data), findOrCreateCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'findOrCreate', Model, [query, self._forDB(context.data)],
|
||||
options, findOrCreateCallback);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1776,7 +1796,7 @@ DataAccessObject._coerce = function(where, options) {
|
|||
}
|
||||
} else {
|
||||
if (val != null) {
|
||||
const allowExtendedOperators = self._allowExtendedOperators(options);
|
||||
var allowExtendedOperators = self._allowExtendedOperators(options);
|
||||
if (operator === null && val instanceof RegExp) {
|
||||
// Normalize {name: /A/} to {name: {regexp: /A/}}
|
||||
operator = 'regexp';
|
||||
|
@ -1952,8 +1972,8 @@ DataAccessObject.find = function find(query, options, cb) {
|
|||
cb(err);
|
||||
} else if (Array.isArray(data)) {
|
||||
memory.define({
|
||||
properties: self.dataSource.definitions[self.modelName].properties,
|
||||
settings: self.dataSource.definitions[self.modelName].settings,
|
||||
properties: self.dataSource.definitions[modelName].properties,
|
||||
settings: self.dataSource.definitions[modelName].settings,
|
||||
model: self,
|
||||
});
|
||||
|
||||
|
@ -1993,11 +2013,7 @@ DataAccessObject.find = function find(query, options, cb) {
|
|||
}
|
||||
|
||||
var geoCallback = options.notify === false ? geoCallbackWithoutNotify : geoCallbackWithNotify;
|
||||
if (connector.all.length === 4) {
|
||||
connector.all(self.modelName, {}, options, geoCallback);
|
||||
} else {
|
||||
connector.all(self.modelName, {}, geoCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'all', self, [{}], options, geoCallback);
|
||||
}
|
||||
// already handled
|
||||
return cb.promise;
|
||||
|
@ -2100,11 +2116,7 @@ DataAccessObject.find = function find(query, options, cb) {
|
|||
};
|
||||
|
||||
if (options.notify === false) {
|
||||
if (connector.all.length === 4) {
|
||||
connector.all(self.modelName, query, options, allCb);
|
||||
} else {
|
||||
connector.all(self.modelName, query, allCb);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'all', self, [query], options, allCb);
|
||||
} else {
|
||||
var context = {
|
||||
Model: this,
|
||||
|
@ -2114,10 +2126,7 @@ DataAccessObject.find = function find(query, options, cb) {
|
|||
};
|
||||
this.notifyObserversOf('access', context, function(err, ctx) {
|
||||
if (err) return cb(err);
|
||||
|
||||
connector.all.length === 4 ?
|
||||
connector.all(self.modelName, ctx.query, options, allCb) :
|
||||
connector.all(self.modelName, ctx.query, allCb);
|
||||
invokeConnectorMethod(connector, 'all', self, [ctx.query], options, allCb);
|
||||
});
|
||||
}
|
||||
return cb.promise;
|
||||
|
@ -2258,11 +2267,7 @@ DataAccessObject.destroyAll = function destroyAll(where, options, cb) {
|
|||
};
|
||||
|
||||
if (whereIsEmpty(where)) {
|
||||
if (connector.destroyAll.length === 4) {
|
||||
connector.destroyAll(Model.modelName, {}, options, done);
|
||||
} else {
|
||||
connector.destroyAll(Model.modelName, {}, done);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'destroyAll', Model, [{}], options, done);
|
||||
} else {
|
||||
try {
|
||||
// Support an optional where object
|
||||
|
@ -2276,11 +2281,7 @@ DataAccessObject.destroyAll = function destroyAll(where, options, cb) {
|
|||
});
|
||||
}
|
||||
|
||||
if (connector.destroyAll.length === 4) {
|
||||
connector.destroyAll(Model.modelName, where, options, done);
|
||||
} else {
|
||||
connector.destroyAll(Model.modelName, where, done);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'destroyAll', Model, [where], options, done);
|
||||
}
|
||||
|
||||
function done(err, info) {
|
||||
|
@ -2444,17 +2445,7 @@ DataAccessObject.count = function(where, options, cb) {
|
|||
};
|
||||
this.notifyObserversOf('access', context, function(err, ctx) {
|
||||
if (err) return cb(err);
|
||||
where = ctx.query.where;
|
||||
|
||||
if (connector.count.length <= 3) {
|
||||
// Old signature, please note where is the last
|
||||
// count(model, cb, where)
|
||||
connector.count(Model.modelName, cb, where);
|
||||
} else {
|
||||
// New signature
|
||||
// count(model, where, options, cb)
|
||||
connector.count(Model.modelName, where, options, cb);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'count', Model, [ctx.query.where], options, cb);
|
||||
});
|
||||
return cb.promise;
|
||||
};
|
||||
|
@ -2507,7 +2498,6 @@ DataAccessObject.prototype.save = function(options, cb) {
|
|||
|
||||
var inst = this;
|
||||
var connector = inst.getConnector();
|
||||
var modelName = Model.modelName;
|
||||
|
||||
var context = {
|
||||
Model: Model,
|
||||
|
@ -2591,12 +2581,8 @@ DataAccessObject.prototype.save = function(options, cb) {
|
|||
|
||||
Model.notifyObserversOf('persist', context, function(err) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (connector.save.length === 4) {
|
||||
connector.save(modelName, inst.constructor._forDB(data), options, saveCallback);
|
||||
} else {
|
||||
connector.save(modelName, inst.constructor._forDB(data), saveCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'save', Model, [Model._forDB(data)],
|
||||
options, saveCallback);
|
||||
});
|
||||
}, data, cb);
|
||||
}, data, cb);
|
||||
|
@ -2770,12 +2756,7 @@ DataAccessObject.updateAll = function(where, data, options, cb) {
|
|||
};
|
||||
Model.notifyObserversOf('persist', context, function(err, ctx) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (connector.update.length === 5) {
|
||||
connector.update(Model.modelName, where, data, options, updateCallback);
|
||||
} else {
|
||||
connector.update(Model.modelName, where, data, updateCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'update', Model, [where, data], options, updateCallback);
|
||||
});
|
||||
}
|
||||
return cb.promise;
|
||||
|
@ -2907,11 +2888,7 @@ DataAccessObject.prototype.remove =
|
|||
});
|
||||
}
|
||||
|
||||
if (connector.destroy.length === 4) {
|
||||
connector.destroy(inst.constructor.modelName, id, options, destroyCallback);
|
||||
} else {
|
||||
connector.destroy(inst.constructor.modelName, id, destroyCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'destroy', Model, [id], options, destroyCallback);
|
||||
}, null, cb);
|
||||
}
|
||||
return cb.promise;
|
||||
|
@ -3040,7 +3017,6 @@ DataAccessObject.replaceById = function(id, data, options, cb) {
|
|||
if (isPKMissing(Model, cb))
|
||||
return cb.promise;
|
||||
|
||||
var model = Model.modelName;
|
||||
var hookState = {};
|
||||
|
||||
if (id !== data[pkName]) {
|
||||
|
@ -3155,8 +3131,8 @@ DataAccessObject.replaceById = function(id, data, options, cb) {
|
|||
options: options,
|
||||
};
|
||||
Model.notifyObserversOf('persist', ctx, function(err) {
|
||||
connector.replaceById(model, id,
|
||||
inst.constructor._forDB(context.data), options, replaceCallback);
|
||||
invokeConnectorMethod(connector, 'replaceById', Model, [id, Model._forDB(context.data)],
|
||||
options, replaceCallback);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -3216,7 +3192,6 @@ function(data, options, cb) {
|
|||
|
||||
var allowExtendedOperators = Model._allowExtendedOperators(options);
|
||||
var strict = this.__strict;
|
||||
var model = Model.modelName;
|
||||
var hookState = {};
|
||||
|
||||
// Convert the data to be plain object so that update won't be confused
|
||||
|
@ -3352,13 +3327,9 @@ function(data, options, cb) {
|
|||
options: options,
|
||||
};
|
||||
Model.notifyObserversOf('persist', ctx, function(err) {
|
||||
if (connector.updateAttributes.length === 5) {
|
||||
connector.updateAttributes(model, getIdValue(inst.constructor, inst),
|
||||
inst.constructor._forDB(context.data), options, updateAttributesCallback);
|
||||
} else {
|
||||
connector.updateAttributes(model, getIdValue(inst.constructor, inst),
|
||||
inst.constructor._forDB(context.data), updateAttributesCallback);
|
||||
}
|
||||
invokeConnectorMethod(connector, 'updateAttributes', Model,
|
||||
[getIdValue(Model, inst), Model._forDB(context.data)],
|
||||
options, updateAttributesCallback);
|
||||
});
|
||||
}, data, cb);
|
||||
}, data, cb);
|
||||
|
|
|
@ -27,6 +27,7 @@ var traverse = require('traverse');
|
|||
var g = require('strong-globalize')();
|
||||
var juggler = require('..');
|
||||
var deprecated = require('depd')('loopback-datasource-juggler');
|
||||
var Transaction = require('loopback-connector').Transaction;
|
||||
|
||||
if (process.env.DEBUG === 'loopback') {
|
||||
// For back-compatibility
|
||||
|
@ -2102,10 +2103,51 @@ DataSource.prototype.copyModel = function copyModel(Master) {
|
|||
|
||||
/**
|
||||
* Run a transaction against the DataSource.
|
||||
* @returns {EventEmitter}
|
||||
* @private
|
||||
*
|
||||
* This method can be used in different ways based on the passed arguments and
|
||||
* type of underlying data source:
|
||||
*
|
||||
* If no `execute()` function is provided and the underlying DataSource is a
|
||||
* database that supports transactions, a Promise is returned that resolves to
|
||||
* an EventEmitter representing the transaction once it is ready.
|
||||
* `transaction.models` can be used to receive versions of the DataSource's
|
||||
* model classes which are bound to the created transaction, so that all their
|
||||
* database methods automatically use the transaction. At the end of all
|
||||
* database transactions, `transaction.commit()` can be called to commit the
|
||||
* transactions, or `transaction.rollback()` to roll them back.
|
||||
*
|
||||
* If no `execute()` function is provided on a transient or memory DataSource,
|
||||
* the EventEmitter representing the transaction is returned immediately. For
|
||||
* backward compatibility, this object also supports `transaction.exec()`
|
||||
* instead of `transaction.commit()`, and calling `transaction.rollback()` is
|
||||
* not required on such transient and memory DataSource instances.
|
||||
*
|
||||
* If an `execute()` function is provided, then it is called as soon as the
|
||||
* transaction is ready, receiving `transaction.models` as its first
|
||||
* argument. `transaction.commit()` and `transaction.rollback()` are then
|
||||
* automatically called at the end of `execute()`, based on whether exceptions
|
||||
* happen during execution or not. If no callback is provided to be called at
|
||||
* the end of the execution, a Promise object is returned that is resolved or
|
||||
* rejected as soon as the execution is completed, and the transaction is
|
||||
* committed or rolled back.
|
||||
*
|
||||
* @param {Function} execute The execute function, called with (models). Note
|
||||
* that the instances in `models` are bound to the created transaction, and
|
||||
* are therefore not identical with the models in `app.models`, but slaves
|
||||
* thereof (optional).
|
||||
* @options {Object} options The options to be passed to `beginTransaction()`
|
||||
* when creating the transaction on database sources (optional).
|
||||
* @param {Function} cb Callback called with (err)
|
||||
* @returns {Promise | EventEmitter}
|
||||
*/
|
||||
DataSource.prototype.transaction = function() {
|
||||
DataSource.prototype.transaction = function(execute, options, cb) {
|
||||
if (cb === undefined && typeof options === 'function') {
|
||||
cb = options;
|
||||
options = {};
|
||||
} else {
|
||||
options = options || {};
|
||||
}
|
||||
|
||||
var dataSource = this;
|
||||
var transaction = new EventEmitter();
|
||||
|
||||
|
@ -2115,26 +2157,133 @@ DataSource.prototype.transaction = function() {
|
|||
|
||||
transaction.isTransaction = true;
|
||||
transaction.origin = dataSource;
|
||||
transaction.name = dataSource.name;
|
||||
transaction.settings = dataSource.settings;
|
||||
transaction.connected = false;
|
||||
transaction.connecting = false;
|
||||
transaction.connector = dataSource.connector.transaction();
|
||||
|
||||
// create blank models pool
|
||||
transaction.modelBuilder = new ModelBuilder();
|
||||
transaction.models = transaction.modelBuilder.models;
|
||||
transaction.definitions = transaction.modelBuilder.definitions;
|
||||
|
||||
for (var i in dataSource.modelBuilder.models) {
|
||||
dataSource.copyModel.call(transaction, dataSource.modelBuilder.models[i]);
|
||||
}
|
||||
|
||||
transaction.exec = function(cb) {
|
||||
transaction.connector.exec(cb);
|
||||
// Don't allow creating transactions on a transaction data-source:
|
||||
transaction.transaction = function() {
|
||||
throw new Error(g.f('Nesting transactions is not supported'));
|
||||
};
|
||||
|
||||
// Create a blank pool for the slave models bound to this transaction.
|
||||
var modelBuilder = new ModelBuilder();
|
||||
var slaveModels = modelBuilder.models;
|
||||
transaction.modelBuilder = modelBuilder;
|
||||
transaction.models = slaveModels;
|
||||
transaction.definitions = modelBuilder.definitions;
|
||||
|
||||
// For performance reasons, use a getter per model and only call copyModel()
|
||||
// for the models that are actually used. These getters are then replaced
|
||||
// with the actual values on first use.
|
||||
var masterModels = dataSource.modelBuilder.models;
|
||||
Object.keys(masterModels).forEach(function(name) {
|
||||
Object.defineProperty(slaveModels, name, {
|
||||
enumerable: true,
|
||||
configurable: true,
|
||||
get: function() {
|
||||
// Delete getter so copyModel() can redefine slaveModels[name].
|
||||
// NOTE: No need to set the new value as copyModel() takes care of it.
|
||||
delete slaveModels[name];
|
||||
return dataSource.copyModel.call(transaction, masterModels[name]);
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
var done = function(err) {
|
||||
if (err) {
|
||||
transaction.rollback(function(error) {
|
||||
cb(err || error);
|
||||
});
|
||||
} else {
|
||||
transaction.commit(cb);
|
||||
}
|
||||
// Make sure cb() isn't called twice, e.g. if `execute` returns a
|
||||
// thenable object and also calls the passed `cb` function.
|
||||
done = function() {};
|
||||
};
|
||||
|
||||
function handleExecute() {
|
||||
if (execute) {
|
||||
cb = cb || utils.createPromiseCallback();
|
||||
try {
|
||||
var result = execute(slaveModels, done);
|
||||
if (result && typeof result.then === 'function') {
|
||||
result.then(function() { done(); }, done);
|
||||
}
|
||||
} catch (err) {
|
||||
done(err);
|
||||
}
|
||||
return cb.promise;
|
||||
} else if (cb) {
|
||||
cb(null, transaction);
|
||||
} else {
|
||||
return transaction;
|
||||
}
|
||||
}
|
||||
|
||||
function transactionCreated(err, tx) {
|
||||
if (err) {
|
||||
cb(err);
|
||||
} else {
|
||||
// Expose transaction on the created transaction dataSource so it can be
|
||||
// retrieved again in determineOptions() in dao.js, as well as referenced
|
||||
// in transaction.commit() and transaction.rollback() below.
|
||||
transaction.currentTransaction = tx;
|
||||
// Handle timeout and pass it on as an error.
|
||||
tx.observe('timeout', function(context, next) {
|
||||
const err = new Error(g.f('Transaction is rolled back due to timeout'));
|
||||
err.code = 'TRANSACTION_TIMEOUT';
|
||||
// Pass on the error to next(), so that the final 'timeout' observer in
|
||||
// loopback-connector does not trigger a rollback by itself that we
|
||||
// can't get a callback for when it's finished.
|
||||
next(err);
|
||||
// Call done(err) after, to execute the rollback here and reject the
|
||||
// promise with the error when it's completed.
|
||||
done(err);
|
||||
});
|
||||
handleExecute();
|
||||
}
|
||||
}
|
||||
|
||||
function ensureTransaction(transaction, cb) {
|
||||
if (!transaction) {
|
||||
process.nextTick(function() {
|
||||
cb(new Error(g.f(
|
||||
'Transaction is not ready, wait for the returned promise to resolve')));
|
||||
});
|
||||
}
|
||||
return transaction;
|
||||
}
|
||||
|
||||
var connector = dataSource.connector;
|
||||
if (connector.transaction) {
|
||||
// Create a transient or memory source transaction.
|
||||
transaction.connector = connector.transaction();
|
||||
transaction.commit =
|
||||
transaction.exec = function(cb) {
|
||||
this.connector.exec(cb);
|
||||
};
|
||||
transaction.rollback = function(cb) {
|
||||
// No need to do anything here.
|
||||
cb();
|
||||
};
|
||||
return handleExecute();
|
||||
} else if (connector.beginTransaction) {
|
||||
// Create a database source transaction.
|
||||
transaction.exec =
|
||||
transaction.commit = function(cb) {
|
||||
ensureTransaction(this.currentTransaction, cb).commit(cb);
|
||||
};
|
||||
transaction.rollback = function(cb) {
|
||||
ensureTransaction(this.currentTransaction, cb).rollback(cb);
|
||||
};
|
||||
// Always use callback / promise due to the use of beginTransaction()
|
||||
cb = cb || utils.createPromiseCallback();
|
||||
Transaction.begin(connector, options, transactionCreated);
|
||||
return cb.promise;
|
||||
} else {
|
||||
throw new Error(g.f('DataSource does not support transactions'));
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -134,31 +134,24 @@ if (Transaction) {
|
|||
* @returns {Promise|undefined} Returns a callback promise.
|
||||
*/
|
||||
Transaction.prototype.commit = function(cb) {
|
||||
var self = this;
|
||||
cb = cb || utils.createPromiseCallback();
|
||||
// Report an error if the transaction is not active
|
||||
if (!self.connection) {
|
||||
process.nextTick(function() {
|
||||
cb(new Error(g.f('The {{transaction}} is not active: %s', self.id)));
|
||||
});
|
||||
return cb.promise;
|
||||
}
|
||||
if (this.ensureActive(cb)) {
|
||||
var context = {
|
||||
transaction: self,
|
||||
transaction: this,
|
||||
operation: 'commit',
|
||||
};
|
||||
|
||||
function work(done) {
|
||||
self.connector.commit(self.connection, done);
|
||||
}
|
||||
|
||||
self.notifyObserversAround('commit', context, work, function(err) {
|
||||
this.notifyObserversAround('commit', context,
|
||||
done => {
|
||||
this.connector.commit(this.connection, done);
|
||||
},
|
||||
err => {
|
||||
// Deference the connection to mark the transaction is not active
|
||||
// The connection should have been released back the pool
|
||||
self.connection = null;
|
||||
this.connection = null;
|
||||
cb(err);
|
||||
});
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
return cb.promise;
|
||||
};
|
||||
|
||||
|
@ -181,34 +174,37 @@ if (Transaction) {
|
|||
* @returns {Promise|undefined} Returns a callback promise.
|
||||
*/
|
||||
Transaction.prototype.rollback = function(cb) {
|
||||
var self = this;
|
||||
cb = cb || utils.createPromiseCallback();
|
||||
// Report an error if the transaction is not active
|
||||
if (!self.connection) {
|
||||
process.nextTick(function() {
|
||||
cb(new Error(g.f('The {{transaction}} is not active: %s', self.id)));
|
||||
});
|
||||
return cb.promise;
|
||||
}
|
||||
if (this.ensureActive(cb)) {
|
||||
var context = {
|
||||
transaction: self,
|
||||
transaction: this,
|
||||
operation: 'rollback',
|
||||
};
|
||||
|
||||
function work(done) {
|
||||
self.connector.rollback(self.connection, done);
|
||||
}
|
||||
|
||||
self.notifyObserversAround('rollback', context, work, function(err) {
|
||||
this.notifyObserversAround('rollback', context,
|
||||
done => {
|
||||
this.connector.rollback(this.connection, done);
|
||||
},
|
||||
err => {
|
||||
// Deference the connection to mark the transaction is not active
|
||||
// The connection should have been released back the pool
|
||||
self.connection = null;
|
||||
this.connection = null;
|
||||
cb(err);
|
||||
});
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
return cb.promise;
|
||||
};
|
||||
|
||||
Transaction.prototype.ensureActive = function(cb) {
|
||||
// Report an error if the transaction is not active
|
||||
if (!this.connection) {
|
||||
process.nextTick(() => {
|
||||
cb(new Error(g.f('The {{transaction}} is not active: %s', this.id)));
|
||||
});
|
||||
}
|
||||
return !!this.connection;
|
||||
};
|
||||
|
||||
Transaction.prototype.toJSON = function() {
|
||||
return this.id;
|
||||
};
|
||||
|
|
|
@ -48,7 +48,7 @@
|
|||
"depd": "^1.0.0",
|
||||
"inflection": "^1.6.0",
|
||||
"lodash": "^4.17.4",
|
||||
"loopback-connector": "^4.0.0",
|
||||
"loopback-connector": "^4.3.0",
|
||||
"minimatch": "^3.0.3",
|
||||
"qs": "^6.5.0",
|
||||
"shortid": "^2.2.6",
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
// Copyright IBM Corp. 2017. All Rights Reserved.
|
||||
// Node module: loopback-datasource-juggler
|
||||
// This file is licensed under the MIT License.
|
||||
// License text available at https://opensource.org/licenses/MIT
|
||||
|
||||
'use strict';
|
||||
/* global getSchema:false */
|
||||
const DataSource = require('..').DataSource;
|
||||
const EventEmitter = require('events');
|
||||
const Connector = require('loopback-connector').Connector;
|
||||
const Transaction = require('loopback-connector').Transaction;
|
||||
const should = require('./init.js');
|
||||
|
||||
describe('Transactions on memory connector', function() {
|
||||
let db, tx;
|
||||
|
||||
before(() => {
|
||||
db = getSchema();
|
||||
db.define('Model');
|
||||
});
|
||||
|
||||
it('returns an EventEmitter object', done => {
|
||||
tx = db.transaction();
|
||||
tx.should.be.instanceOf(EventEmitter);
|
||||
done();
|
||||
});
|
||||
|
||||
it('exposes and caches slave models', done => {
|
||||
testModelCaching(tx.models, db.models);
|
||||
done();
|
||||
});
|
||||
|
||||
it('changes count when committing', done => {
|
||||
db.models.Model.count((err, count) => {
|
||||
should.not.exist(err);
|
||||
should.exist(count);
|
||||
count.should.equal(0);
|
||||
tx.models.Model.create(Array(1), () => {
|
||||
// Only called after tx.commit()!
|
||||
});
|
||||
tx.commit(err => {
|
||||
should.not.exist(err);
|
||||
db.models.Model.count((err, count) => {
|
||||
should.not.exist(err);
|
||||
should.exist(count);
|
||||
count.should.equal(1);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Transactions on test connector without execute()', () => {
|
||||
let db, tx;
|
||||
|
||||
before(() => {
|
||||
db = createDataSource();
|
||||
});
|
||||
|
||||
beforeEach(resetState);
|
||||
|
||||
it('resolves to an EventEmitter', done => {
|
||||
const promise = db.transaction();
|
||||
promise.should.be.Promise();
|
||||
promise.then(transaction => {
|
||||
should.exist(transaction);
|
||||
transaction.should.be.instanceof(EventEmitter);
|
||||
tx = transaction;
|
||||
done();
|
||||
}, done);
|
||||
});
|
||||
|
||||
it('exposes and caches slave models', done => {
|
||||
testModelCaching(tx.models, db.models);
|
||||
done();
|
||||
});
|
||||
|
||||
it('does not allow nesting of transactions', done => {
|
||||
(() => tx.transaction()).should.throw('Nesting transactions is not supported');
|
||||
done();
|
||||
});
|
||||
|
||||
it('calls commit() on the connector', done => {
|
||||
db.transaction().then(tx => {
|
||||
tx.commit(err => {
|
||||
callCount.should.deepEqual({commit: 1, rollback: 0, create: 0});
|
||||
done(err);
|
||||
});
|
||||
}, done);
|
||||
});
|
||||
|
||||
it('calls rollback() on the connector', done => {
|
||||
db.transaction().then(tx => {
|
||||
tx.rollback(err => {
|
||||
callCount.should.deepEqual({commit: 0, rollback: 1, create: 0});
|
||||
done(err);
|
||||
});
|
||||
}, done);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Transactions on test connector with execute()', () => {
|
||||
let db;
|
||||
|
||||
before(() => {
|
||||
db = createDataSource();
|
||||
});
|
||||
|
||||
beforeEach(resetState);
|
||||
|
||||
it('passes models and calls commit() automatically', done => {
|
||||
db.transaction(models => {
|
||||
testModelCaching(models, db.models);
|
||||
return models.Model.create({});
|
||||
}, err => {
|
||||
callCount.should.deepEqual({commit: 1, rollback: 0, create: 1});
|
||||
transactionPassed.should.be.true();
|
||||
done(err);
|
||||
});
|
||||
});
|
||||
|
||||
it('calls rollback() automatically when throwing an error', done => {
|
||||
let error;
|
||||
db.transaction(models => {
|
||||
error = new Error('exception');
|
||||
throw error;
|
||||
}, err => {
|
||||
error.should.equal(err);
|
||||
callCount.should.deepEqual({commit: 0, rollback: 1, create: 0});
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('reports execution timeouts', done => {
|
||||
let timedOut = false;
|
||||
db.transaction(models => {
|
||||
setTimeout(() => {
|
||||
models.Model.create({}, function(err) {
|
||||
if (!timedOut) {
|
||||
done(new Error('Timeout was ineffective'));
|
||||
} else {
|
||||
should.exist(err);
|
||||
err.message.should.startWith('The transaction is not active:');
|
||||
done();
|
||||
}
|
||||
});
|
||||
}, 50);
|
||||
}, {
|
||||
timeout: 25,
|
||||
}, err => {
|
||||
timedOut = true;
|
||||
should.exist(err);
|
||||
err.code.should.equal('TRANSACTION_TIMEOUT');
|
||||
err.message.should.equal('Transaction is rolled back due to timeout');
|
||||
callCount.should.deepEqual({commit: 0, rollback: 1, create: 0});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
function createDataSource() {
|
||||
let db = new DataSource({
|
||||
initialize: (dataSource, cb) => {
|
||||
dataSource.connector = new TestConnector();
|
||||
cb();
|
||||
},
|
||||
});
|
||||
db.define('Model');
|
||||
return db;
|
||||
}
|
||||
|
||||
function testModelCaching(txModels, dbModels) {
|
||||
should.exist(txModels);
|
||||
// Test models caching mechanism:
|
||||
// Model property should be a accessor with a getter first:
|
||||
const accessor = Object.getOwnPropertyDescriptor(txModels, 'Model');
|
||||
should.exist(accessor);
|
||||
should.exist(accessor.get);
|
||||
accessor.get.should.be.Function();
|
||||
const Model = txModels.Model;
|
||||
should.exist(Model);
|
||||
// After accessing it once, it should be a normal cached property:
|
||||
const desc = Object.getOwnPropertyDescriptor(txModels, 'Model');
|
||||
should.exist(desc.value);
|
||||
Model.should.equal(txModels.Model);
|
||||
Model.prototype.should.be.instanceof(dbModels.Model);
|
||||
}
|
||||
|
||||
let callCount;
|
||||
let transactionPassed;
|
||||
|
||||
function resetState() {
|
||||
callCount = {commit: 0, rollback: 0, create: 0};
|
||||
transactionPassed = false;
|
||||
}
|
||||
|
||||
class TestConnector extends Connector {
|
||||
constructor() {
|
||||
super('test');
|
||||
}
|
||||
|
||||
beginTransaction(isolationLevel, cb) {
|
||||
this.currentTransaction = new Transaction(this, this);
|
||||
process.nextTick(() => cb(null, this.currentTransaction));
|
||||
}
|
||||
|
||||
commit(tx, cb) {
|
||||
callCount.commit++;
|
||||
cb();
|
||||
}
|
||||
|
||||
rollback(tx, cb) {
|
||||
callCount.rollback++;
|
||||
cb();
|
||||
}
|
||||
|
||||
create(model, data, options, cb) {
|
||||
callCount.create++;
|
||||
const transaction = options.transaction;
|
||||
const current = this.currentTransaction;
|
||||
transactionPassed = transaction &&
|
||||
(current === transaction || current === transaction.connection);
|
||||
cb();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue