Add transaction support

This commit is contained in:
Raymond Feng 2015-05-15 10:27:08 -07:00
parent 772132497d
commit 8815e9c961
9 changed files with 399 additions and 9 deletions

View File

@ -2,3 +2,5 @@ exports.Connector = require('./lib/connector');
// Set up SqlConnector as an alias to SQLConnector
exports.SQLConnector = exports.SqlConnector = require('./lib/sql');
exports.ParameterizedSQL = exports.SQLConnector.ParameterizedSQL;
exports.Transaction = require('./lib/transaction');

View File

@ -175,9 +175,9 @@ Connector.prototype.define = function(modelDefinition) {
* @param {Object} propertyDefinition The object for property definition
*/
Connector.prototype.defineProperty = function(model, propertyName, propertyDefinition) {
var modelDef = this.getModelDefinition(model);
modelDef.properties[propertyName] = propertyDefinition;
};
var modelDef = this.getModelDefinition(model);
modelDef.properties[propertyName] = propertyDefinition;
};
/**
* Disconnect from the connector

View File

@ -98,3 +98,4 @@ ParameterizedSQL.join = function(sqls, separator) {
};
ParameterizedSQL.PLACEHOLDER = PLACEHOLDER;

View File

@ -4,6 +4,7 @@ var assert = require('assert');
var Connector = require('./connector');
var debug = require('debug')('loopback:connector:sql');
var ParameterizedSQL = require('./parameterized-sql');
var Transaction = require('./transaction');
module.exports = SQLConnector;
@ -25,6 +26,8 @@ SQLConnector.ParameterizedSQL = ParameterizedSQL;
// The generic placeholder
var PLACEHOLDER = SQLConnector.PLACEHOLDER = ParameterizedSQL.PLACEHOLDER;
SQLConnector.Transaction = Transaction;
/**
* Set the relational property to indicate the backend is a relational DB
* @type {boolean}
@ -55,7 +58,6 @@ SQLConnector.prototype.getTypes = function() {
* Returns {Function}
*/
SQLConnector.prototype.getDefaultIdType = function(prop) {
/*jshint unused:false */
return Number;
};
@ -467,7 +469,6 @@ SQLConnector.prototype.exists = function(model, id, options, cb) {
' WHERE ' + this.idColumnEscaped(model)
);
selectStmt.merge(this.buildWhere(model, where));
selectStmt = this.applyPagination(model, selectStmt, {
limit: 1,
offset: 0,
@ -506,6 +507,7 @@ SQLConnector.prototype.destroy = function(model, id, options, cb) {
where[idName] = id;
this.destroyAll(model, where, options, cb);
};
// Alias to `destroy`. Juggler checks `destroy` only.
Connector.defineAliases(SQLConnector.prototype, 'destroy',
['delete', 'deleteById', 'destroyById']);
@ -542,6 +544,7 @@ SQLConnector.prototype.destroyAll = function(model, where, options, cb) {
}
});
};
// Alias to `destroyAll`. Juggler checks `destroyAll` only.
Connector.defineAliases(SQLConnector.prototype, 'destroyAll', ['deleteAll']);
@ -606,6 +609,7 @@ SQLConnector.prototype.update = function(model, where, data, options, cb) {
}
});
};
// Alias to `update`. Juggler checks `update` only.
Connector.defineAliases(SQLConnector.prototype, 'update', ['updateAll']);
@ -1041,6 +1045,7 @@ SQLConnector.prototype.all = function find(model, filter, options, cb) {
}
});
};
// Alias to `all`. Juggler checks `all` only.
Connector.defineAliases(SQLConnector.prototype, 'all', ['findAll']);

87
lib/transaction.js Normal file
View File

