From 9177e07209c9753b917d2a0c7a02daa51196fcee Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Fri, 15 May 2015 10:26:49 -0700 Subject: [PATCH] Add transaction apis --- lib/transaction.js | 119 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 1 deletion(-) diff --git a/lib/transaction.js b/lib/transaction.js index 7d1260de..8999851f 100644 --- a/lib/transaction.js +++ b/lib/transaction.js @@ -1,3 +1,9 @@ +var debug = require('debug')('loopback:connector:transaction'); +var uuid = require('node-uuid'); +var utils = require('./utils'); +var jutil = require('./jutil'); +var ObserverMixin = require('./observer'); + var Transaction = require('loopback-connector').Transaction; module.exports = TransactionMixin; @@ -39,17 +45,128 @@ function TransactionMixin() { * associated with a pooled connection. Committing or rolling back a transaction * will release the connection back to the pool. * + * Once the transaction is committed or rolled back, the connection property + * will be set to null to mark the transaction to be inactive. Trying to commit + * or rollback an inactive transaction will receive an error from the callback. + * + * Please also note that the transaction is only honored with the same data + * source/connector instance. CRUD methods will not join the current transaction + * if its model is not attached the same data source. + * */ TransactionMixin.beginTransaction = function(options, cb) { + cb = cb || utils.createPromiseCallback(); if (Transaction) { var connector = this.getConnector(); - Transaction.begin(connector, options, cb); + Transaction.begin(connector, options, function(err, transaction) { + if (err) return cb(err); + if (transaction) { + // Set an informational transaction id + transaction.id = uuid.v1(); + } + if (options.timeout) { + setTimeout(function() { + var context = { + transaction: transaction, + operation: 'timeout' + }; + transaction.notifyObserversOf('timeout', context, function(err) { + if (!err) { + transaction.rollback(function() { + debug('Transaction %s is rolled back due to timeout', + transaction.id); + }); + } + }); + }, options.timeout); + } + cb(err, transaction); + }); } else { process.nextTick(function() { var err = new Error('Transaction is not supported'); cb(err); }); } + return cb.promise; }; +// Promisify the transaction apis +if (Transaction) { + jutil.mixin(Transaction.prototype, ObserverMixin); + /** + * Commit a transaction and release it back to the pool + * @param {Function} cb Callback function + * @returns {Promise|undefined} + */ + Transaction.prototype.commit = function(cb) { + var self = this; + cb = cb || utils.createPromiseCallback(); + // Report an error if the transaction is not active + if (!self.connection) { + return process.nextTick(function() { + cb(new Error('The transaction is not active: ' + self.id)); + }); + } + var context = { + transaction: self, + operation: 'commit' + }; + self.notifyObserversOf('before commit', context, function(err) { + if (err) return cb(err); + self.connector.commit(self.connection, function(err) { + if (err) return cb(err); + self.notifyObserversOf('after commit', context, function(err) { + // Deference the connection to mark the transaction is not active + // The connection should have been released back the pool + self.connection = null; + cb(err); + }); + }); + }); + return cb.promise; + }; + + /** + * Rollback a transaction and release it back to the pool + * @param {Function} cb Callback function + * @returns {Promise|undefined} + */ + Transaction.prototype.rollback = function(cb) { + var self = this; + cb = cb || utils.createPromiseCallback(); + // Report an error if the transaction is not active + if (!self.connection) { + return process.nextTick(function() { + cb(new Error('The transaction is not active: ' + self.id)); + }); + } + var context = { + transaction: self, + operation: 'rollback' + }; + self.notifyObserversOf('before rollback', context, function(err) { + if (err) return cb(err); + self.connector.rollback(self.connection, function(err) { + if (err) return cb(err); + self.notifyObserversOf('after rollback', context, function(err) { + // Deference the connection to mark the transaction is not active + // The connection should have been released back the pool + self.connection = null; + cb(err); + }); + }); + }); + return cb.promise; + }; + + Transaction.prototype.toJSON = function() { + return this.id; + }; + + Transaction.prototype.toString = function() { + return this.id; + }; +} +