Mix in observer apis to the connector

This commit is contained in:
Raymond Feng 2015-05-20 15:02:44 -07:00
parent 44bdeb7ae0
commit 506223885d
5 changed files with 183 additions and 24 deletions

View File

@ -4,6 +4,7 @@
var ModelBuilder = require('./model-builder.js').ModelBuilder; var ModelBuilder = require('./model-builder.js').ModelBuilder;
var ModelDefinition = require('./model-definition.js'); var ModelDefinition = require('./model-definition.js');
var RelationDefinition = require('./relation-definition.js'); var RelationDefinition = require('./relation-definition.js');
var OberserverMixin = require('./observer');
var jutil = require('./jutil'); var jutil = require('./jutil');
var utils = require('./utils'); var utils = require('./utils');
var ModelBaseClass = require('./model.js'); var ModelBaseClass = require('./model.js');
@ -182,6 +183,8 @@ DataSource.prototype._setupConnector = function () {
log(q || query, t1); log(q || query, t1);
}; };
}; };
// Configure the connector instance to mix in observer functions
jutil.mixin(this.connector, OberserverMixin);
} }
}; };

View File

@ -88,11 +88,59 @@ ObserverMixin.notifyObserversOf = function(operation, context, callback) {
); );
}); });
return callback.promise; return callback.promise;
} };
ObserverMixin._notifyBaseObservers = function(operation, context, callback) { ObserverMixin._notifyBaseObservers = function(operation, context, callback) {
if (this.base && this.base.notifyObserversOf) if (this.base && this.base.notifyObserversOf)
this.base.notifyObserversOf(operation, context, callback); this.base.notifyObserversOf(operation, context, callback);
else else
callback(); callback();
};
/**
* Run the given function with before/after observers. It's done in three serial
* steps asynchronously:
*
* - Notify the registered observers under 'before ' + operation
* - Execute the function
* - Notify the registered observers under 'after ' + operation
*
* If an error happens, it fails fast and calls the callback with err.
*
* @param {String} operation The operation name
* @param {Context} context The context object
* @param {Function} fn The task to be invoked as fn(done) or fn(context, done)
* @param {Function} callback The callback function
* @returns {*}
*/
ObserverMixin.notifyObserversAround = function(operation, context, fn, callback) {
var self = this;
return self.notifyObserversOf('before ' + operation, context,
function(err, context) {
if (err) return callback(err, context);
function cbForWork(err) {
if (err) return callback(err, context);
var returnedArgs = [].slice.call(arguments, 1);
context.results = returnedArgs;
self.notifyObserversOf('after ' + operation, context,
function(err, context) {
if (err) return callback(err, context);
var results = returnedArgs;
if (context) {
results = context.results;
} }
var args = [err].concat(results);
callback.apply(null, args);
});
}
if (fn.length === 1) {
// fn(done)
fn(cbForWork);
} else {
// fn(context, done)
fn(context, cbForWork);
}
});
};

View File

@ -118,18 +118,18 @@ if (Transaction) {
transaction: self, transaction: self,
operation: 'commit' operation: 'commit'
}; };
self.notifyObserversOf('before commit', context, function(err) {
if (err) return cb(err); function work(done) {
self.connector.commit(self.connection, function(err) { self.connector.commit(self.connection, done);
if (err) return cb(err); }
self.notifyObserversOf('after commit', context, function(err) {
self.notifyObserversAround('commit', context, work, function(err) {
// Deference the connection to mark the transaction is not active // Deference the connection to mark the transaction is not active
// The connection should have been released back the pool // The connection should have been released back the pool
self.connection = null; self.connection = null;
cb(err); cb(err);
}); });
});
});
return cb.promise; return cb.promise;
}; };
@ -152,18 +152,18 @@ if (Transaction) {
transaction: self, transaction: self,
operation: 'rollback' operation: 'rollback'
}; };
self.notifyObserversOf('before rollback', context, function(err) {
if (err) return cb(err); function work(done) {
self.connector.rollback(self.connection, function(err) { self.connector.rollback(self.connection, done);
if (err) return cb(err); }
self.notifyObserversOf('after rollback', context, function(err) {
self.notifyObserversAround('rollback', context, work, function(err) {
// Deference the connection to mark the transaction is not active // Deference the connection to mark the transaction is not active
// The connection should have been released back the pool // The connection should have been released back the pool
self.connection = null; self.connection = null;
cb(err); cb(err);
}); });
});
});
return cb.promise; return cb.promise;
}; };

View File

@ -132,6 +132,70 @@ describe('async observer', function() {
}); });
}); });
describe('notifyObserversAround', function() {
var notifications;
beforeEach(function() {
notifications = [];
TestModel.observe('before execute',
pushAndNext(notifications, 'before execute'));
TestModel.observe('after execute',
pushAndNext(notifications, 'after execute'));
});
it('should notify before/after observers', function(done) {
var context = {};
function work(done) {
process.nextTick(function() {
done(null, 1);
});
}
TestModel.notifyObserversAround('execute', context, work,
function(err, result) {
notifications.should.eql(['before execute', 'after execute']);
result.should.eql(1);
done();
});
});
it('should allow work with context', function(done) {
var context = {};
function work(context, done) {
process.nextTick(function() {
done(null, 1);
});
}
TestModel.notifyObserversAround('execute', context, work,
function(err, result) {
notifications.should.eql(['before execute', 'after execute']);
result.should.eql(1);
done();
});
});
it('should notify before/after observers with multiple results',
function(done) {
var context = {};
function work(done) {
process.nextTick(function() {
done(null, 1, 2);
});
}
TestModel.notifyObserversAround('execute', context, work,
function(err, r1, r2) {
r1.should.eql(1);
r2.should.eql(2);
notifications.should.eql(['before execute', 'after execute']);
done();
});
});
});
it('resolves promises returned by observers', function(done) { it('resolves promises returned by observers', function(done) {
TestModel.observe('event', function(ctx) { TestModel.observe('event', function(ctx) {
return Promise.resolve('value-to-ignore'); return Promise.resolve('value-to-ignore');

View File

@ -584,4 +584,48 @@ describe('Memory connector with options', function() {
}); });
describe('Memory connector with observers', function() {
var ds = new DataSource({
connector: 'memory'
});
it('should have observer mixed into the connector', function() {
ds.connector.observe.should.be.a.function;
ds.connector.notifyObserversOf.should.be.a.function;
});
it('should notify observers', function(done) {
var events = [];
ds.connector.execute = function(command, params, options, cb) {
var self = this;
var context = {command: command, params: params, options: options};
self.notifyObserversOf('before execute', context, function(err) {
process.nextTick(function() {
if (err) return cb(err);
events.push('execute');
self.notifyObserversOf('after execute', context, function(err) {
cb(err);
});
});
});
};
ds.connector.observe('before execute', function(context, next) {
events.push('before execute');
next();
});
ds.connector.observe('after execute', function(context, next) {
events.push('after execute');
next();
});
ds.connector.execute('test', [1, 2], {x: 2}, function(err) {
if (err) return done(err);
events.should.eql(['before execute', 'execute', 'after execute']);
done();
});
});
});