Merge pull request #591 from strongloop/feature/transaction

Add transaction support
This commit is contained in:
Raymond Feng 2015-05-18 14:18:43 -07:00
commit 48e5f63dc0
11 changed files with 868 additions and 339 deletions

View File

@ -7,6 +7,8 @@
"lib/include.js",
"lib/model-builder.js",
"lib/relations.js",
"lib/observer.js",
"lib/transaction.js",
"lib/validations.js"
],
"codeSectionDepth": 4,

View File

@ -12,3 +12,5 @@ var commonTest = './test/common_test';
Object.defineProperty(exports, 'test', {
get: function() {return require(commonTest);}
});
exports.Transaction = require('loopback-connector').Transaction;

View File

@ -366,7 +366,7 @@ Memory.prototype.all = function all(model, filter, options, callback) {
process.nextTick(function () {
if (filter && filter.include) {
self._models[model].model.include(nodes, filter.include, callback);
self._models[model].model.include(nodes, filter.include, options, callback);
} else {
callback(null, nodes);
}

View File

@ -2068,10 +2068,10 @@ DataAccessObject.prototype.setAttribute = function setAttribute(name, value) {
* @param {Mixed} value Value of property
* @param {Function} cb Callback function called with (err, instance)
*/
DataAccessObject.prototype.updateAttribute = function updateAttribute(name, value, cb) {
DataAccessObject.prototype.updateAttribute = function updateAttribute(name, value, options, cb) {
var data = {};
data[name] = value;
return this.updateAttributes(data, cb);
return this.updateAttributes(data, options, cb);
};
/**
@ -2324,3 +2324,8 @@ jutil.mixin(DataAccessObject, Inclusion);
* Add 'relation'
*/
jutil.mixin(DataAccessObject, Relation);
/*
* Add 'transaction'
*/
jutil.mixin(DataAccessObject, require('./transaction'));

View File

@ -2,7 +2,6 @@ var async = require('async');
var utils = require('./utils');
var isPlainObject = utils.isPlainObject;
var defineCachedRelations = utils.defineCachedRelations;
var debug = require('debug')('loopback:include');
/*!
* Normalize the include to be an array
@ -145,11 +144,15 @@ Inclusion.normalizeInclude = normalizeInclude;
*
* @param {Array} objects Array of instances
* @param {String|Object|Array} include Which relations to load.
* @param {Object} [options] Options for CRUD
* @param {Function} cb Callback called when relations are loaded
*
*/
Inclusion.include = function (objects, include, cb) {
debug('include', include);
Inclusion.include = function (objects, include, options, cb) {
if (typeof options === 'function' && cb === undefined) {
cb = options;
options = {};
}
var self = this;
if (!include || (Array.isArray(include) && include.length === 0) ||
@ -163,12 +166,12 @@ Inclusion.include = function (objects, include, cb) {
include = normalizeInclude(include);
async.each(include, function(item, callback) {
processIncludeItem(objects, item, callback);
processIncludeItem(objects, item, options, callback);
}, function(err) {
cb && cb(err, objects);
});
function processIncludeItem(objs, include, cb) {
function processIncludeItem(objs, include, options, cb) {
var relations = self.relations;
var relationName;
@ -214,8 +217,7 @@ Inclusion.include = function (objects, include, cb) {
// Just skip if inclusion is disabled
if (relation.options.disableInclude) {
cb();
return;
return cb();
}
//prepare filter and fields for making DB Call
var filter = (scope && scope.conditions()) || {};
@ -372,7 +374,7 @@ Inclusion.include = function (objects, include, cb) {
//process.
if (subInclude && targets) {
tasks.push(function subIncludesTask(next) {
relation.modelTo.include(targets, subInclude, next);
relation.modelTo.include(targets, subInclude, options, next);
});
}
//process & link each target with object
@ -450,7 +452,7 @@ Inclusion.include = function (objects, include, cb) {
//simultaneously process subIncludes
if (subInclude && targets) {
tasks.push(function subIncludesTask(next) {
relation.modelTo.include(targets, subInclude, next);
relation.modelTo.include(targets, subInclude, options, next);
});
}
//process each target object
@ -511,7 +513,7 @@ Inclusion.include = function (objects, include, cb) {
//simultaneously process subIncludes
if (subInclude && targets) {
tasks.push(function subIncludesTask(next) {
relation.modelTo.include(targets, subInclude, next);
relation.modelTo.include(targets, subInclude, options, next);
});
}
//process each target object
@ -597,7 +599,7 @@ Inclusion.include = function (objects, include, cb) {
//simultaneously process subIncludes
if (subInclude && targets) {
tasks.push(function subIncludesTask(next) {
Model.include(targets, subInclude, next);
Model.include(targets, subInclude, options, next);
});
}
//process each target object
@ -665,7 +667,7 @@ Inclusion.include = function (objects, include, cb) {
//simultaneously process subIncludes
if (subInclude && targets) {
tasks.push(function subIncludesTask(next) {
relation.modelTo.include(targets, subInclude, next);
relation.modelTo.include(targets, subInclude, options, next);
});
}
//process each target object
@ -769,10 +771,10 @@ Inclusion.include = function (objects, include, cb) {
related = inst[relationName].bind(inst, filter);
} else {
related = inst[relationName].bind(inst);
related = inst[relationName].bind(inst, undefined);
}
related(function (err, result) {
related(options, function (err, result) {
if (err) {
return callback(err);
} else {

View File

@ -604,87 +604,8 @@ ModelBaseClass.prototype.setStrict = function (strict) {
this.__strict = strict;
};
/**
* Register an asynchronous observer for the given operation (event).
* @param {String} operation The operation name.
* @callback {function} listener The listener function. It will be invoked with
* `this` set to the model constructor, e.g. `User`.
* @param {Object} context Operation-specific context.
* @param {function(Error=)} next The callback to call when the observer
* has finished.
* @end
*/
ModelBaseClass.observe = function(operation, listener) {
if (!this._observers[operation]) {
this._observers[operation] = [];
}
this._observers[operation].push(listener);
};
/**
* Unregister an asynchronous observer for the given operation (event).
* @param {String} operation The operation name.
* @callback {function} listener The listener function.
* @end
*/
ModelBaseClass.removeObserver = function(operation, listener) {
if (!this._observers[operation]) return;
var index = this._observers[operation].indexOf(listener);
if (index != -1) this._observers[operation].splice(index, 1);
};
/**
* Unregister all asynchronous observers for the given operation (event).
* @param {String} operation The operation name.
* @end
*/
ModelBaseClass.clearObservers = function(operation) {
if (!this._observers[operation]) return;
this._observers[operation].length = 0;
};
/**
* Invoke all async observers for the given operation.
* @param {String} operation The operation name.
* @param {Object} context Operation-specific context.
* @param {function(Error=)} callback The callback to call when all observers
* has finished.
*/
ModelBaseClass.notifyObserversOf = function(operation, context, callback) {
var observers = this._observers && this._observers[operation];
if (!callback) callback = utils.createPromiseCallback();
this._notifyBaseObservers(operation, context, function doNotify(err) {
if (err) return callback(err, context);
if (!observers || !observers.length) return callback(null, context);
async.eachSeries(
observers,
function notifySingleObserver(fn, next) {
var retval = fn(context, next);
if (retval && typeof retval.then === 'function') {
retval.then(
function() { next(); },
next // error handler
);
}
},
function(err) { callback(err, context) }
);
});
return callback.promise;
}
ModelBaseClass._notifyBaseObservers = function(operation, context, callback) {
if (this.base && this.base.notifyObserversOf)
this.base.notifyObserversOf(operation, context, callback);
else
callback();
}
// Mixin observer
jutil.mixin(ModelBaseClass, require('./observer'));
jutil.mixin(ModelBaseClass, Hookable);
jutil.mixin(ModelBaseClass, validations.Validatable);

98
lib/observer.js Normal file
View File

@ -0,0 +1,98 @@
var async = require('async');
var utils = require('./utils');
module.exports = ObserverMixin;
/**
* ObserverMixin class. Use to add observe/notifyObserversOf APIs to other
* classes.
*
* @class ObserverMixin
*/
function ObserverMixin() {
}
/**
* Register an asynchronous observer for the given operation (event).
* @param {String} operation The operation name.
* @callback {function} listener The listener function. It will be invoked with
* `this` set to the model constructor, e.g. `User`.
* @param {Object} context Operation-specific context.
* @param {function(Error=)} next The callback to call when the observer
* has finished.
* @end
*/
ObserverMixin.observe = function(operation, listener) {
this._observers = this._observers || {};
if (!this._observers[operation]) {
this._observers[operation] = [];
}
this._observers[operation].push(listener);
};
/**
* Unregister an asynchronous observer for the given operation (event).
* @param {String} operation The operation name.
* @callback {function} listener The listener function.
* @end
*/
ObserverMixin.removeObserver = function(operation, listener) {
if (!(this._observers && this._observers[operation])) return;
var index = this._observers[operation].indexOf(listener);
if (index !== -1) {
return this._observers[operation].splice(index, 1);
}
};
/**
* Unregister all asynchronous observers for the given operation (event).
* @param {String} operation The operation name.
* @end
*/
ObserverMixin.clearObservers = function(operation) {
if (!(this._observers && this._observers[operation])) return;
this._observers[operation].length = 0;
};
/**
* Invoke all async observers for the given operation.
* @param {String} operation The operation name.
* @param {Object} context Operation-specific context.
* @param {function(Error=)} callback The callback to call when all observers
* has finished.
*/
ObserverMixin.notifyObserversOf = function(operation, context, callback) {
var observers = this._observers && this._observers[operation];
if (!callback) callback = utils.createPromiseCallback();
this._notifyBaseObservers(operation, context, function doNotify(err) {
if (err) return callback(err, context);
if (!observers || !observers.length) return callback(null, context);
async.eachSeries(
observers,
function notifySingleObserver(fn, next) {
var retval = fn(context, next);
if (retval && typeof retval.then === 'function') {
retval.then(
function() { next(); },
next // error handler
);
}
},
function(err) { callback(err, context) }
);
});
return callback.promise;
}
ObserverMixin._notifyBaseObservers = function(operation, context, callback) {
if (this.base && this.base.notifyObserversOf)
this.base.notifyObserversOf(operation, context, callback);
else
callback();
}

File diff suppressed because it is too large Load Diff

View File

@ -21,10 +21,11 @@ function ScopeDefinition(definition) {
}
ScopeDefinition.prototype.targetModel = function(receiver) {
var modelTo;
if (typeof this.options.modelTo === 'function') {
var modelTo = this.options.modelTo.call(this, receiver) || this.modelTo;
modelTo = this.options.modelTo.call(this, receiver) || this.modelTo;
} else {
var modelTo = this.modelTo;
modelTo = this.modelTo;
}
if (!(modelTo.prototype instanceof DefaultModelBaseClass)) {
var msg = 'Invalid target model for scope `';
@ -36,16 +37,34 @@ ScopeDefinition.prototype.targetModel = function(receiver) {
return modelTo;
};
ScopeDefinition.prototype.related = function(receiver, scopeParams, condOrRefresh, cb) {
/*!
* Find related model instances
* @param {*} receiver The target model class/prototype
* @param {Object|Function} scopeParams
* @param {Boolean|Object} [condOrRefresh] true for refresh or object as a filter
* @param {Object} [options]
* @param {Function} cb
* @returns {*}
*/
ScopeDefinition.prototype.related = function(receiver, scopeParams, condOrRefresh, options, cb) {
var name = this.name;
var self = receiver;
var actualCond = {};
var actualRefresh = false;
var saveOnCache = true;
if (arguments.length === 3) {
if (typeof condOrRefresh === 'function' &&
options === undefined && cb === undefined) {
// related(receiver, scopeParams, cb)
cb = condOrRefresh;
} else if (arguments.length === 4) {
options = {};
condOrRefresh = undefined;
} else if (typeof options === 'function' && cb === undefined) {
cb = options;
options = {};
}
options = options || {};
if (condOrRefresh !== undefined) {
if (typeof condOrRefresh === 'boolean') {
actualRefresh = condOrRefresh;
} else {
@ -53,16 +72,15 @@ ScopeDefinition.prototype.related = function(receiver, scopeParams, condOrRefres
actualRefresh = true;
saveOnCache = false;
}
} else {
throw new Error('Method can be only called with one or two arguments');
}
cb = cb || utils.createPromiseCallback();
if (!self.__cachedRelations || self.__cachedRelations[name] === undefined
|| actualRefresh) {
// It either doesn't hit the cache or refresh is required
var params = mergeQuery(actualCond, scopeParams, {nestedInclude: true});
var targetModel = this.targetModel(receiver);
targetModel.find(params, function (err, data) {
targetModel.find(params, options, function (err, data) {
if (!err && saveOnCache) {
defineCachedRelations(self);
self.__cachedRelations[name] = data;
@ -73,6 +91,7 @@ ScopeDefinition.prototype.related = function(receiver, scopeParams, condOrRefres
// Return from cache
cb(null, self.__cachedRelations[name]);
}
return cb.promise;
}
/**
@ -149,7 +168,7 @@ function defineScope(cls, targetClass, name, params, methods, options) {
var targetModel = definition.targetModel(this);
var self = this;
var f = function(condOrRefresh, cb) {
var f = function(condOrRefresh, options, cb) {
if (arguments.length === 0) {
if (typeof f.value === 'function') {
return f.value(self);
@ -157,6 +176,18 @@ function defineScope(cls, targetClass, name, params, methods, options) {
return self.__cachedRelations[name];
}
} else {
if (typeof condOrRefresh === 'function'
&& options === undefined && cb === undefined) {
// customer.orders(cb)
cb = condOrRefresh;
options = {};
condOrRefresh = undefined;
} else if (typeof options === 'function' && cb === undefined) {
// customer.orders(condOrRefresh, cb);
cb = options;
options = {};
}
options = options || {}
// Check if there is a through model
// see https://github.com/strongloop/loopback/issues/1076
if (f._scope.collect &&
@ -169,11 +200,7 @@ function defineScope(cls, targetClass, name, params, methods, options) {
};
condOrRefresh = {};
}
if (arguments.length === 1) {
return definition.related(self, f._scope, condOrRefresh);
} else {
return definition.related(self, f._scope, condOrRefresh, cb);
}
return definition.related(self, f._scope, condOrRefresh, options, cb);
}
};
@ -186,23 +213,20 @@ function defineScope(cls, targetClass, name, params, methods, options) {
f._targetClass = i8n.camelize(f._scope.collect);
}
f.getAsync = function (cond, cb) {
if (cb === undefined) {
if (cond === undefined) {
// getAsync()
cb = utils.createPromiseCallback();
cond = true;
} else if (typeof cond !== 'function') {
// getAsync({where:{}})
cb = utils.createPromiseCallback();
} else {
// getAsync(function(){})
cb = cond;
cond = true;
}
f.getAsync = function(condOrRefresh, options, cb) {
if (typeof condOrRefresh === 'function'
&& options === undefined && cb === undefined) {
// customer.orders.getAsync(cb)
cb = condOrRefresh;
options = {};
condOrRefresh = {};
} else if (typeof options === 'function' && cb === undefined) {
// customer.orders.getAsync(condOrRefresh, cb);
cb = options;
options = {};
}
definition.related(self, f._scope, cond, cb);
return cb.promise;
options = options || {}
return definition.related(self, f._scope, condOrRefresh, options, cb);
}
f.build = build;
@ -298,13 +322,19 @@ function defineScope(cls, targetClass, name, params, methods, options) {
return new targetModel(data);
}
function create(data, cb) {
if (typeof data === 'function') {
function create(data, options, cb) {
if (typeof data === 'function' &&
options === undefined && cb === undefined) {
// create(cb)
cb = data;
data = {};
} else if (typeof options === 'function' && cb === undefined) {
// create(data, cb)
cb = options;
options = {};
}
cb = cb || utils.createPromiseCallback();
return this.build(data).save(cb);
options = options || {};
return this.build(data).save(options, cb);
}
/*
@ -313,53 +343,108 @@ function defineScope(cls, targetClass, name, params, methods, options) {
- For every destroy call which results in an error
- If fetching the Elements on which destroyAll is called results in an error
*/
function destroyAll(where, cb) {
if (typeof where === 'function') cb = where, where = {};
cb = cb || utils.createPromiseCallback();
function destroyAll(where, options, cb) {
if (typeof where === 'function') {
// destroyAll(cb)
cb = where;
where = {};
} else if (typeof options === 'function' && cb === undefined) {
// destroyAll(where, cb)
cb = options;
options = {};
}
options = options || {};
var targetModel = definition.targetModel(this._receiver);
var scoped = (this._scope && this._scope.where) || {};
var filter = mergeQuery({ where: scoped }, { where: where || {} });
return targetModel.destroyAll(filter.where, cb);
return targetModel.destroyAll(filter.where, options, cb);
}
function updateAll(where, data, cb) {
if (arguments.length === 2) {
// Handle updateAll(data, cb)
function updateAll(where, data, options, cb) {
if (typeof data === 'function' &&
options === undefined && cb === undefined) {
// updateAll(data, cb)
cb = data;
data = where;
where = {};
options = {};
} else if (typeof options === 'function' && cb === undefined) {
// updateAll(where, data, cb)
cb = options;
options = {};
}
options = options || {};
var targetModel = definition.targetModel(this._receiver);
var scoped = (this._scope && this._scope.where) || {};
var filter = mergeQuery({ where: scoped }, { where: where || {} });
targetModel.updateAll(filter.where, data, cb);
return targetModel.updateAll(filter.where, data, options, cb);
}
function findById(id, cb) {
function findById(id, filter, options, cb) {
if (options === undefined && cb === undefined) {
if (typeof filter === 'function') {
// findById(id, cb)
cb = filter;
filter = {};
}
} else if (cb === undefined) {
if (typeof options === 'function') {
// findById(id, query, cb)
cb = options;
options = {};
if (typeof filter === 'object' && !(filter.include || filter.fields)) {
// If filter doesn't have include or fields, assuming it's options
options = filter;
filter = {};
}
}
}
options = options || {};
filter = filter || {};
var targetModel = definition.targetModel(this._receiver);
var idName = targetModel.definition.idName();
var filter = { where: {} };
filter.where[idName] = id;
this.findOne(filter, cb);
var query = {where: {}};
query.where[idName] = id;
query = mergeQuery(query, filter);
return this.findOne(query, options, cb);
}
function findOne(filter, cb) {
if (typeof filter === 'function') cb = filter, filter = {};
function findOne(filter, options, cb) {
if (typeof filter === 'function') {
// findOne(cb)
cb = filter;
filter = {};
options = {};
} else if (typeof options === 'function' && cb === undefined) {
// findOne(filter, cb)
cb = options;
options = {};
}
options = options || {};
var targetModel = definition.targetModel(this._receiver);
var scoped = (this._scope && this._scope.where) || {};
var filter = mergeQuery({ where: scoped }, filter || {});
targetModel.findOne(filter, cb);
filter = mergeQuery({ where: scoped }, filter || {});
return targetModel.findOne(filter, options, cb);
}
function count(where, cb) {
if (typeof where === 'function') cb = where, where = {};
cb = cb || utils.createPromiseCallback();
function count(where, options, cb) {
if (typeof where === 'function') {
// count(cb)
cb = where;
where = {};
} else if (typeof options === 'function' && cb === undefined) {
// count(where, cb)
cb = options;
options = {};
}
options = options || {};
var targetModel = definition.targetModel(this._receiver);
var scoped = (this._scope && this._scope.where) || {};
var filter = mergeQuery({ where: scoped }, { where: where || {} });
return targetModel.count(filter.where, cb);
return targetModel.count(filter.where, options, cb);
}
return definition;

181
lib/transaction.js Normal file
View File

@ -0,0 +1,181 @@
var debug = require('debug')('loopback:connector:transaction');
var uuid = require('node-uuid');
var utils = require('./utils');
var jutil = require('./jutil');
var ObserverMixin = require('./observer');
var Transaction = require('loopback-connector').Transaction;
module.exports = TransactionMixin;
/**
* TransactionMixin class. Use to add transaction APIs to a model class.
*
* @class TransactionMixin
*/
function TransactionMixin() {
}
/**
* Begin a new transaction
* @param {Object|String} [options] Options can be one of the forms:
* - Object: {isolationLevel: '...', timeout: 1000}
* - String: isolationLevel
*
* Valid values of `isolationLevel` are:
*
* - Transaction.READ_COMMITTED = 'READ COMMITTED'; // default
* - Transaction.READ_UNCOMMITTED = 'READ UNCOMMITTED';
* - Transaction.SERIALIZABLE = 'SERIALIZABLE';
* - Transaction.REPEATABLE_READ = 'REPEATABLE READ';
*
* @param {Function} cb Callback function. It calls back with (err, transaction).
* To pass the transaction context to one of the CRUD methods, use the `options`
* argument with `transaction` property, for example,
*
* ```js
*
* MyModel.beginTransaction('READ COMMITTED', function(err, tx) {
* MyModel.create({x: 1, y: 'a'}, {transaction: tx}, function(err, inst) {
* MyModel.find({x: 1}, {transaction: tx}, function(err, results) {
* // ...
* tx.commit(function(err) {...});
* });
* });
* });
* ```
*
* The transaction can be committed or rolled back. If timeout happens, the
* transaction will be rolled back. Please note a transaction is typically
* associated with a pooled connection. Committing or rolling back a transaction
* will release the connection back to the pool.
*
* Once the transaction is committed or rolled back, the connection property
* will be set to null to mark the transaction to be inactive. Trying to commit
* or rollback an inactive transaction will receive an error from the callback.
*
* Please also note that the transaction is only honored with the same data
* source/connector instance. CRUD methods will not join the current transaction
* if its model is not attached the same data source.
*
*/
TransactionMixin.beginTransaction = function(options, cb) {
cb = cb || utils.createPromiseCallback();
if (Transaction) {
var connector = this.getConnector();
Transaction.begin(connector, options, function(err, transaction) {
if (err) return cb(err);
if (transaction) {
// Set an informational transaction id
transaction.id = uuid.v1();
}
if (options.timeout) {
setTimeout(function() {
var context = {
transaction: transaction,
operation: 'timeout'
};
transaction.notifyObserversOf('timeout', context, function(err) {
if (!err) {
transaction.rollback(function() {
debug('Transaction %s is rolled back due to timeout',
transaction.id);
});
}
});
}, options.timeout);
}
cb(err, transaction);
});
} else {
process.nextTick(function() {
var err = new Error('Transaction is not supported');
cb(err);
});
}
return cb.promise;
};
// Promisify the transaction apis
if (Transaction) {
jutil.mixin(Transaction.prototype, ObserverMixin);
/**
* Commit a transaction and release it back to the pool
* @param {Function} cb Callback function
* @returns {Promise|undefined}
*/
Transaction.prototype.commit = function(cb) {
var self = this;
cb = cb || utils.createPromiseCallback();
// Report an error if the transaction is not active
if (!self.connection) {
process.nextTick(function() {
cb(new Error('The transaction is not active: ' + self.id));
});
return cb.promise;
}
var context = {
transaction: self,
operation: 'commit'
};
self.notifyObserversOf('before commit', context, function(err) {
if (err) return cb(err);
self.connector.commit(self.connection, function(err) {
if (err) return cb(err);
self.notifyObserversOf('after commit', context, function(err) {
// Deference the connection to mark the transaction is not active
// The connection should have been released back the pool
self.connection = null;
cb(err);
});
});
});
return cb.promise;
};
/**
* Rollback a transaction and release it back to the pool
* @param {Function} cb Callback function
* @returns {Promise|undefined}
*/
Transaction.prototype.rollback = function(cb) {
var self = this;
cb = cb || utils.createPromiseCallback();
// Report an error if the transaction is not active
if (!self.connection) {
process.nextTick(function() {
cb(new Error('The transaction is not active: ' + self.id));
});
return cb.promise;
}
var context = {
transaction: self,
operation: 'rollback'
};
self.notifyObserversOf('before rollback', context, function(err) {
if (err) return cb(err);
self.connector.rollback(self.connection, function(err) {
if (err) return cb(err);
self.notifyObserversOf('after rollback', context, function(err) {
// Deference the connection to mark the transaction is not active
// The connection should have been released back the pool
self.connection = null;
cb(err);
});
});
});
return cb.promise;
};
Transaction.prototype.toJSON = function() {
return this.id;
};
Transaction.prototype.toString = function() {
return this.id;
};
}
TransactionMixin.Transaction = Transaction;

View File

@ -36,7 +36,7 @@
"debug": "^2.1.1",
"depd": "^1.0.0",
"inflection": "^1.6.0",
"loopback-connector": "1.x",
"loopback-connector": "^2.0.0",
"node-uuid": "^1.4.2",
"qs": "^2.3.3",
"traverse": "^0.6.6"