@ -0,0 +1,87 @@
var assert = require('assert');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var debug = require('debug')('loopback:connector:transaction');
module.exports = Transaction;
/**
* Create a new Transaction object
* @param {Connector} connector The connector instance
* @param {*} connection A connection to the DB
* @constructor
*/
function Transaction(connector, connection) {
this.connector = connector;
this.connection = connection;
EventEmitter.call(this);
}
util.inherits(Transaction, EventEmitter);
// Isolation levels
Transaction.SERIALIZABLE = 'SERIALIZABLE';
Transaction.REPEATABLE_READ = 'REPEATABLE READ';
Transaction.READ_COMMITTED = 'READ COMMITTED';
Transaction.READ_UNCOMMITTED = 'READ UNCOMMITTED';
Transaction.hookTypes = {
BEFORE_COMMIT: 'before commit',
AFTER_COMMIT: 'after commit',
BEFORE_ROLLBACK: 'before rollback',
AFTER_ROLLBACK: 'after rollback',
TIMEOUT: 'timeout'
};
/**
* Commit a transaction and release it back to the pool
* @param cb
* @returns {*}
*/
Transaction.prototype.commit = function(cb) {
return this.connector.commit(this.connection, cb);
};
/**
* Rollback a transaction and release it back to the pool
* @param cb
* @returns {*|boolean}
*/
Transaction.prototype.rollback = function(cb) {
return this.connector.rollback(this.connection, cb);
};
/**
* Begin a new transaction
* @param {Connector} connector The connector instance
* @param {Object} [options] Options {isolationLevel: '...', timeout: 1000}
* @param cb
*/
Transaction.begin = function(connector, options, cb) {
if (typeof isolationLevel === 'function' && cb === undefined) {
cb = options;
options = {};
}
if (typeof options === 'string') {
options = {isolationLevel: options};
}
var isolationLevel = options.isolationLevel || Transaction.READ_COMMITTED;
assert(isolationLevel === Transaction.SERIALIZABLE ||
isolationLevel === Transaction.REPEATABLE_READ ||
isolationLevel === Transaction.READ_COMMITTED ||
isolationLevel === Transaction.READ_UNCOMMITTED, 'Invalid isolationLevel');
debug('Starting a transaction with options: %j', options);
assert(typeof connector.beginTransaction === 'function',
'beginTransaction must be function implemented by the connector');
connector.beginTransaction(isolationLevel, function(err, connection) {
if (err) {
return cb(err);
}
var tx = connection;
if (!(connection instanceof Transaction)) {
tx = new Transaction(connector, connection);
}
cb(err, tx);
});
};

View File

@ -45,4 +45,3 @@ describe('sql connector', function() {
});
});

View File

@ -3,6 +3,29 @@
*/
var util = require('util');
var SQLConnector = require('../../lib/sql');
var debug = require('debug')('loopback:connector:test-sql');
var transactionId = 0;
function MockTransaction(connector, name) {
this.connector = connector;
this.name = name;
this.data = [];
}
MockTransaction.prototype.commit = function(cb) {
var self = this;
this.data.forEach(function(d) {
self.connector.collection.data.push(d);
});
this.data = [];
cb();
};
MockTransaction.prototype.rollback = function(cb) {
this.data = [];
cb();
};
exports.initialize = function initializeDataSource(dataSource, callback) {
process.nextTick(function() {
@ -18,6 +41,9 @@ exports.initialize = function initializeDataSource(dataSource, callback) {
function TestConnector(settings) {
SQLConnector.call(this, 'testdb', settings);
this._tables = {};
this.collection = {
data: []
};
}
util.inherits(TestConnector, SQLConnector);
@ -67,6 +93,59 @@ TestConnector.prototype._buildLimit = function(model, limit, offset) {
return 'LIMIT ' + (offset ? (offset + ',' + limit) : limit);
};
TestConnector.prototype.applyPagination =
function(model, stmt, filter) {
/*jshint unused:false */
var limitClause = this._buildLimit(model, filter.limit,
filter.offset || filter.skip);
return stmt.merge(limitClause);
};
TestConnector.prototype.escapeName = function(name) {
return '`' + name + '`';
};
TestConnector.prototype.dbName = function(name) {
return name.toUpperCase();
};
TestConnector.prototype.getPlaceholderForValue = function(key) {
return '$' + key;
};
TestConnector.prototype.escapeValue = function(value) {
if (typeof value === 'number' || typeof value === 'boolean') {
return value;
}
if (typeof value === 'string') {
return "'" + value + "'";
}
if (value == null) {
return 'NULL';
}
if (typeof value === 'object') {
return String(value);
}
return value;
};
TestConnector.prototype.toColumnValue = function(prop, val, escaping) {
return escaping ? this.escapeValue(val) : val;
};
TestConnector.prototype._buildLimit = function(model, limit, offset) {
if (isNaN(limit)) {
limit = 0;
}
if (isNaN(offset)) {
offset = 0;
}
if (!limit && !offset) {
return '';
}
return 'LIMIT ' + (offset ? (offset + ',' + limit) : limit);
};
TestConnector.prototype.applyPagination =
function(model, stmt, filter) {
/*jshint unused:false */
@ -101,6 +180,49 @@ TestConnector.prototype.createTable = function(model, cb) {
});
};
TestConnector.prototype.executeSQL = function(sql, params, options, callback) {
callback(null, []);
TestConnector.prototype.getInsertedId = function(model, info) {
return info;
};
TestConnector.prototype.fromColumnValue = function(propertyDef, value) {
return value;
};
TestConnector.prototype.beginTransaction = function(isolationLevel, cb) {
var name = 'tx_' + transactionId++;
cb(null, new MockTransaction(this, name));
};
TestConnector.prototype.commit = function(tx, cb) {
tx.commit(cb);
};
TestConnector.prototype.rollback = function(tx, cb) {
tx.rollback(cb);
};
TestConnector.prototype.executeSQL = function(sql, params, options, callback) {
var transaction = options.transaction;
if (transaction && transaction.connector === this && transaction.connection) {
if (sql.indexOf('INSERT') === 0) {
transaction.connection.data.push({title: 't1', content: 'c1'});
debug('INSERT', transaction.connection.data, sql,
transaction.connection.name);
callback(null, 1);
}
else {
debug('SELECT', transaction.connection.data, sql,
transaction.connection.name);
callback(null, transaction.connection.data);
}
} else {
if (sql.indexOf('INSERT') === 0) {
this.collection.data.push({title: 't1', content: 'c1'});
debug('INSERT', this.collection.data, sql);
callback(null, 1);
} else {
debug('SELECT', this.collection.data, sql);
callback(null, this.collection.data);
}
}
};

View File

@ -307,4 +307,3 @@ describe('sql connector', function() {
}).to.throw('callback must be a function');
});
});

