diff --git a/lib/mysql.js b/lib/mysql.js index 51c8125..75b1e16 100644 --- a/lib/mysql.js +++ b/lib/mysql.js @@ -26,7 +26,7 @@ exports.initialize = function initializeDataSource(dataSource, callback) { dataSource.EnumFactory = EnumFactory; // factory for Enums. Note that currently Enums can not be registered. - process.nextTick(function () { + process.nextTick(function() { callback && callback(); }); }; @@ -102,7 +102,7 @@ function MySQL(settings) { this.client = mysql.createPool(options); - this.client.on('error', function (err) { + this.client.on('error', function(err) { dataSource.emit('error', err); dataSource.connected = false; dataSource.connecting = false; @@ -121,7 +121,7 @@ require('util').inherits(MySQL, SqlConnector); * @param {String} sql The SQL statement * @param {Function} [callback] The callback after the SQL statement is executed */ -MySQL.prototype.executeSQL = function (sql, params, options, callback) { +MySQL.prototype.executeSQL = function(sql, params, options, callback) { var self = this; var client = this.client; var debugEnabled = debug.enabled; @@ -133,47 +133,51 @@ MySQL.prototype.executeSQL = function (sql, params, options, callback) { debug('SQL: %s, params: %j', sql, params); } - function releaseConnectionAndCallback(connection, err, result) { - connection.release(); + var transaction = options.transaction; + + function handleResponse(connection, err, result) { + if (!transaction) { + connection.release(); + } callback && callback(err, result); } - function runQuery(connection) { - connection.query(sql, params, function (err, data) { + function runQuery(connection, release) { + connection.query(sql, params, function(err, data) { if (debugEnabled) { if (err) { debug('Error: %j', err); } debug('Data: ', data); } - releaseConnectionAndCallback(connection, err, data); + handleResponse(connection, err, data); }); } - client.getConnection(function (err, connection) { + function executeWithConnection(err, connection) { if (err) { return callback && callback(err); } if (self.settings.createDatabase) { // Call USE db ... - connection.query('USE ??', [db], function (err) { + connection.query('USE ??', [db], function(err) { if (err) { if (err && err.message.match(/(^|: )unknown database/i)) { var charset = self.settings.charset; var collation = self.settings.collation; var q = 'CREATE DATABASE ?? CHARACTER SET ?? COLLATE ??'; - connection.query(q, [db, charset, collation], function (err) { + connection.query(q, [db, charset, collation], function(err) { if (!err) { - connection.query('USE ??', [db], function (err) { + connection.query('USE ??', [db], function(err) { runQuery(connection); }); } else { - releaseConnectionAndCallback(connection, err); + handleResponse(connection, err); } }); return; } else { - releaseConnectionAndCallback(connection, err); + handleResponse(connection, err); return; } } @@ -183,7 +187,17 @@ MySQL.prototype.executeSQL = function (sql, params, options, callback) { // Bypass USE db runQuery(connection); } - }); + } + + if (transaction && transaction.connection && + transaction.connector === this) { + if (debugEnabled) { + debug('Execute SQL within a transaction'); + } + executeWithConnection(null, transaction.connection); + } else { + client.getConnection(executeWithConnection); + } }; /** @@ -373,7 +387,7 @@ MySQL.prototype.fromColumnValue = function(prop, val) { * @param {string} name A database identifier * @returns {string} The escaped database identifier */ -MySQL.prototype.escapeName = function (name) { +MySQL.prototype.escapeName = function(name) { return this.client.escapeId(name); }; @@ -384,7 +398,7 @@ MySQL.prototype.escapeName = function (name) { * @param {number} offset The offset * @returns {string} The LIMIT clause */ -MySQL.prototype._buildLimit = function (model, limit, offset) { +MySQL.prototype._buildLimit = function(model, limit, offset) { if (isNaN(limit)) { limit = 0; } @@ -408,7 +422,7 @@ MySQL.prototype.applyPagination = function(model, stmt, filter) { * @param {String} key Optional key, such as 1 or id * @returns {String} The place holder */ -MySQL.prototype.getPlaceholderForIdentifier = function (key) { +MySQL.prototype.getPlaceholderForIdentifier = function(key) { return '??'; }; @@ -417,7 +431,7 @@ MySQL.prototype.getPlaceholderForIdentifier = function (key) { * @param {String} key Optional key, such as 1 or id * @returns {String} The place holder */ -MySQL.prototype.getPlaceholderForValue = function (key) { +MySQL.prototype.getPlaceholderForValue = function(key) { return '?'; }; @@ -427,11 +441,10 @@ MySQL.prototype.getCountForAffectedRows = function(model, info) { return affectedRows; }; - /** * Disconnect from MySQL */ -MySQL.prototype.disconnect = function (cb) { +MySQL.prototype.disconnect = function(cb) { if (this.debug) { debug('disconnect'); } @@ -448,4 +461,5 @@ MySQL.prototype.ping = function(cb) { require('./migration')(MySQL, mysql); require('./discovery')(MySQL, mysql); +require('./transaction')(MySQL, mysql); diff --git a/lib/transaction.js b/lib/transaction.js new file mode 100644 index 0000000..f1fbe5e --- /dev/null +++ b/lib/transaction.js @@ -0,0 +1,63 @@ +var debug = require('debug')('loopback:connector:mysql:transaction'); +module.exports = mixinTransaction; + +/*! + * @param {MySQL} MySQL connector class + * @param {Object} mysql mysql driver + */ +function mixinTransaction(MySQL, mysql) { + + /** + * Begin a new transaction + * @param isolationLevel + * @param cb + */ + MySQL.prototype.beginTransaction = function(isolationLevel, cb) { + debug('Begin a transaction with isolation level: %s', isolationLevel); + this.client.getConnection(function(err, connection) { + if(err) return cb(err); + if(isolationLevel) { + connection.query( + 'SET SESSION TRANSACTION ISOLATION LEVEL ' + isolationLevel, + function(err) { + if (err) return cb(err); + connection.beginTransaction(function(err) { + if (err) return cb(err); + return cb(null, connection); + }); + }); + } else { + connection.beginTransaction(function(err) { + if (err) return cb(err); + return cb(null, connection); + }); + } + }); + }; + + /** + * + * @param connection + * @param cb + */ + MySQL.prototype.commit = function(connection, cb) { + debug('Commit a transaction'); + connection.commit(function(err) { + connection.release(); + cb(err); + }); + }; + + /** + * + * @param connection + * @param cb + */ + MySQL.prototype.rollback = function(connection, cb) { + debug('Rollback a transaction'); + connection.rollback(function(err) { + connection.release(); + cb(err); + }); + }; +} \ No newline at end of file diff --git a/test/transaction.promise.test.js b/test/transaction.promise.test.js new file mode 100644 index 0000000..5a92fe5 --- /dev/null +++ b/test/transaction.promise.test.js @@ -0,0 +1,180 @@ +if (typeof Promise === 'undefined') { + global.Promise = require('bluebird'); +} +var Transaction = require('loopback-datasource-juggler').Transaction; +require('./init.js'); +require('should'); + +var db, Post, Review; + +describe('transactions', function() { + + before(function(done) { + db = getDataSource({collation: 'utf8_general_ci', createDatabase: true}); + db.once('connected', function() { + Post = db.define('PostTX', { + title: {type: String, length: 255, index: true}, + content: {type: String} + }, {mysql: {engine: 'INNODB'}}); + Review = db.define('ReviewTX', { + author: String, + content: {type: String} + }, {mysql: {engine: 'INNODB'}}); + Post.hasMany(Review, {as: 'reviews', foreignKey: 'postId'}); + db.automigrate(['PostTX', 'ReviewTX'], done); + }); + }); + + var currentTx; + var hooks = []; + // Return an async function to start a transaction and create a post + function createPostInTx(post, timeout) { + return function(done) { + // Transaction.begin(db.connector, Transaction.READ_COMMITTED, + var promise = Post.beginTransaction({ + isolationLevel: Transaction.READ_COMMITTED, + timeout: timeout + }); + promise.then(function(tx) { + (typeof tx.id).should.be.eql('string'); + currentTx = tx; + hooks = []; + tx.observe('before commit', function(context, next) { + hooks.push('before commit'); + next(); + }); + tx.observe('after commit', function(context, next) { + hooks.push('after commit'); + next(); + }); + tx.observe('before rollback', function(context, next) { + hooks.push('before rollback'); + next(); + }); + tx.observe('after rollback', function(context, next) { + hooks.push('after rollback'); + next(); + }); + }).then(function() { + Post.create(post, {transaction: currentTx}).then( + function(p) { + p.reviews.create({ + author: 'John', + content: 'Review for ' + p.title + }, {transaction: currentTx}).then( + function(c) { + done(null, c); + }); + }); + }).catch(done); + }; + } + +// Return an async function to find matching posts and assert number of +// records to equal to the count + function expectToFindPosts(where, count, inTx) { + return function(done) { + var options = {}; + if (inTx) { + options.transaction = currentTx; + } + Post.find({where: where}, options).then( + function(posts) { + posts.length.should.be.eql(count); + if (count) { + // Find related reviews + // Please note the empty {} is required, otherwise, the options + // will be treated as a filter + posts[0].reviews({}, options).then(function(reviews) { + reviews.length.should.be.eql(count); + done(); + }); + } else { + done(); + } + }).catch(done); + }; + } + + describe('commit', function() { + + var post = {title: 't1', content: 'c1'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should commit a transaction', function(done) { + currentTx.commit().then(function() { + hooks.should.be.eql(['before commit', 'after commit']); + done(); + }).catch(done); + }); + + it('should see the committed insert', expectToFindPosts(post, 1)); + + it('should report error if the transaction is not active', function(done) { + currentTx.commit().catch(function(err) { + (err).should.be.instanceof(Error); + done(); + }); + }); + }); + + describe('rollback', function() { + + var post = {title: 't2', content: 'c2'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should rollback a transaction', function(done) { + currentTx.rollback().then(function() { + hooks.should.be.eql(['before rollback', 'after rollback']); + done(); + }).catch(done); + }); + + it('should not see the rolledback insert', expectToFindPosts(post, 0)); + + it('should report error if the transaction is not active', function(done) { + currentTx.rollback().catch(function(err) { + (err).should.be.instanceof(Error); + done(); + }); + }); + }); + + describe('timeout', function() { + + var post = {title: 't3', content: 'c3'}; + before(createPostInTx(post, 500)); + + it('should report timeout', function(done) { + setTimeout(function() { + Post.find({where: {title: 't3'}}, {transaction: currentTx}, + function(err, posts) { + if (err) return done(err); + posts.length.should.be.eql(1); + done(); + }); + }, 1000); + done(); + }); + + it('should invoke the timeout hook', function(done) { + currentTx.observe('timeout', function(context, next) { + next(); + done(); + }); + }); + + }); +}) +; + diff --git a/test/transaction.test.js b/test/transaction.test.js new file mode 100644 index 0000000..f8996b8 --- /dev/null +++ b/test/transaction.test.js @@ -0,0 +1,182 @@ +var Transaction = require('loopback-datasource-juggler').Transaction; +require('./init.js'); +require('should'); + +var db, Post, Review; + +describe('transactions', function() { + + before(function(done) { + db = getDataSource({collation: 'utf8_general_ci', createDatabase: true}); + db.once('connected', function() { + Post = db.define('PostTX', { + title: {type: String, length: 255, index: true}, + content: {type: String} + }, {mysql: {engine: 'INNODB'}}); + Review = db.define('ReviewTX', { + author: String, + content: {type: String} + }, {mysql: {engine: 'INNODB'}}); + Post.hasMany(Review, {as: 'reviews', foreignKey: 'postId'}); + db.automigrate(['PostTX', 'ReviewTX'], done); + }); + }); + + var currentTx; + var hooks = []; + // Return an async function to start a transaction and create a post + function createPostInTx(post, timeout) { + return function(done) { + // Transaction.begin(db.connector, Transaction.READ_COMMITTED, + Post.beginTransaction({ + isolationLevel: Transaction.READ_COMMITTED, + timeout: timeout + }, + function(err, tx) { + if (err) return done(err); + (typeof tx.id).should.be.eql('string'); + hooks = []; + tx.observe('before commit', function(context, next) { + hooks.push('before commit'); + next(); + }); + tx.observe('after commit', function(context, next) { + hooks.push('after commit'); + next(); + }); + tx.observe('before rollback', function(context, next) { + hooks.push('before rollback'); + next(); + }); + tx.observe('after rollback', function(context, next) { + hooks.push('after rollback'); + next(); + }); + currentTx = tx; + Post.create(post, {transaction: tx}, + function(err, p) { + if (err) { + done(err); + } else { + p.reviews.create({ + author: 'John', + content: 'Review for ' + p.title + }, {transaction: tx}, + function(err, c) { + done(err); + }); + } + }); + }); + }; + } + + // Return an async function to find matching posts and assert number of + // records to equal to the count + function expectToFindPosts(where, count, inTx) { + return function(done) { + var options = {}; + if (inTx) { + options.transaction = currentTx; + } + Post.find({where: where}, options, + function(err, posts) { + if (err) return done(err); + posts.length.should.be.eql(count); + if (count) { + // Find related reviews + // Please note the empty {} is required, otherwise, the options + // will be treated as a filter + posts[0].reviews({}, options, function(err, reviews) { + if (err) return done(err); + reviews.length.should.be.eql(count); + done(); + }); + } else { + done(); + } + }); + }; + } + + describe('commit', function() { + + var post = {title: 't1', content: 'c1'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should commit a transaction', function(done) { + currentTx.commit(function(err) { + hooks.should.be.eql(['before commit', 'after commit']); + done(err); + }); + }); + + it('should see the committed insert', expectToFindPosts(post, 1)); + + it('should report error if the transaction is not active', function(done) { + currentTx.commit(function(err) { + (err).should.be.instanceof(Error); + done(); + }); + }); + }); + + describe('rollback', function() { + + var post = {title: 't2', content: 'c2'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should rollback a transaction', function(done) { + currentTx.rollback(function(err) { + hooks.should.be.eql(['before rollback', 'after rollback']); + done(err); + }); + }); + + it('should not see the rolledback insert', expectToFindPosts(post, 0)); + + it('should report error if the transaction is not active', function(done) { + currentTx.rollback(function(err) { + (err).should.be.instanceof(Error); + done(); + }); + }); + }); + + describe('timeout', function() { + + var post = {title: 't3', content: 'c3'}; + before(createPostInTx(post, 500)); + + it('should report timeout', function(done) { + setTimeout(function() { + Post.find({where: {title: 't3'}}, {transaction: currentTx}, + function(err, posts) { + if (err) return done(err); + posts.length.should.be.eql(1); + done(); + }); + }, 1000); + done(); + }); + + it('should invoke the timeout hook', function(done) { + currentTx.observe('timeout', function(context, next) { + next(); + done(); + }); + }); + + }); +}); +