From 8815e9c9613e510f61641b5c29ba6edd8d2a798f Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Fri, 15 May 2015 10:27:08 -0700 Subject: [PATCH] Add transaction support --- index.js | 2 + lib/connector.js | 6 +- lib/parameterized-sql.js | 1 + lib/sql.js | 9 +- lib/transaction.js | 87 +++++++++++++ test/automigrate.test.js | 1 - test/connectors/test-sql-connector.js | 126 ++++++++++++++++++- test/sql.test.js | 1 - test/transaction.test.js | 175 ++++++++++++++++++++++++++ 9 files changed, 399 insertions(+), 9 deletions(-) create mode 100644 lib/transaction.js create mode 100644 test/transaction.test.js diff --git a/index.js b/index.js index 92e0c3a..8cacef7 100644 --- a/index.js +++ b/index.js @@ -2,3 +2,5 @@ exports.Connector = require('./lib/connector'); // Set up SqlConnector as an alias to SQLConnector exports.SQLConnector = exports.SqlConnector = require('./lib/sql'); exports.ParameterizedSQL = exports.SQLConnector.ParameterizedSQL; +exports.Transaction = require('./lib/transaction'); + diff --git a/lib/connector.js b/lib/connector.js index 99e5d97..8b7106a 100644 --- a/lib/connector.js +++ b/lib/connector.js @@ -175,9 +175,9 @@ Connector.prototype.define = function(modelDefinition) { * @param {Object} propertyDefinition The object for property definition */ Connector.prototype.defineProperty = function(model, propertyName, propertyDefinition) { - var modelDef = this.getModelDefinition(model); - modelDef.properties[propertyName] = propertyDefinition; - }; + var modelDef = this.getModelDefinition(model); + modelDef.properties[propertyName] = propertyDefinition; +}; /** * Disconnect from the connector diff --git a/lib/parameterized-sql.js b/lib/parameterized-sql.js index 15d5bf0..57788f2 100644 --- a/lib/parameterized-sql.js +++ b/lib/parameterized-sql.js @@ -98,3 +98,4 @@ ParameterizedSQL.join = function(sqls, separator) { }; ParameterizedSQL.PLACEHOLDER = PLACEHOLDER; + diff --git a/lib/sql.js b/lib/sql.js index 8d26262..db4ddeb 100644 --- a/lib/sql.js +++ b/lib/sql.js @@ -4,6 +4,7 @@ var assert = require('assert'); var Connector = require('./connector'); var debug = require('debug')('loopback:connector:sql'); var ParameterizedSQL = require('./parameterized-sql'); +var Transaction = require('./transaction'); module.exports = SQLConnector; @@ -25,6 +26,8 @@ SQLConnector.ParameterizedSQL = ParameterizedSQL; // The generic placeholder var PLACEHOLDER = SQLConnector.PLACEHOLDER = ParameterizedSQL.PLACEHOLDER; +SQLConnector.Transaction = Transaction; + /** * Set the relational property to indicate the backend is a relational DB * @type {boolean} @@ -55,7 +58,6 @@ SQLConnector.prototype.getTypes = function() { * Returns {Function} */ SQLConnector.prototype.getDefaultIdType = function(prop) { - /*jshint unused:false */ return Number; }; @@ -467,7 +469,6 @@ SQLConnector.prototype.exists = function(model, id, options, cb) { ' WHERE ' + this.idColumnEscaped(model) ); selectStmt.merge(this.buildWhere(model, where)); - selectStmt = this.applyPagination(model, selectStmt, { limit: 1, offset: 0, @@ -506,6 +507,7 @@ SQLConnector.prototype.destroy = function(model, id, options, cb) { where[idName] = id; this.destroyAll(model, where, options, cb); }; + // Alias to `destroy`. Juggler checks `destroy` only. Connector.defineAliases(SQLConnector.prototype, 'destroy', ['delete', 'deleteById', 'destroyById']); @@ -542,6 +544,7 @@ SQLConnector.prototype.destroyAll = function(model, where, options, cb) { } }); }; + // Alias to `destroyAll`. Juggler checks `destroyAll` only. Connector.defineAliases(SQLConnector.prototype, 'destroyAll', ['deleteAll']); @@ -606,6 +609,7 @@ SQLConnector.prototype.update = function(model, where, data, options, cb) { } }); }; + // Alias to `update`. Juggler checks `update` only. Connector.defineAliases(SQLConnector.prototype, 'update', ['updateAll']); @@ -1041,6 +1045,7 @@ SQLConnector.prototype.all = function find(model, filter, options, cb) { } }); }; + // Alias to `all`. Juggler checks `all` only. Connector.defineAliases(SQLConnector.prototype, 'all', ['findAll']); diff --git a/lib/transaction.js b/lib/transaction.js new file mode 100644 index 0000000..e3a3b1f --- /dev/null +++ b/lib/transaction.js @@ -0,0 +1,87 @@ +var assert = require('assert'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var debug = require('debug')('loopback:connector:transaction'); + +module.exports = Transaction; + +/** + * Create a new Transaction object + * @param {Connector} connector The connector instance + * @param {*} connection A connection to the DB + * @constructor + */ +function Transaction(connector, connection) { + this.connector = connector; + this.connection = connection; + EventEmitter.call(this); +} + +util.inherits(Transaction, EventEmitter); + +// Isolation levels +Transaction.SERIALIZABLE = 'SERIALIZABLE'; +Transaction.REPEATABLE_READ = 'REPEATABLE READ'; +Transaction.READ_COMMITTED = 'READ COMMITTED'; +Transaction.READ_UNCOMMITTED = 'READ UNCOMMITTED'; + +Transaction.hookTypes = { + BEFORE_COMMIT: 'before commit', + AFTER_COMMIT: 'after commit', + BEFORE_ROLLBACK: 'before rollback', + AFTER_ROLLBACK: 'after rollback', + TIMEOUT: 'timeout' +}; + +/** + * Commit a transaction and release it back to the pool + * @param cb + * @returns {*} + */ +Transaction.prototype.commit = function(cb) { + return this.connector.commit(this.connection, cb); +}; + +/** + * Rollback a transaction and release it back to the pool + * @param cb + * @returns {*|boolean} + */ +Transaction.prototype.rollback = function(cb) { + return this.connector.rollback(this.connection, cb); +}; + +/** + * Begin a new transaction + * @param {Connector} connector The connector instance + * @param {Object} [options] Options {isolationLevel: '...', timeout: 1000} + * @param cb + */ +Transaction.begin = function(connector, options, cb) { + if (typeof isolationLevel === 'function' && cb === undefined) { + cb = options; + options = {}; + } + if (typeof options === 'string') { + options = {isolationLevel: options}; + } + var isolationLevel = options.isolationLevel || Transaction.READ_COMMITTED; + assert(isolationLevel === Transaction.SERIALIZABLE || + isolationLevel === Transaction.REPEATABLE_READ || + isolationLevel === Transaction.READ_COMMITTED || + isolationLevel === Transaction.READ_UNCOMMITTED, 'Invalid isolationLevel'); + + debug('Starting a transaction with options: %j', options); + assert(typeof connector.beginTransaction === 'function', + 'beginTransaction must be function implemented by the connector'); + connector.beginTransaction(isolationLevel, function(err, connection) { + if (err) { + return cb(err); + } + var tx = connection; + if (!(connection instanceof Transaction)) { + tx = new Transaction(connector, connection); + } + cb(err, tx); + }); +}; diff --git a/test/automigrate.test.js b/test/automigrate.test.js index 1f9808f..0669b45 100644 --- a/test/automigrate.test.js +++ b/test/automigrate.test.js @@ -45,4 +45,3 @@ describe('sql connector', function() { }); }); - diff --git a/test/connectors/test-sql-connector.js b/test/connectors/test-sql-connector.js index b3a5890..390bb37 100644 --- a/test/connectors/test-sql-connector.js +++ b/test/connectors/test-sql-connector.js @@ -3,6 +3,29 @@ */ var util = require('util'); var SQLConnector = require('../../lib/sql'); +var debug = require('debug')('loopback:connector:test-sql'); + +var transactionId = 0; + +function MockTransaction(connector, name) { + this.connector = connector; + this.name = name; + this.data = []; +} + +MockTransaction.prototype.commit = function(cb) { + var self = this; + this.data.forEach(function(d) { + self.connector.collection.data.push(d); + }); + this.data = []; + cb(); +}; + +MockTransaction.prototype.rollback = function(cb) { + this.data = []; + cb(); +}; exports.initialize = function initializeDataSource(dataSource, callback) { process.nextTick(function() { @@ -18,6 +41,9 @@ exports.initialize = function initializeDataSource(dataSource, callback) { function TestConnector(settings) { SQLConnector.call(this, 'testdb', settings); this._tables = {}; + this.collection = { + data: [] + }; } util.inherits(TestConnector, SQLConnector); @@ -67,6 +93,59 @@ TestConnector.prototype._buildLimit = function(model, limit, offset) { return 'LIMIT ' + (offset ? (offset + ',' + limit) : limit); }; +TestConnector.prototype.applyPagination = + function(model, stmt, filter) { + /*jshint unused:false */ + var limitClause = this._buildLimit(model, filter.limit, + filter.offset || filter.skip); + return stmt.merge(limitClause); + }; + +TestConnector.prototype.escapeName = function(name) { + return '`' + name + '`'; +}; + +TestConnector.prototype.dbName = function(name) { + return name.toUpperCase(); +}; + +TestConnector.prototype.getPlaceholderForValue = function(key) { + return '$' + key; +}; + +TestConnector.prototype.escapeValue = function(value) { + if (typeof value === 'number' || typeof value === 'boolean') { + return value; + } + if (typeof value === 'string') { + return "'" + value + "'"; + } + if (value == null) { + return 'NULL'; + } + if (typeof value === 'object') { + return String(value); + } + return value; +}; + +TestConnector.prototype.toColumnValue = function(prop, val, escaping) { + return escaping ? this.escapeValue(val) : val; +}; + +TestConnector.prototype._buildLimit = function(model, limit, offset) { + if (isNaN(limit)) { + limit = 0; + } + if (isNaN(offset)) { + offset = 0; + } + if (!limit && !offset) { + return ''; + } + return 'LIMIT ' + (offset ? (offset + ',' + limit) : limit); +}; + TestConnector.prototype.applyPagination = function(model, stmt, filter) { /*jshint unused:false */ @@ -101,6 +180,49 @@ TestConnector.prototype.createTable = function(model, cb) { }); }; -TestConnector.prototype.executeSQL = function(sql, params, options, callback) { - callback(null, []); +TestConnector.prototype.getInsertedId = function(model, info) { + return info; +}; + +TestConnector.prototype.fromColumnValue = function(propertyDef, value) { + return value; +}; + +TestConnector.prototype.beginTransaction = function(isolationLevel, cb) { + var name = 'tx_' + transactionId++; + cb(null, new MockTransaction(this, name)); +}; + +TestConnector.prototype.commit = function(tx, cb) { + tx.commit(cb); +}; + +TestConnector.prototype.rollback = function(tx, cb) { + tx.rollback(cb); +}; + +TestConnector.prototype.executeSQL = function(sql, params, options, callback) { + var transaction = options.transaction; + if (transaction && transaction.connector === this && transaction.connection) { + if (sql.indexOf('INSERT') === 0) { + transaction.connection.data.push({title: 't1', content: 'c1'}); + debug('INSERT', transaction.connection.data, sql, + transaction.connection.name); + callback(null, 1); + } + else { + debug('SELECT', transaction.connection.data, sql, + transaction.connection.name); + callback(null, transaction.connection.data); + } + } else { + if (sql.indexOf('INSERT') === 0) { + this.collection.data.push({title: 't1', content: 'c1'}); + debug('INSERT', this.collection.data, sql); + callback(null, 1); + } else { + debug('SELECT', this.collection.data, sql); + callback(null, this.collection.data); + } + } }; diff --git a/test/sql.test.js b/test/sql.test.js index 0b62ddd..1a5843c 100644 --- a/test/sql.test.js +++ b/test/sql.test.js @@ -307,4 +307,3 @@ describe('sql connector', function() { }).to.throw('callback must be a function'); }); }); - diff --git a/test/transaction.test.js b/test/transaction.test.js new file mode 100644 index 0000000..6ca84de --- /dev/null +++ b/test/transaction.test.js @@ -0,0 +1,175 @@ +var Transaction = require('../index').Transaction; + +var expect = require('chai').expect; +var testConnector = require('./connectors/test-sql-connector'); + +var juggler = require('loopback-datasource-juggler'); + +var db, Post; + +describe('transactions', function() { + + before(function(done) { + db = new juggler.DataSource({ + connector: testConnector, + debug: true + }); + db.once('connected', function() { + Post = db.define('PostTX', { + title: {type: String, length: 255, index: true}, + content: {type: String} + }); + done(); + }); + }); + + var currentTx; + var hooks = []; + // Return an async function to start a transaction and create a post + function createPostInTx(post, timeout) { + return function(done) { + // Transaction.begin(db.connector, Transaction.READ_COMMITTED, + Post.beginTransaction({ + isolationLevel: Transaction.READ_COMMITTED, + timeout: timeout + }, + function(err, tx) { + if (err) return done(err); + expect(typeof tx.id).to.eql('string'); + hooks = []; + tx.observe('before commit', function(context, next) { + hooks.push('before commit'); + next(); + }); + tx.observe('after commit', function(context, next) { + hooks.push('after commit'); + next(); + }); + tx.observe('before rollback', function(context, next) { + hooks.push('before rollback'); + next(); + }); + tx.observe('after rollback', function(context, next) { + hooks.push('after rollback'); + next(); + }); + currentTx = tx; + Post.create(post, {transaction: tx}, + function(err, p) { + if (err) { + done(err); + } else { + done(); + } + }); + }); + }; + } + + // Return an async function to find matching posts and assert number of + // records to equal to the count + function expectToFindPosts(where, count, inTx) { + return function(done) { + var options = {}; + if (inTx) { + options.transaction = currentTx; + } + Post.find({where: where}, options, + function(err, posts) { + if (err) return done(err); + expect(posts.length).to.be.eql(count); + done(); + }); + }; + } + + describe('commit', function() { + + var post = {title: 't1', content: 'c1'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should commit a transaction', function(done) { + currentTx.commit(function(err) { + expect(hooks).to.eql(['before commit', 'after commit']); + done(err); + }); + }); + + it('should see the committed insert', expectToFindPosts(post, 1)); + + it('should report error if the transaction is not active', function(done) { + currentTx.commit(function(err) { + expect(err).to.be.instanceof(Error); + done(); + }); + }); + }); + + describe('rollback', function() { + + before(function() { + // Reset the collection + db.connector.collection.data = []; + }); + + var post = {title: 't2', content: 'c2'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should rollback a transaction', function(done) { + currentTx.rollback(function(err) { + expect(hooks).to.eql(['before rollback', 'after rollback']); + done(err); + }); + }); + + it('should not see the rolledback insert', expectToFindPosts(post, 0)); + + it('should report error if the transaction is not active', function(done) { + currentTx.rollback(function(err) { + expect(err).to.be.instanceof(Error); + done(); + }); + }); + }); + + describe('timeout', function() { + + before(function() { + // Reset the collection + db.connector.collection.data = []; + }); + + var post = {title: 't3', content: 'c3'}; + before(createPostInTx(post, 50)); + + it('should report timeout', function(done) { + setTimeout(function() { + Post.find({where: {title: 't3'}}, {transaction: currentTx}, + function(err, posts) { + if (err) return done(err); + expect(posts.length).to.be.eql(1); + done(); + }); + }, 100); + done(); + }); + + it('should invoke the timeout hook', function(done) { + currentTx.observe('timeout', function(context, next) { + next(); + done(); + }); + }); + + }); +});