Intent-based hooks for persistence

This patch introduces a new API for "intent-based" hooks. These hooks
are not tied to a particular method (e.g. "find" or "update"). Instead,
they are triggered from all methods that execute a particular "intent".

The consumer API is very simple, there is a new method
Model.observe(name, observer), where the observer is function
observer(context, callback).

Observers are inherited by child models and it is possible to register
multiple observers for the same hook.

List of hooks:

 - query
 - before save
 - after save
 - after delete
This commit is contained in:
Miroslav Bajtoš 2015-01-21 17:57:47 +01:00
parent b3d07ebbe8
commit 1fd6eff10f
5 changed files with 1610 additions and 198 deletions

View File

@ -594,6 +594,9 @@ Memory.prototype.updateAttributes = function updateAttributes(model, id, data, c
}
}
// Do not modify the data object passed in arguments
data = Object.create(data);
this.setIdValue(model, data, id);
var cachedModels = this.collection(model);

View File

@ -7,6 +7,7 @@ module.exports = DataAccessObject;
/*!
* Module dependencies
*/
var async = require('async');
var jutil = require('./jutil');
var ValidationError = require('./validations').ValidationError;
var Relation = require('./relations.js');
@ -62,6 +63,16 @@ function byIdQuery(m, id) {
return query;
}
function isWhereByGivenId(Model, where, idValue) {
var keys = Object.keys(where);
if (keys.length != 1) return false;
var pk = idName(Model);
if (keys[0] !== pk) return false;
return where[pk] === idValue;
}
DataAccessObject._forDB = function (data) {
if (!(this.getDataSource().isRelational && this.getDataSource().isRelational())) {
return data;
@ -133,7 +144,7 @@ DataAccessObject.create = function (data, callback) {
var Model = this;
var self = this;
if (typeof data === 'function') {
callback = data;
data = {};
@ -183,39 +194,42 @@ DataAccessObject.create = function (data, callback) {
}
}
}
var enforced = {};
var obj;
var idValue = getIdValue(this, data);
// if we come from save
if (data instanceof Model && !idValue) {
obj = data;
} else {
obj = new Model(data);
}
this.applyProperties(enforced, obj);
obj.setAttributes(enforced);
Model = this.lookupModel(data); // data-specific
if (Model !== obj.constructor) obj = new Model(data);
data = obj.toObject(true);
// validation required
obj.isValid(function (valid) {
if (valid) {
create();
} else {
callback(new ValidationError(obj), obj);
}
}, data);
Model.notifyObserversOf('before save', { Model: Model, instance: obj }, function(err) {
if (err) return callback(err);
data = obj.toObject(true);
// validation required
obj.isValid(function (valid) {
if (valid) {
create();
} else {
callback(new ValidationError(obj), obj);
}
}, data);
});
function create() {
obj.trigger('create', function (createDone) {
obj.trigger('save', function (saveDone) {
var _idName = idName(Model);
var modelName = Model.modelName;
this._adapter().create(modelName, this.constructor._forDB(obj.toObject(true)), function (err, id, rev) {
@ -232,8 +246,13 @@ DataAccessObject.create = function (data, callback) {
obj.__persisted = true;
saveDone.call(obj, function () {
createDone.call(obj, function () {
callback(err, obj);
if(!err) Model.emit('changed', obj);
if (err) {
return callback(err, obj);
}
Model.notifyObserversOf('after save', { Model: Model, instance: obj }, function(err) {
callback(err, obj);
if(!err) Model.emit('changed', obj);
});
});
});
}, obj);
@ -267,45 +286,83 @@ DataAccessObject.updateOrCreate = DataAccessObject.upsert = function upsert(data
}
var self = this;
var Model = this;
if (!getIdValue(this, data)) {
var id = getIdValue(this, data);
if (!id) {
return this.create(data, callback);
}
if (this.getDataSource().connector.updateOrCreate) {
var update = data;
var inst = data;
if(!(data instanceof Model)) {
inst = new Model(data);
Model.notifyObserversOf('query', { Model: Model, query: byIdQuery(Model, id) }, doUpdateOrCreate);
function doUpdateOrCreate(err, ctx) {
if (err) return callback(err);
var isOriginalQuery = isWhereByGivenId(Model, ctx.query.where, id)
if (Model.getDataSource().connector.updateOrCreate && isOriginalQuery) {
var context = { Model: Model, where: ctx.query.where, data: data };
Model.notifyObserversOf('before save', context, function(err, ctx) {
if (err) return callback(err);
data = ctx.data;
var update = data;
var inst = data;
if(!(data instanceof Model)) {
inst = new Model(data);
}
update = inst.toObject(false);
Model.applyProperties(update, inst);
Model = Model.lookupModel(update);
// FIXME(bajtos) validate the model!
// https://github.com/strongloop/loopback-datasource-juggler/issues/262
update = inst.toObject(true);
update = removeUndefined(update);
self.getDataSource().connector
.updateOrCreate(Model.modelName, update, done);
function done(err, data) {
var obj;
if (data && !(data instanceof Model)) {
inst._initProperties(data);
obj = inst;
} else {
obj = data;
}
if (err) {
callback(err, obj);
if(!err) {
Model.emit('changed', inst);
}
} else {
Model.notifyObserversOf('after save', { Model: Model, instance: obj }, function(err) {
callback(err, obj);
if(!err) {
Model.emit('changed', inst);
}
});
}
}
});
} else {
Model.findOne({ where: ctx.query.where }, { notify: false }, function (err, inst) {
if (err) {
return callback(err);
}
if (!isOriginalQuery) {
// The custom query returned from a hook may hide the fact that
// there is already a model with `id` value `data[idName(Model)]`
delete data[idName(Model)];
}
if (inst) {
inst.updateAttributes(data, callback);
} else {
Model = self.lookupModel(data);
var obj = new Model(data);
obj.save(data, callback);
}
});
}
update = inst.toObject(false);
this.applyProperties(update, inst);
update = removeUndefined(update);
Model = this.lookupModel(update);
this.getDataSource().connector.updateOrCreate(Model.modelName, update, function (err, data) {
var obj;
if (data && !(data instanceof Model)) {
inst._initProperties(data);
obj = inst;
} else {
obj = data;
}
callback(err, obj);
if(!err) {
Model.emit('changed', inst);
}
});
} else {
this.findById(getIdValue(this, data), function (err, inst) {
if (err) {
return callback(err);
}
if (inst) {
inst.updateAttributes(data, callback);
} else {
Model = self.lookupModel(data);
var obj = new Model(data);
obj.save(data, callback);
}
});
}
};
@ -332,11 +389,11 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, callback) {
};
}
var t = this;
this.findOne(query, function (err, record) {
var Model = this;
Model.findOne(query, function (err, record) {
if (err) return callback(err);
if (record) return callback(null, record, false);
t.create(data, function (err, record) {
Model.create(data, function (err, record) {
callback(err, record, record != null);
});
});
@ -383,13 +440,13 @@ DataAccessObject.findByIds = function(ids, cond, cb) {
cb = cond;
cond = {};
}
var pk = idName(this);
if (ids.length === 0) {
process.nextTick(function() { cb(null, []); });
return;
}
var filter = { where: {} };
filter.where[pk] = { inq: [].concat(ids) };
mergeQuery(filter, cond || {});
@ -731,17 +788,26 @@ DataAccessObject._coerce = function (where) {
* @param {Function} callback Required callback function. Call this function with two arguments: `err` (null or Error) and an array of instances.
*/
DataAccessObject.find = function find(query, cb) {
DataAccessObject.find = function find(query, options, cb) {
if (stillConnecting(this.getDataSource(), this, arguments)) return;
if (arguments.length === 1) {
cb = query;
query = null;
options = {};
}
if (cb === undefined && typeof options === 'function') {
cb = options;
options = {};
}
if (!options) options = {};
var self = this;
query = query || {};
try {
this._normalize(query);
} catch (err) {
@ -751,7 +817,7 @@ DataAccessObject.find = function find(query, cb) {
}
this.applyScope(query);
var near = query && geo.nearFilter(query.where);
var supportsGeo = !!this.getDataSource().connector.buildNearFilter;
@ -763,42 +829,48 @@ DataAccessObject.find = function find(query, cb) {
// do in memory query
// using all documents
// TODO [fabien] use default scope here?
this.getDataSource().connector.all(this.modelName, {}, function (err, data) {
var memory = new Memory();
var modelName = self.modelName;
if (err) {
cb(err);
} else if (Array.isArray(data)) {
memory.define({
properties: self.dataSource.definitions[self.modelName].properties,
settings: self.dataSource.definitions[self.modelName].settings,
model: self
});
self.notifyObserversOf('query', { Model: self, query: query }, function(err, ctx) {
if (err) return cb(err);
data.forEach(function (obj) {
memory.create(modelName, obj, function () {
// noop
self.getDataSource().connector.all(self.modelName, {}, function (err, data) {
var memory = new Memory();
var modelName = self.modelName;
if (err) {
cb(err);
} else if (Array.isArray(data)) {
memory.define({
properties: self.dataSource.definitions[self.modelName].properties,
settings: self.dataSource.definitions[self.modelName].settings,
model: self
});
});
memory.all(modelName, query, cb);
} else {
cb(null, []);
}
}.bind(this));
data.forEach(function (obj) {
memory.create(modelName, obj, function () {
// noop
});
});
// FIXME: apply "includes" and other transforms - see allCb below
memory.all(modelName, ctx.query, cb);
} else {
cb(null, []);
}
});
});
// already handled
return;
}
}
this.getDataSource().connector.all(this.modelName, query, function (err, data) {
var allCb = function (err, data) {
if (data && data.forEach) {
data.forEach(function (d, i) {
var Model = self.lookupModel(d);
var obj = new Model(d, {fields: query.fields, applySetters: false, persisted: true});
if (query && query.include) {
if (query.collect) {
// The collect property indicates that the query is to return the
@ -815,7 +887,7 @@ DataAccessObject.find = function find(query, cb) {
if (utils.isPlainObject(inc)) {
relationName = Object.keys(inc)[0];
}
// Promote the included model as a direct property
var data = obj.__cachedRelations[relationName];
if(Array.isArray(data)) {
@ -840,7 +912,18 @@ DataAccessObject.find = function find(query, cb) {
}
else
cb(err, []);
});
}
var self = this;
if (options.notify === false) {
self.getDataSource().connector.all(self.modelName, query, allCb);
} else {
this.notifyObserversOf('query', { Model: this, query: query }, function(err, ctx) {
if (err) return cb(err);
var query = ctx.query;
self.getDataSource().connector.all(self.modelName, query, allCb);
});
}
};
/**
@ -850,16 +933,22 @@ DataAccessObject.find = function find(query, cb) {
* For example: `{where: {test: 'me'}}`.
* @param {Function} cb Callback function called with (err, instance)
*/
DataAccessObject.findOne = function findOne(query, cb) {
DataAccessObject.findOne = function findOne(query, options, cb) {
if (stillConnecting(this.getDataSource(), this, arguments)) return;
if (typeof query === 'function') {
cb = query;
query = {};
}
if (cb === undefined && typeof options === 'function') {
cb = options;
options = {};
}
query = query || {};
query.limit = 1;
this.find(query, function (err, collection) {
this.find(query, options, function (err, collection) {
if (err || !collection || !collection.length > 0) return cb(err, null);
cb(err, collection[0]);
});
@ -878,41 +967,79 @@ DataAccessObject.findOne = function findOne(query, cb) {
* @param {Object} [where] Optional object that defines the criteria. This is a "where" object. Do NOT pass a filter object.
* @param {Function} [cb] Callback called with (err)
*/
DataAccessObject.remove = DataAccessObject.deleteAll = DataAccessObject.destroyAll = function destroyAll(where, cb) {
DataAccessObject.remove = DataAccessObject.deleteAll = DataAccessObject.destroyAll = function destroyAll(where, options, cb) {
if (stillConnecting(this.getDataSource(), this, arguments)) return;
var Model = this;
if (!cb && 'function' === typeof where) {
if (!cb && !options && 'function' === typeof where) {
cb = where;
where = undefined;
}
if (!cb && typeof options === 'function') {
cb = options;
}
if (!cb) cb = function(){};
if (!options) options = {};
var query = { where: where };
this.applyScope(query);
where = query.where;
if (!where || (typeof where === 'object' && Object.keys(where).length === 0)) {
this.getDataSource().connector.destroyAll(this.modelName, function (err, data) {
cb && cb(err, data);
if(!err) Model.emit('deletedAll');
}.bind(this));
var context = { Model: Model, where: whereIsEmpty(where) ? {} : where };
if (options.notify === false) {
doDelete(where);
} else {
try {
// Support an optional where object
where = removeUndefined(where);
where = this._coerce(where);
} catch (err) {
return process.nextTick(function() {
cb && cb(err);
query = { where: whereIsEmpty(where) ? {} : where };
Model.notifyObserversOf('query',
{ Model: Model, query: query },
function(err, ctx) {
if (err) return cb(err);
doDelete(ctx.query.where);
});
}
function doDelete(where) {
if (whereIsEmpty(where)) {
Model.getDataSource().connector.destroyAll(Model.modelName, done);
} else {
try {
// Support an optional where object
where = removeUndefined(where);
where = Model._coerce(where);
} catch (err) {
return process.nextTick(function() {
cb && cb(err);
});
}
Model.getDataSource().connector.destroyAll(Model.modelName, where, done);
}
function done(err, data) {
if (err) return cb(er);
if (options.notify === false) {
return cb(err, data);
}
Model.notifyObserversOf('after delete', { Model: Model, where: where }, function(err) {
cb(err, data);
if (!err)
Model.emit('deletedAll', whereIsEmpty(where) ? undefined : where);
});
}
this.getDataSource().connector.destroyAll(this.modelName, where, function (err, data) {
cb && cb(err, data);
if(!err) Model.emit('deletedAll', where);
}.bind(this));
}
};
function whereIsEmpty(where) {
return !where ||
(typeof where === 'object' && Object.keys(where).length === 0)
}
/**
* Delete the record with the specified ID.
* Aliases are `destroyById` and `deleteById`.
@ -926,7 +1053,7 @@ DataAccessObject.remove = DataAccessObject.deleteAll = DataAccessObject.destroyA
DataAccessObject.removeById = DataAccessObject.destroyById = DataAccessObject.deleteById = function deleteById(id, cb) {
if (stillConnecting(this.getDataSource(), this, arguments)) return;
var Model = this;
this.remove(byIdQuery(this, id).where, function(err) {
if ('function' === typeof cb) {
cb(err);
@ -955,11 +1082,11 @@ DataAccessObject.count = function (where, cb) {
cb = where;
where = null;
}
var query = { where: where };
this.applyScope(query);
where = query.where;
try {
where = removeUndefined(where);
where = this._coerce(where);
@ -968,8 +1095,13 @@ DataAccessObject.count = function (where, cb) {
cb && cb(err);
});
}
this.getDataSource().connector.count(this.modelName, cb, where);
var Model = this;
this.notifyObserversOf('query', { Model: Model, query: { where: where } }, function(err, ctx) {
if (err) return cb(err);
where = ctx.query.where;
Model.getDataSource().connector.count(Model.modelName, cb, where);
});
};
/**
@ -999,59 +1131,67 @@ DataAccessObject.prototype.save = function (options, callback) {
if (!('throws' in options)) {
options.throws = false;
}
var inst = this;
var data = inst.toObject(true);
var modelName = Model.modelName;
Model.applyProperties(data, this);
if (this.isNewRecord()) {
return Model.create(this, callback);
} else {
inst.setAttributes(data);
}
// validate first
if (!options.validate) {
return save();
}
Model.notifyObserversOf('before save', { Model: Model, instance: inst }, function(err) {
if (err) return callback(err);
data = inst.toObject(true);
inst.isValid(function (valid) {
if (valid) {
save();
} else {
var err = new ValidationError(inst);
// throws option is dangerous for async usage
if (options.throws) {
throw err;
}
callback(err, inst);
// validate first
if (!options.validate) {
return save();
}
});
// then save
function save() {
inst.trigger('save', function (saveDone) {
inst.trigger('update', function (updateDone) {
data = removeUndefined(data);
inst._adapter().save(modelName, inst.constructor._forDB(data), function (err) {
if (err) {
return callback(err, inst);
}
inst._initProperties(data, { persisted: true });
updateDone.call(inst, function () {
saveDone.call(inst, function () {
callback(err, inst);
if(!err) {
Model.emit('changed', inst);
}
inst.isValid(function (valid) {
if (valid) {
save();
} else {
var err = new ValidationError(inst);
// throws option is dangerous for async usage
if (options.throws) {
throw err;
}
callback(err, inst);
}
});
// then save
function save() {
inst.trigger('save', function (saveDone) {
inst.trigger('update', function (updateDone) {
data = removeUndefined(data);
inst._adapter().save(modelName, inst.constructor._forDB(data), function (err) {
if (err) {
return callback(err, inst);
}
inst._initProperties(data, { persisted: true });
Model.notifyObserversOf('after save', { Model: Model, instance: inst }, function(err) {
if (err) return callback(err, inst);
updateDone.call(inst, function () {
saveDone.call(inst, function () {
callback(err, inst);
if(!err) {
Model.emit('changed', inst);
}
});
});
});
});
});
}, data, callback);
}, data, callback);
}, data, callback);
}
}
});
};
/**
@ -1093,24 +1233,56 @@ DataAccessObject.updateAll = function (where, data, cb) {
assert(typeof where === 'object', 'The where argument should be an object');
assert(typeof data === 'object', 'The data argument should be an object');
assert(cb === null || typeof cb === 'function', 'The cb argument should be a function');
var query = { where: where };
this.applyScope(query);
this.applyProperties(data);
where = query.where;
try {
where = removeUndefined(where);
where = this._coerce(where);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
var Model = this;
Model.notifyObserversOf('query', { Model: Model, query: { where: where } }, function(err, ctx) {
if (err) return cb && cb(err);
Model.notifyObserversOf(
'before save',
{
Model: Model,
where: ctx.query.where,
data: data
},
function(err, ctx) {
if (err) return cb && cb(err);
doUpdate(ctx.where, ctx.data);
});
});
function doUpdate(where, data) {
try {
where = removeUndefined(where);
where = Model._coerce(where);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
}
var connector = Model.getDataSource().connector;
connector.update(Model.modelName, where, data, function(err, count) {
if (err) return cb && cb (err);
Model.notifyObserversOf(
'after save',
{
Model: Model,
where: where,
data: data
},
function(err, ctx) {
return cb && cb(err, count);
});
});
}
var connector = this.getDataSource().connector;
connector.update(this.modelName, where, data, cb);
};
DataAccessObject.prototype.isNewRecord = function () {
@ -1134,23 +1306,50 @@ DataAccessObject.prototype.remove =
DataAccessObject.prototype.delete =
DataAccessObject.prototype.destroy = function (cb) {
if (stillConnecting(this.getDataSource(), this, arguments)) return;
var self = this;
var Model = this.constructor;
var id = getIdValue(this.constructor, this);
this.trigger('destroy', function (destroyed) {
this._adapter().destroy(this.constructor.modelName, id, function (err) {
if (err) {
return cb(err);
}
Model.notifyObserversOf(
'query',
{ Model: Model, query: byIdQuery(Model, id) },
function(err, ctx) {
if (err) return cb(err);
doDeleteInstance(ctx.query.where);
});
destroyed(function () {
if (cb) cb();
Model.emit('deleted', id);
function doDeleteInstance(where) {
if (!isWhereByGivenId(Model, where, id)) {
// A hook modified the query, it is no longer
// a simple 'delete model with the given id'.
// We must switch to full query-based delete.
Model.deleteAll(where, { notify: false }, function(err) {
if (err) return cb && cb(err);
Model.notifyObserversOf('after delete', { Model: Model, where: where }, function(err) {
cb && cb(err);
if (!err) Model.emit('deleted', id);
});
});
}.bind(this));
}, null, cb);
return;
}
self.trigger('destroy', function (destroyed) {
self._adapter().destroy(self.constructor.modelName, id, function (err) {
if (err) {
return cb(err);
}
destroyed(function () {
Model.notifyObserversOf('after delete', { Model: Model, where: where }, function(err) {
cb && cb(err);
if (!err) Model.emit('deleted', id);
});
});
});
}, null, cb);
}
};
/**
* Set a single attribute.
* Equivalent to `setAttributes({name: value})`
@ -1160,7 +1359,7 @@ DataAccessObject.prototype.remove =
*/
DataAccessObject.prototype.setAttribute = function setAttribute(name, value) {
this[name] = value; // TODO [fabien] - currently not protected by applyProperties
};
};
/**
* Update a single attribute.
@ -1184,17 +1383,17 @@ DataAccessObject.prototype.updateAttribute = function updateAttribute(name, valu
*/
DataAccessObject.prototype.setAttributes = function setAttributes(data) {
if (typeof data !== 'object') return;
this.constructor.applyProperties(data, this);
var Model = this.constructor;
var inst = this;
// update instance's properties
for (var key in data) {
inst.setAttribute(key, data[key]);
}
Model.emit('set', inst);
};
@ -1231,15 +1430,29 @@ DataAccessObject.prototype.updateAttributes = function updateAttributes(data, cb
data = {};
}
// update instance's properties
inst.setAttributes(data);
if (!cb) {
cb = function() {};
}
inst.isValid(function (valid) {
if (!valid) {
if (cb) {
var context = {
Model: Model,
where: byIdQuery(Model, getIdValue(Model, inst)).where,
data: data
};
Model.notifyObserversOf('before save', context, function(err, ctx) {
if (err) return cb(err);
data = ctx.data;
// update instance's properties
inst.setAttributes(data);
inst.isValid(function (valid) {
if (!valid) {
cb(new ValidationError(inst), inst);
return;
}
} else {
inst.trigger('save', function (saveDone) {
inst.trigger('update', function (done) {
var typedData = {};
@ -1248,7 +1461,7 @@ DataAccessObject.prototype.updateAttributes = function updateAttributes(data, cb
// Convert the properties by type
inst[key] = data[key];
typedData[key] = inst[key];
if (typeof typedData[key] === 'object'
if (typeof typedData[key] === 'object'
&& typedData[key] !== null
&& typeof typedData[key].toObject === 'function') {
typedData[key] = typedData[key].toObject();
@ -1260,15 +1473,18 @@ DataAccessObject.prototype.updateAttributes = function updateAttributes(data, cb
if (!err) inst.__persisted = true;
done.call(inst, function () {
saveDone.call(inst, function () {
if(cb) cb(err, inst);
if(!err) Model.emit('changed', inst);
if (err) return cb(err, inst);
Model.notifyObserversOf('after save', { Model: Model, instance: inst }, function(err) {
if(!err) Model.emit('changed', inst);
cb(err, inst);
});
});
});
});
}, data, cb);
}, data, cb);
}
}, data);
}, data);
});
};
/**

View File

@ -445,7 +445,7 @@ function addHooks(name, done) {
};
User['after' + name] = function (next) {
(new Boolean(called)).should.equal(true);
this.email.should.equal(random);
this.should.have.property('email', random);
done();
};
}

View File

@ -5,6 +5,7 @@ var fs = require('fs');
var assert = require('assert');
var async = require('async');
var should = require('./init.js');
var Memory = require('../lib/connectors/memory').Memory;
describe('Memory connector', function () {
var file = path.join(__dirname, 'memory.json');
@ -278,27 +279,27 @@ describe('Memory connector', function () {
}
});
it('should use collection setting', function (done) {
var ds = new DataSource({
connector: 'memory'
});
var Product = ds.createModel('Product', {
name: String
});
var Tool = ds.createModel('Tool', {
name: String
}, {memory: {collection: 'Product'}});
var Widget = ds.createModel('Widget', {
name: String
}, {memory: {collection: 'Product'}});
ds.connector.getCollection('Tool').should.equal('Product');
ds.connector.getCollection('Widget').should.equal('Product');
async.series([
function(next) {
Tool.create({ name: 'Tool A' }, next);
@ -359,6 +360,17 @@ describe('Memory connector', function () {
});
});
require('./persistence-hooks.suite')(
new DataSource({ connector: Memory }),
should);
});
describe('Unoptimized connector', function() {
var ds = new DataSource({ connector: Memory });
// disable optimized methods
ds.connector.updateOrCreate = false;
require('./persistence-hooks.suite')(ds, should);
});

File diff suppressed because it is too large Load Diff