diff --git a/lib/datasource.js b/lib/datasource.js index bed1f58e..da0632e6 100644 --- a/lib/datasource.js +++ b/lib/datasource.js @@ -4,6 +4,7 @@ var ModelBuilder = require('./model-builder.js').ModelBuilder; var ModelDefinition = require('./model-definition.js'); var RelationDefinition = require('./relation-definition.js'); +var OberserverMixin = require('./observer'); var jutil = require('./jutil'); var utils = require('./utils'); var ModelBaseClass = require('./model.js'); @@ -182,6 +183,8 @@ DataSource.prototype._setupConnector = function () { log(q || query, t1); }; }; + // Configure the connector instance to mix in observer functions + jutil.mixin(this.connector, OberserverMixin); } }; diff --git a/lib/observer.js b/lib/observer.js index 5588ad56..893ac0be 100644 --- a/lib/observer.js +++ b/lib/observer.js @@ -88,11 +88,59 @@ ObserverMixin.notifyObserversOf = function(operation, context, callback) { ); }); return callback.promise; -} +}; ObserverMixin._notifyBaseObservers = function(operation, context, callback) { if (this.base && this.base.notifyObserversOf) this.base.notifyObserversOf(operation, context, callback); else callback(); -} +}; + +/** + * Run the given function with before/after observers. It's done in three serial + * steps asynchronously: + * + * - Notify the registered observers under 'before ' + operation + * - Execute the function + * - Notify the registered observers under 'after ' + operation + * + * If an error happens, it fails fast and calls the callback with err. + * + * @param {String} operation The operation name + * @param {Context} context The context object + * @param {Function} fn The task to be invoked as fn(done) or fn(context, done) + * @param {Function} callback The callback function + * @returns {*} + */ +ObserverMixin.notifyObserversAround = function(operation, context, fn, callback) { + var self = this; + return self.notifyObserversOf('before ' + operation, context, + function(err, context) { + if (err) return callback(err, context); + + function cbForWork(err) { + if (err) return callback(err, context); + var returnedArgs = [].slice.call(arguments, 1); + context.results = returnedArgs; + self.notifyObserversOf('after ' + operation, context, + function(err, context) { + if (err) return callback(err, context); + var results = returnedArgs; + if (context) { + results = context.results; + } + var args = [err].concat(results); + callback.apply(null, args); + }); + } + + if (fn.length === 1) { + // fn(done) + fn(cbForWork); + } else { + // fn(context, done) + fn(context, cbForWork); + } + }); +}; diff --git a/lib/transaction.js b/lib/transaction.js index 9ec76055..489d0630 100644 --- a/lib/transaction.js +++ b/lib/transaction.js @@ -118,18 +118,18 @@ if (Transaction) { transaction: self, operation: 'commit' }; - self.notifyObserversOf('before commit', context, function(err) { - if (err) return cb(err); - self.connector.commit(self.connection, function(err) { - if (err) return cb(err); - self.notifyObserversOf('after commit', context, function(err) { - // Deference the connection to mark the transaction is not active - // The connection should have been released back the pool - self.connection = null; - cb(err); - }); - }); + + function work(done) { + self.connector.commit(self.connection, done); + } + + self.notifyObserversAround('commit', context, work, function(err) { + // Deference the connection to mark the transaction is not active + // The connection should have been released back the pool + self.connection = null; + cb(err); }); + return cb.promise; }; @@ -152,18 +152,18 @@ if (Transaction) { transaction: self, operation: 'rollback' }; - self.notifyObserversOf('before rollback', context, function(err) { - if (err) return cb(err); - self.connector.rollback(self.connection, function(err) { - if (err) return cb(err); - self.notifyObserversOf('after rollback', context, function(err) { - // Deference the connection to mark the transaction is not active - // The connection should have been released back the pool - self.connection = null; - cb(err); - }); - }); + + function work(done) { + self.connector.rollback(self.connection, done); + } + + self.notifyObserversAround('rollback', context, work, function(err) { + // Deference the connection to mark the transaction is not active + // The connection should have been released back the pool + self.connection = null; + cb(err); }); + return cb.promise; }; diff --git a/test/async-observer.test.js b/test/async-observer.test.js index 3786969e..f337f158 100644 --- a/test/async-observer.test.js +++ b/test/async-observer.test.js @@ -132,6 +132,70 @@ describe('async observer', function() { }); }); + describe('notifyObserversAround', function() { + var notifications; + beforeEach(function() { + notifications = []; + TestModel.observe('before execute', + pushAndNext(notifications, 'before execute')); + TestModel.observe('after execute', + pushAndNext(notifications, 'after execute')); + }); + + it('should notify before/after observers', function(done) { + var context = {}; + + function work(done) { + process.nextTick(function() { + done(null, 1); + }); + } + + TestModel.notifyObserversAround('execute', context, work, + function(err, result) { + notifications.should.eql(['before execute', 'after execute']); + result.should.eql(1); + done(); + }); + }); + + it('should allow work with context', function(done) { + var context = {}; + + function work(context, done) { + process.nextTick(function() { + done(null, 1); + }); + } + + TestModel.notifyObserversAround('execute', context, work, + function(err, result) { + notifications.should.eql(['before execute', 'after execute']); + result.should.eql(1); + done(); + }); + }); + + it('should notify before/after observers with multiple results', + function(done) { + var context = {}; + + function work(done) { + process.nextTick(function() { + done(null, 1, 2); + }); + } + + TestModel.notifyObserversAround('execute', context, work, + function(err, r1, r2) { + r1.should.eql(1); + r2.should.eql(2); + notifications.should.eql(['before execute', 'after execute']); + done(); + }); + }); + }); + it('resolves promises returned by observers', function(done) { TestModel.observe('event', function(ctx) { return Promise.resolve('value-to-ignore'); diff --git a/test/memory.test.js b/test/memory.test.js index afd3b8df..14c5bb06 100644 --- a/test/memory.test.js +++ b/test/memory.test.js @@ -584,4 +584,48 @@ describe('Memory connector with options', function() { }); +describe('Memory connector with observers', function() { + var ds = new DataSource({ + connector: 'memory' + }); + + it('should have observer mixed into the connector', function() { + ds.connector.observe.should.be.a.function; + ds.connector.notifyObserversOf.should.be.a.function; + }); + + it('should notify observers', function(done) { + var events = []; + ds.connector.execute = function(command, params, options, cb) { + var self = this; + var context = {command: command, params: params, options: options}; + self.notifyObserversOf('before execute', context, function(err) { + process.nextTick(function() { + if (err) return cb(err); + events.push('execute'); + self.notifyObserversOf('after execute', context, function(err) { + cb(err); + }); + }); + }); + }; + + ds.connector.observe('before execute', function(context, next) { + events.push('before execute'); + next(); + }); + + ds.connector.observe('after execute', function(context, next) { + events.push('after execute'); + next(); + }); + + ds.connector.execute('test', [1, 2], {x: 2}, function(err) { + if (err) return done(err); + events.should.eql(['before execute', 'execute', 'after execute']); + done(); + }); + }); +}); +