From 0ce1fa9f87ffe253363ffaad0f85569a278b2f00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Lehni?= Date: Wed, 6 Sep 2017 07:10:57 +0200 Subject: [PATCH] Add a better way to handle transactions --- lib/dao.js | 157 +++++++++++---------------- lib/datasource.js | 185 ++++++++++++++++++++++++++++---- lib/transaction.js | 88 ++++++++------- package.json | 2 +- test/transaction.test.js | 225 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 499 insertions(+), 158 deletions(-) create mode 100644 test/transaction.test.js diff --git a/lib/dao.js b/lib/dao.js index be4a0ed8..bde651ab 100644 --- a/lib/dao.js +++ b/lib/dao.js @@ -130,6 +130,45 @@ function errorModelNotFound(idValue) { return error; } +function invokeConnectorMethod(connector, method, Model, args, options, cb) { + var dataSource = Model.getDataSource(); + // If the DataSource is a transaction and no transaction object is provide in + // the options yet, add it to the options, see: DataSource#transaction() + var opts = dataSource.isTransaction && !options.transaction ? Object.assign( + options, {transaction: dataSource.currentTransaction}) : options; + var optionsSupported = connector[method].length >= args.length + 3; + var transaction = opts.transaction; + if (transaction) { + if (!optionsSupported) { + return process.nextTick(function() { + cb(new Error(g.f( + 'The connector does not support {{method}} within a transaction', method))); + }); + } + // transaction isn't always a Transaction instance. Some tests provide a + // string to test if options get passed through, so check for ensureActive: + if (transaction.ensureActive && !transaction.ensureActive(cb)) { + return; + } + } + var modelName = Model.modelName; + var fullArgs; + if (!optionsSupported && method === 'count') { + // NOTE: The old count() signature is irregular, with `where` coming last: + // [modelName, cb, where] + var where = args[0]; + fullArgs = [modelName, cb, where]; + } else { + // Standard signature: [modelName, ...args, (opts, ) cb] + fullArgs = [modelName].concat(args); + if (optionsSupported) { + fullArgs.push(opts); + } + fullArgs.push(cb); + } + connector[method].apply(connector, fullArgs); +} + DataAccessObject._forDB = function(data) { if (!(this.getDataSource().isRelational && this.getDataSource().isRelational())) { return data; @@ -365,7 +404,6 @@ DataAccessObject.create = function(data, options, cb) { obj.trigger('create', function(createDone) { obj.trigger('save', function(saveDone) { var _idName = idName(Model); - var modelName = Model.modelName; var val = removeUndefined(obj.toObject(true)); function createCallback(err, id, rev) { if (id) { @@ -431,12 +469,8 @@ DataAccessObject.create = function(data, options, cb) { }; Model.notifyObserversOf('persist', context, function(err) { if (err) return cb(err); - - if (connector.create.length === 4) { - connector.create(modelName, obj.constructor._forDB(context.data), options, createCallback); - } else { - connector.create(modelName, obj.constructor._forDB(context.data), createCallback); - } + invokeConnectorMethod(connector, 'create', Model, [obj.constructor._forDB(context.data)], + options, createCallback); }); }, obj, cb); }, obj, cb); @@ -579,8 +613,6 @@ DataAccessObject.upsert = function(data, options, cb) { Model.applyProperties(update, inst); Model = Model.lookupModel(update); - var connector = self.getConnector(); - if (doValidate === false) { callConnector(); } else { @@ -611,11 +643,7 @@ DataAccessObject.upsert = function(data, options, cb) { }; Model.notifyObserversOf('persist', context, function(err) { if (err) return done(err); - if (connector.updateOrCreate.length === 4) { - connector.updateOrCreate(Model.modelName, update, options, done); - } else { - connector.updateOrCreate(Model.modelName, update, done); - } + invokeConnectorMethod(connector, 'updateOrCreate', Model, [update], options, done); }); } function done(err, data, info) { @@ -728,7 +756,6 @@ DataAccessObject.upsertWithWhere = function(where, data, options, cb) { var self = this; var Model = this; var connector = Model.getConnector(); - var modelName = Model.modelName; var query = {where: where}; var context = { Model: Model, @@ -795,7 +822,7 @@ DataAccessObject.upsertWithWhere = function(where, data, options, cb) { }; Model.notifyObserversOf('persist', context, function(err) { if (err) return done(err); - connector.upsertWithWhere(modelName, ctx.where, update, options, done); + invokeConnectorMethod(connector, 'upsertWithWhere', Model, [ctx.where, update], options, done); }); } function done(err, data, info) { @@ -944,8 +971,6 @@ DataAccessObject.replaceOrCreate = function replaceOrCreate(data, options, cb) { Model.applyProperties(update, inst); Model = Model.lookupModel(update); - var connector = self.getConnector(); - if (options.validate === false) { return callConnector(); } @@ -972,7 +997,7 @@ DataAccessObject.replaceOrCreate = function replaceOrCreate(data, options, cb) { }; Model.notifyObserversOf('persist', context, function(err) { if (err) return done(err); - connector.replaceOrCreate(Model.modelName, context.data, options, done); + invokeConnectorMethod(connector, 'replaceOrCreate', Model, [context.data], options, done); }); } function done(err, data, info) { @@ -1097,7 +1122,6 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, options, cb) var connector = Model.getConnector(); function _findOrCreate(query, data, currentInstance) { - var modelName = self.modelName; function findOrCreateCallback(err, data, created) { if (err) return cb(err); var context = { @@ -1155,12 +1179,8 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, options, cb) Model.notifyObserversOf('persist', context, function(err) { if (err) return cb(err); - - if (connector.findOrCreate.length === 5) { - connector.findOrCreate(modelName, query, self._forDB(context.data), options, findOrCreateCallback); - } else { - connector.findOrCreate(modelName, query, self._forDB(context.data), findOrCreateCallback); - } + invokeConnectorMethod(connector, 'findOrCreate', Model, [query, self._forDB(context.data)], + options, findOrCreateCallback); }); } @@ -1776,7 +1796,7 @@ DataAccessObject._coerce = function(where, options) { } } else { if (val != null) { - const allowExtendedOperators = self._allowExtendedOperators(options); + var allowExtendedOperators = self._allowExtendedOperators(options); if (operator === null && val instanceof RegExp) { // Normalize {name: /A/} to {name: {regexp: /A/}} operator = 'regexp'; @@ -1952,8 +1972,8 @@ DataAccessObject.find = function find(query, options, cb) { cb(err); } else if (Array.isArray(data)) { memory.define({ - properties: self.dataSource.definitions[self.modelName].properties, - settings: self.dataSource.definitions[self.modelName].settings, + properties: self.dataSource.definitions[modelName].properties, + settings: self.dataSource.definitions[modelName].settings, model: self, }); @@ -1993,11 +2013,7 @@ DataAccessObject.find = function find(query, options, cb) { } var geoCallback = options.notify === false ? geoCallbackWithoutNotify : geoCallbackWithNotify; - if (connector.all.length === 4) { - connector.all(self.modelName, {}, options, geoCallback); - } else { - connector.all(self.modelName, {}, geoCallback); - } + invokeConnectorMethod(connector, 'all', self, [{}], options, geoCallback); } // already handled return cb.promise; @@ -2100,11 +2116,7 @@ DataAccessObject.find = function find(query, options, cb) { }; if (options.notify === false) { - if (connector.all.length === 4) { - connector.all(self.modelName, query, options, allCb); - } else { - connector.all(self.modelName, query, allCb); - } + invokeConnectorMethod(connector, 'all', self, [query], options, allCb); } else { var context = { Model: this, @@ -2114,10 +2126,7 @@ DataAccessObject.find = function find(query, options, cb) { }; this.notifyObserversOf('access', context, function(err, ctx) { if (err) return cb(err); - - connector.all.length === 4 ? - connector.all(self.modelName, ctx.query, options, allCb) : - connector.all(self.modelName, ctx.query, allCb); + invokeConnectorMethod(connector, 'all', self, [ctx.query], options, allCb); }); } return cb.promise; @@ -2258,11 +2267,7 @@ DataAccessObject.destroyAll = function destroyAll(where, options, cb) { }; if (whereIsEmpty(where)) { - if (connector.destroyAll.length === 4) { - connector.destroyAll(Model.modelName, {}, options, done); - } else { - connector.destroyAll(Model.modelName, {}, done); - } + invokeConnectorMethod(connector, 'destroyAll', Model, [{}], options, done); } else { try { // Support an optional where object @@ -2276,11 +2281,7 @@ DataAccessObject.destroyAll = function destroyAll(where, options, cb) { }); } - if (connector.destroyAll.length === 4) { - connector.destroyAll(Model.modelName, where, options, done); - } else { - connector.destroyAll(Model.modelName, where, done); - } + invokeConnectorMethod(connector, 'destroyAll', Model, [where], options, done); } function done(err, info) { @@ -2444,17 +2445,7 @@ DataAccessObject.count = function(where, options, cb) { }; this.notifyObserversOf('access', context, function(err, ctx) { if (err) return cb(err); - where = ctx.query.where; - - if (connector.count.length <= 3) { - // Old signature, please note where is the last - // count(model, cb, where) - connector.count(Model.modelName, cb, where); - } else { - // New signature - // count(model, where, options, cb) - connector.count(Model.modelName, where, options, cb); - } + invokeConnectorMethod(connector, 'count', Model, [ctx.query.where], options, cb); }); return cb.promise; }; @@ -2507,7 +2498,6 @@ DataAccessObject.prototype.save = function(options, cb) { var inst = this; var connector = inst.getConnector(); - var modelName = Model.modelName; var context = { Model: Model, @@ -2591,12 +2581,8 @@ DataAccessObject.prototype.save = function(options, cb) { Model.notifyObserversOf('persist', context, function(err) { if (err) return cb(err); - - if (connector.save.length === 4) { - connector.save(modelName, inst.constructor._forDB(data), options, saveCallback); - } else { - connector.save(modelName, inst.constructor._forDB(data), saveCallback); - } + invokeConnectorMethod(connector, 'save', Model, [Model._forDB(data)], + options, saveCallback); }); }, data, cb); }, data, cb); @@ -2770,12 +2756,7 @@ DataAccessObject.updateAll = function(where, data, options, cb) { }; Model.notifyObserversOf('persist', context, function(err, ctx) { if (err) return cb(err); - - if (connector.update.length === 5) { - connector.update(Model.modelName, where, data, options, updateCallback); - } else { - connector.update(Model.modelName, where, data, updateCallback); - } + invokeConnectorMethod(connector, 'update', Model, [where, data], options, updateCallback); }); } return cb.promise; @@ -2907,11 +2888,7 @@ DataAccessObject.prototype.remove = }); } - if (connector.destroy.length === 4) { - connector.destroy(inst.constructor.modelName, id, options, destroyCallback); - } else { - connector.destroy(inst.constructor.modelName, id, destroyCallback); - } + invokeConnectorMethod(connector, 'destroy', Model, [id], options, destroyCallback); }, null, cb); } return cb.promise; @@ -3040,7 +3017,6 @@ DataAccessObject.replaceById = function(id, data, options, cb) { if (isPKMissing(Model, cb)) return cb.promise; - var model = Model.modelName; var hookState = {}; if (id !== data[pkName]) { @@ -3155,8 +3131,8 @@ DataAccessObject.replaceById = function(id, data, options, cb) { options: options, }; Model.notifyObserversOf('persist', ctx, function(err) { - connector.replaceById(model, id, - inst.constructor._forDB(context.data), options, replaceCallback); + invokeConnectorMethod(connector, 'replaceById', Model, [id, Model._forDB(context.data)], + options, replaceCallback); }); } } @@ -3216,7 +3192,6 @@ function(data, options, cb) { var allowExtendedOperators = Model._allowExtendedOperators(options); var strict = this.__strict; - var model = Model.modelName; var hookState = {}; // Convert the data to be plain object so that update won't be confused @@ -3352,13 +3327,9 @@ function(data, options, cb) { options: options, }; Model.notifyObserversOf('persist', ctx, function(err) { - if (connector.updateAttributes.length === 5) { - connector.updateAttributes(model, getIdValue(inst.constructor, inst), - inst.constructor._forDB(context.data), options, updateAttributesCallback); - } else { - connector.updateAttributes(model, getIdValue(inst.constructor, inst), - inst.constructor._forDB(context.data), updateAttributesCallback); - } + invokeConnectorMethod(connector, 'updateAttributes', Model, + [getIdValue(Model, inst), Model._forDB(context.data)], + options, updateAttributesCallback); }); }, data, cb); }, data, cb); diff --git a/lib/datasource.js b/lib/datasource.js index de68516d..6e0876e1 100644 --- a/lib/datasource.js +++ b/lib/datasource.js @@ -27,6 +27,7 @@ var traverse = require('traverse'); var g = require('strong-globalize')(); var juggler = require('..'); var deprecated = require('depd')('loopback-datasource-juggler'); +var Transaction = require('loopback-connector').Transaction; if (process.env.DEBUG === 'loopback') { // For back-compatibility @@ -2102,10 +2103,51 @@ DataSource.prototype.copyModel = function copyModel(Master) { /** * Run a transaction against the DataSource. - * @returns {EventEmitter} - * @private + * + * This method can be used in different ways based on the passed arguments and + * type of underlying data source: + * + * If no `execute()` function is provided and the underlying DataSource is a + * database that supports transactions, a Promise is returned that resolves to + * an EventEmitter representing the transaction once it is ready. + * `transaction.models` can be used to receive versions of the DataSource's + * model classes which are bound to the created transaction, so that all their + * database methods automatically use the transaction. At the end of all + * database transactions, `transaction.commit()` can be called to commit the + * transactions, or `transaction.rollback()` to roll them back. + * + * If no `execute()` function is provided on a transient or memory DataSource, + * the EventEmitter representing the transaction is returned immediately. For + * backward compatibility, this object also supports `transaction.exec()` + * instead of `transaction.commit()`, and calling `transaction.rollback()` is + * not required on such transient and memory DataSource instances. + * + * If an `execute()` function is provided, then it is called as soon as the + * transaction is ready, receiving `transaction.models` as its first + * argument. `transaction.commit()` and `transaction.rollback()` are then + * automatically called at the end of `execute()`, based on whether exceptions + * happen during execution or not. If no callback is provided to be called at + * the end of the execution, a Promise object is returned that is resolved or + * rejected as soon as the execution is completed, and the transaction is + * committed or rolled back. + * + * @param {Function} execute The execute function, called with (models). Note + * that the instances in `models` are bound to the created transaction, and + * are therefore not identical with the models in `app.models`, but slaves + * thereof (optional). + * @options {Object} options The options to be passed to `beginTransaction()` + * when creating the transaction on database sources (optional). + * @param {Function} cb Callback called with (err) + * @returns {Promise | EventEmitter} */ -DataSource.prototype.transaction = function() { +DataSource.prototype.transaction = function(execute, options, cb) { + if (cb === undefined && typeof options === 'function') { + cb = options; + options = {}; + } else { + options = options || {}; + } + var dataSource = this; var transaction = new EventEmitter(); @@ -2115,26 +2157,133 @@ DataSource.prototype.transaction = function() { transaction.isTransaction = true; transaction.origin = dataSource; - transaction.name = dataSource.name; - transaction.settings = dataSource.settings; transaction.connected = false; transaction.connecting = false; - transaction.connector = dataSource.connector.transaction(); - // create blank models pool - transaction.modelBuilder = new ModelBuilder(); - transaction.models = transaction.modelBuilder.models; - transaction.definitions = transaction.modelBuilder.definitions; - - for (var i in dataSource.modelBuilder.models) { - dataSource.copyModel.call(transaction, dataSource.modelBuilder.models[i]); - } - - transaction.exec = function(cb) { - transaction.connector.exec(cb); + // Don't allow creating transactions on a transaction data-source: + transaction.transaction = function() { + throw new Error(g.f('Nesting transactions is not supported')); }; - return transaction; + // Create a blank pool for the slave models bound to this transaction. + var modelBuilder = new ModelBuilder(); + var slaveModels = modelBuilder.models; + transaction.modelBuilder = modelBuilder; + transaction.models = slaveModels; + transaction.definitions = modelBuilder.definitions; + + // For performance reasons, use a getter per model and only call copyModel() + // for the models that are actually used. These getters are then replaced + // with the actual values on first use. + var masterModels = dataSource.modelBuilder.models; + Object.keys(masterModels).forEach(function(name) { + Object.defineProperty(slaveModels, name, { + enumerable: true, + configurable: true, + get: function() { + // Delete getter so copyModel() can redefine slaveModels[name]. + // NOTE: No need to set the new value as copyModel() takes care of it. + delete slaveModels[name]; + return dataSource.copyModel.call(transaction, masterModels[name]); + }, + }); + }); + + var done = function(err) { + if (err) { + transaction.rollback(function(error) { + cb(err || error); + }); + } else { + transaction.commit(cb); + } + // Make sure cb() isn't called twice, e.g. if `execute` returns a + // thenable object and also calls the passed `cb` function. + done = function() {}; + }; + + function handleExecute() { + if (execute) { + cb = cb || utils.createPromiseCallback(); + try { + var result = execute(slaveModels, done); + if (result && typeof result.then === 'function') { + result.then(function() { done(); }, done); + } + } catch (err) { + done(err); + } + return cb.promise; + } else if (cb) { + cb(null, transaction); + } else { + return transaction; + } + } + + function transactionCreated(err, tx) { + if (err) { + cb(err); + } else { + // Expose transaction on the created transaction dataSource so it can be + // retrieved again in determineOptions() in dao.js, as well as referenced + // in transaction.commit() and transaction.rollback() below. + transaction.currentTransaction = tx; + // Handle timeout and pass it on as an error. + tx.observe('timeout', function(context, next) { + const err = new Error(g.f('Transaction is rolled back due to timeout')); + err.code = 'TRANSACTION_TIMEOUT'; + // Pass on the error to next(), so that the final 'timeout' observer in + // loopback-connector does not trigger a rollback by itself that we + // can't get a callback for when it's finished. + next(err); + // Call done(err) after, to execute the rollback here and reject the + // promise with the error when it's completed. + done(err); + }); + handleExecute(); + } + } + + function ensureTransaction(transaction, cb) { + if (!transaction) { + process.nextTick(function() { + cb(new Error(g.f( + 'Transaction is not ready, wait for the returned promise to resolve'))); + }); + } + return transaction; + } + + var connector = dataSource.connector; + if (connector.transaction) { + // Create a transient or memory source transaction. + transaction.connector = connector.transaction(); + transaction.commit = + transaction.exec = function(cb) { + this.connector.exec(cb); + }; + transaction.rollback = function(cb) { + // No need to do anything here. + cb(); + }; + return handleExecute(); + } else if (connector.beginTransaction) { + // Create a database source transaction. + transaction.exec = + transaction.commit = function(cb) { + ensureTransaction(this.currentTransaction, cb).commit(cb); + }; + transaction.rollback = function(cb) { + ensureTransaction(this.currentTransaction, cb).rollback(cb); + }; + // Always use callback / promise due to the use of beginTransaction() + cb = cb || utils.createPromiseCallback(); + Transaction.begin(connector, options, transactionCreated); + return cb.promise; + } else { + throw new Error(g.f('DataSource does not support transactions')); + } }; /** diff --git a/lib/transaction.js b/lib/transaction.js index 39371087..ca323aa2 100644 --- a/lib/transaction.js +++ b/lib/transaction.js @@ -134,31 +134,24 @@ if (Transaction) { * @returns {Promise|undefined} Returns a callback promise. */ Transaction.prototype.commit = function(cb) { - var self = this; cb = cb || utils.createPromiseCallback(); - // Report an error if the transaction is not active - if (!self.connection) { - process.nextTick(function() { - cb(new Error(g.f('The {{transaction}} is not active: %s', self.id))); - }); - return cb.promise; + if (this.ensureActive(cb)) { + var context = { + transaction: this, + operation: 'commit', + }; + this.notifyObserversAround('commit', context, + done => { + this.connector.commit(this.connection, done); + }, + err => { + // Deference the connection to mark the transaction is not active + // The connection should have been released back the pool + this.connection = null; + cb(err); + } + ); } - var context = { - transaction: self, - operation: 'commit', - }; - - function work(done) { - self.connector.commit(self.connection, done); - } - - self.notifyObserversAround('commit', context, work, function(err) { - // Deference the connection to mark the transaction is not active - // The connection should have been released back the pool - self.connection = null; - cb(err); - }); - return cb.promise; }; @@ -181,34 +174,37 @@ if (Transaction) { * @returns {Promise|undefined} Returns a callback promise. */ Transaction.prototype.rollback = function(cb) { - var self = this; cb = cb || utils.createPromiseCallback(); - // Report an error if the transaction is not active - if (!self.connection) { - process.nextTick(function() { - cb(new Error(g.f('The {{transaction}} is not active: %s', self.id))); - }); - return cb.promise; + if (this.ensureActive(cb)) { + var context = { + transaction: this, + operation: 'rollback', + }; + this.notifyObserversAround('rollback', context, + done => { + this.connector.rollback(this.connection, done); + }, + err => { + // Deference the connection to mark the transaction is not active + // The connection should have been released back the pool + this.connection = null; + cb(err); + } + ); } - var context = { - transaction: self, - operation: 'rollback', - }; - - function work(done) { - self.connector.rollback(self.connection, done); - } - - self.notifyObserversAround('rollback', context, work, function(err) { - // Deference the connection to mark the transaction is not active - // The connection should have been released back the pool - self.connection = null; - cb(err); - }); - return cb.promise; }; + Transaction.prototype.ensureActive = function(cb) { + // Report an error if the transaction is not active + if (!this.connection) { + process.nextTick(() => { + cb(new Error(g.f('The {{transaction}} is not active: %s', this.id))); + }); + } + return !!this.connection; + }; + Transaction.prototype.toJSON = function() { return this.id; }; diff --git a/package.json b/package.json index d3faacc7..0614bd6c 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ "depd": "^1.0.0", "inflection": "^1.6.0", "lodash": "^4.17.4", - "loopback-connector": "^4.0.0", + "loopback-connector": "^4.3.0", "minimatch": "^3.0.3", "qs": "^6.5.0", "shortid": "^2.2.6", diff --git a/test/transaction.test.js b/test/transaction.test.js new file mode 100644 index 00000000..5c973ab5 --- /dev/null +++ b/test/transaction.test.js @@ -0,0 +1,225 @@ +// Copyright IBM Corp. 2017. All Rights Reserved. +// Node module: loopback-datasource-juggler +// This file is licensed under the MIT License. +// License text available at https://opensource.org/licenses/MIT + +'use strict'; +/* global getSchema:false */ +const DataSource = require('..').DataSource; +const EventEmitter = require('events'); +const Connector = require('loopback-connector').Connector; +const Transaction = require('loopback-connector').Transaction; +const should = require('./init.js'); + +describe('Transactions on memory connector', function() { + let db, tx; + + before(() => { + db = getSchema(); + db.define('Model'); + }); + + it('returns an EventEmitter object', done => { + tx = db.transaction(); + tx.should.be.instanceOf(EventEmitter); + done(); + }); + + it('exposes and caches slave models', done => { + testModelCaching(tx.models, db.models); + done(); + }); + + it('changes count when committing', done => { + db.models.Model.count((err, count) => { + should.not.exist(err); + should.exist(count); + count.should.equal(0); + tx.models.Model.create(Array(1), () => { + // Only called after tx.commit()! + }); + tx.commit(err => { + should.not.exist(err); + db.models.Model.count((err, count) => { + should.not.exist(err); + should.exist(count); + count.should.equal(1); + done(); + }); + }); + }); + }); +}); + +describe('Transactions on test connector without execute()', () => { + let db, tx; + + before(() => { + db = createDataSource(); + }); + + beforeEach(resetState); + + it('resolves to an EventEmitter', done => { + const promise = db.transaction(); + promise.should.be.Promise(); + promise.then(transaction => { + should.exist(transaction); + transaction.should.be.instanceof(EventEmitter); + tx = transaction; + done(); + }, done); + }); + + it('exposes and caches slave models', done => { + testModelCaching(tx.models, db.models); + done(); + }); + + it('does not allow nesting of transactions', done => { + (() => tx.transaction()).should.throw('Nesting transactions is not supported'); + done(); + }); + + it('calls commit() on the connector', done => { + db.transaction().then(tx => { + tx.commit(err => { + callCount.should.deepEqual({commit: 1, rollback: 0, create: 0}); + done(err); + }); + }, done); + }); + + it('calls rollback() on the connector', done => { + db.transaction().then(tx => { + tx.rollback(err => { + callCount.should.deepEqual({commit: 0, rollback: 1, create: 0}); + done(err); + }); + }, done); + }); +}); + +describe('Transactions on test connector with execute()', () => { + let db; + + before(() => { + db = createDataSource(); + }); + + beforeEach(resetState); + + it('passes models and calls commit() automatically', done => { + db.transaction(models => { + testModelCaching(models, db.models); + return models.Model.create({}); + }, err => { + callCount.should.deepEqual({commit: 1, rollback: 0, create: 1}); + transactionPassed.should.be.true(); + done(err); + }); + }); + + it('calls rollback() automatically when throwing an error', done => { + let error; + db.transaction(models => { + error = new Error('exception'); + throw error; + }, err => { + error.should.equal(err); + callCount.should.deepEqual({commit: 0, rollback: 1, create: 0}); + done(); + }); + }); + + it('reports execution timeouts', done => { + let timedOut = false; + db.transaction(models => { + setTimeout(() => { + models.Model.create({}, function(err) { + if (!timedOut) { + done(new Error('Timeout was ineffective')); + } else { + should.exist(err); + err.message.should.startWith('The transaction is not active:'); + done(); + } + }); + }, 50); + }, { + timeout: 25, + }, err => { + timedOut = true; + should.exist(err); + err.code.should.equal('TRANSACTION_TIMEOUT'); + err.message.should.equal('Transaction is rolled back due to timeout'); + callCount.should.deepEqual({commit: 0, rollback: 1, create: 0}); + }); + }); +}); + +function createDataSource() { + let db = new DataSource({ + initialize: (dataSource, cb) => { + dataSource.connector = new TestConnector(); + cb(); + }, + }); + db.define('Model'); + return db; +} + +function testModelCaching(txModels, dbModels) { + should.exist(txModels); + // Test models caching mechanism: + // Model property should be a accessor with a getter first: + const accessor = Object.getOwnPropertyDescriptor(txModels, 'Model'); + should.exist(accessor); + should.exist(accessor.get); + accessor.get.should.be.Function(); + const Model = txModels.Model; + should.exist(Model); + // After accessing it once, it should be a normal cached property: + const desc = Object.getOwnPropertyDescriptor(txModels, 'Model'); + should.exist(desc.value); + Model.should.equal(txModels.Model); + Model.prototype.should.be.instanceof(dbModels.Model); +} + +let callCount; +let transactionPassed; + +function resetState() { + callCount = {commit: 0, rollback: 0, create: 0}; + transactionPassed = false; +} + +class TestConnector extends Connector { + constructor() { + super('test'); + } + + beginTransaction(isolationLevel, cb) { + this.currentTransaction = new Transaction(this, this); + process.nextTick(() => cb(null, this.currentTransaction)); + } + + commit(tx, cb) { + callCount.commit++; + cb(); + } + + rollback(tx, cb) { + callCount.rollback++; + cb(); + } + + create(model, data, options, cb) { + callCount.create++; + const transaction = options.transaction; + const current = this.currentTransaction; + transactionPassed = transaction && + (current === transaction || current === transaction.connection); + cb(); + } +}