175
test/transaction.test.js Normal file
View File

@ -0,0 +1,175 @@
var Transaction = require('../index').Transaction;
var expect = require('chai').expect;
var testConnector = require('./connectors/test-sql-connector');
var juggler = require('loopback-datasource-juggler');
var db, Post;
describe('transactions', function() {
before(function(done) {
db = new juggler.DataSource({
connector: testConnector,
debug: true
});
db.once('connected', function() {
Post = db.define('PostTX', {
title: {type: String, length: 255, index: true},
content: {type: String}
});
done();
});
});
var currentTx;
var hooks = [];
// Return an async function to start a transaction and create a post
function createPostInTx(post, timeout) {
return function(done) {
// Transaction.begin(db.connector, Transaction.READ_COMMITTED,
Post.beginTransaction({
isolationLevel: Transaction.READ_COMMITTED,
timeout: timeout
},
function(err, tx) {
if (err) return done(err);
expect(typeof tx.id).to.eql('string');
hooks = [];
tx.observe('before commit', function(context, next) {
hooks.push('before commit');
next();
});
tx.observe('after commit', function(context, next) {
hooks.push('after commit');
next();
});
tx.observe('before rollback', function(context, next) {
hooks.push('before rollback');
next();
});
tx.observe('after rollback', function(context, next) {
hooks.push('after rollback');
next();
});
currentTx = tx;
Post.create(post, {transaction: tx},
function(err, p) {
if (err) {
done(err);
} else {
done();
}
});
});
};
}
// Return an async function to find matching posts and assert number of
// records to equal to the count
function expectToFindPosts(where, count, inTx) {
return function(done) {
var options = {};
if (inTx) {
options.transaction = currentTx;
}
Post.find({where: where}, options,
function(err, posts) {
if (err) return done(err);
expect(posts.length).to.be.eql(count);
done();
});
};
}
describe('commit', function() {
var post = {title: 't1', content: 'c1'};
before(createPostInTx(post));
it('should not see the uncommitted insert', expectToFindPosts(post, 0));
it('should see the uncommitted insert from the same transaction',
expectToFindPosts(post, 1, true));
it('should commit a transaction', function(done) {
currentTx.commit(function(err) {
expect(hooks).to.eql(['before commit', 'after commit']);
done(err);
});
});
it('should see the committed insert', expectToFindPosts(post, 1));
it('should report error if the transaction is not active', function(done) {
currentTx.commit(function(err) {
expect(err).to.be.instanceof(Error);
done();
});
});
});
describe('rollback', function() {
before(function() {
// Reset the collection
db.connector.collection.data = [];
});
var post = {title: 't2', content: 'c2'};
before(createPostInTx(post));
it('should not see the uncommitted insert', expectToFindPosts(post, 0));
it('should see the uncommitted insert from the same transaction',
expectToFindPosts(post, 1, true));
it('should rollback a transaction', function(done) {
currentTx.rollback(function(err) {
expect(hooks).to.eql(['before rollback', 'after rollback']);
done(err);
});
});
it('should not see the rolledback insert', expectToFindPosts(post, 0));
it('should report error if the transaction is not active', function(done) {
currentTx.rollback(function(err) {
expect(err).to.be.instanceof(Error);
done();
});
});
});
describe('timeout', function() {
before(function() {
// Reset the collection
db.connector.collection.data = [];
});
var post = {title: 't3', content: 'c3'};
before(createPostInTx(post, 50));
it('should report timeout', function(done) {
setTimeout(function() {
Post.find({where: {title: 't3'}}, {transaction: currentTx},
function(err, posts) {
if (err) return done(err);
expect(posts.length).to.be.eql(1);
done();
});
}, 100);
done();
});
it('should invoke the timeout hook', function(done) {
currentTx.observe('timeout', function(context, next) {
next();
done();
});
});
});
});