Merge pull request #130 from strongloop/feature/fix-sql-count

Fix Model.count base implementation and Normalize/validate the query filter
This commit is contained in:
Raymond Feng 2014-06-04 15:25:05 -07:00
commit ea5c766ec1
3 changed files with 257 additions and 29 deletions

View File

@ -18,6 +18,7 @@ var Memory = require('./connectors/memory').Memory;
var utils = require('./utils');
var fieldsToArray = utils.fieldsToArray;
var removeUndefined = utils.removeUndefined;
var util = require('util');
/**
* Base class for all persistent objects.
@ -37,8 +38,6 @@ function DataAccessObject() {
}
}
function idName(m) {
return m.getDataSource().idName
? m.getDataSource().idName(m.modelName) : 'id';
@ -400,13 +399,88 @@ var operators = {
nlike: 'NOT LIKE'
};
/*
* Normalize the filter object and throw errors if invalid values are detected
* @param {Object} filter The query filter object
* @returns {Object} The normalized filter object
* @private
*/
DataAccessObject._normalize = function (filter) {
if (!filter) {
return undefined;
}
var err = null;
if ((typeof filter !== 'object') || Array.isArray(filter)) {
err = new Error(util.format('The query filter %j is not an object', filter));
err.statusCode = 400;
throw err;
}
if (filter.limit || filter.skip || filter.offset) {
var limit = Number(filter.limit || 100);
var offset = Number(filter.skip || filter.offset || 0);
if (isNaN(limit) || limit <= 0 || Math.ceil(limit) !== limit) {
err = new Error(util.format('The limit parameter %j is not valid',
filter.limit));
err.statusCode = 400;
throw err;
}
if (isNaN(offset) || offset < 0 || Math.ceil(offset) !== offset) {
err = new Error(util.format('The offset/skip parameter %j is not valid',
filter.skip || filter.offset));
err.statusCode = 400;
throw err;
}
filter.limit = limit;
filter.offset = offset;
delete filter.skip;
}
// normalize fields as array of included property names
if (filter.fields) {
filter.fields = fieldsToArray(filter.fields,
Object.keys(this.definition.properties));
}
filter = removeUndefined(filter);
this._coerce(filter.where);
return filter;
};
/*
* Coerce values based the property types
* @param {Object} where The where clause
* @returns {Object} The coerced where clause
* @private
*/
DataAccessObject._coerce = function (where) {
var self = this;
if (!where) {
return where;
}
var props = this.getDataSource().getModelDefinition(this.modelName).properties;
var err;
if (typeof where !== 'object' || Array.isArray(where)) {
err = new Error(util.format('The where clause %j is not an object', where));
err.statusCode = 400;
throw err;
}
var props = self.definition.properties;
for (var p in where) {
// Handle logical operators
if (p === 'and' || p === 'or' || p === 'nor') {
var clauses = where[p];
if (Array.isArray(clauses)) {
for (var i = 0; i < clauses.length; i++) {
self._coerce(clauses[i]);
}
} else {
err = new Error(util.format('The %p operator has invalid clauses %j', p, clauses));
err.statusCode = 400;
throw err;
}
return where;
}
var DataType = props[p] && props[p].type;
if (!DataType) {
continue;
@ -556,24 +630,21 @@ DataAccessObject.find = function find(query, cb) {
cb = query;
query = null;
}
var constr = this;
var self = this;
query = query || {};
if (query.where) {
query.where = this._coerce(query.where);
try {
this._normalize(query);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
}
var fields = query.fields;
var near = query && geo.nearFilter(query.where);
var supportsGeo = !!this.getDataSource().connector.buildNearFilter;
// normalize fields as array of included property names
if (fields) {
query.fields = fieldsToArray(fields, Object.keys(this.definition.properties));
}
query = removeUndefined(query);
if (near) {
if (supportsGeo) {
// convert it
@ -583,15 +654,15 @@ DataAccessObject.find = function find(query, cb) {
// using all documents
this.getDataSource().connector.all(this.modelName, {}, function (err, data) {
var memory = new Memory();
var modelName = constr.modelName;
var modelName = self.modelName;
if (err) {
cb(err);
} else if (Array.isArray(data)) {
memory.define({
properties: constr.dataSource.definitions[constr.modelName].properties,
settings: constr.dataSource.definitions[constr.modelName].settings,
model: constr
properties: self.dataSource.definitions[self.modelName].properties,
settings: self.dataSource.definitions[self.modelName].settings,
model: self
});
data.forEach(function (obj) {
@ -614,7 +685,7 @@ DataAccessObject.find = function find(query, cb) {
this.getDataSource().connector.all(this.modelName, query, function (err, data) {
if (data && data.forEach) {
data.forEach(function (d, i) {
var obj = new constr();
var obj = new self();
obj._initProperties(d, {fields: query.fields});
@ -726,9 +797,15 @@ DataAccessObject.remove = DataAccessObject.deleteAll = DataAccessObject.destroyA
if(!err) Model.emit('deletedAll');
}.bind(this));
} else {
// Support an optional where object
where = removeUndefined(where);
where = this._coerce(where);
try {
// Support an optional where object
where = removeUndefined(where);
where = this._coerce(where);
} catch (err) {
return process.nextTick(function() {
cb && cb(err);
});
}
this.getDataSource().connector.destroyAll(this.modelName, where, function (err, data) {
cb && cb(err, data);
if(!err) Model.emit('deletedAll', where);
@ -783,8 +860,14 @@ DataAccessObject.count = function (where, cb) {
cb = where;
where = null;
}
where = removeUndefined(where);
where = this._coerce(where);
try {
where = removeUndefined(where);
where = this._coerce(where);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
}
this.getDataSource().connector.count(this.modelName, cb, where);
};
@ -999,10 +1082,11 @@ setRemoting(DataAccessObject.prototype.updateAttributes, {
http: {verb: 'put', path: '/'}
});
/**
/*
* Reload object from persistence
* Requires `id` member of `object` to be able to call `find`
* @param {Function} callback Called with (err, instance) arguments
* @private
*/
DataAccessObject.prototype.reload = function reload(callback) {
if (stillConnecting(this.getDataSource(), this, arguments)) {
@ -1013,12 +1097,13 @@ DataAccessObject.prototype.reload = function reload(callback) {
};
/*!
/*
* Define readonly property on object
*
* @param {Object} obj
* @param {String} key
* @param {Mixed} value
* @private
*/
function defineReadonlyProp(obj, key, value) {
Object.defineProperty(obj, key, {
@ -1044,12 +1129,12 @@ DataAccessObject.scope = function (name, query, targetClass) {
defineScope(this, targetClass || this, name, query);
};
/*!
/*
* Add 'include'
*/
jutil.mixin(DataAccessObject, Inclusion);
/*!
/*
* Add 'relation'
*/
jutil.mixin(DataAccessObject, Relation);

View File

@ -292,8 +292,14 @@ BaseSQL.prototype.count = function count(model, callback, where) {
var self = this;
var props = this._models[model].properties;
var whereClause = '';
if (typeof this.buildWhere === 'function') {
whereClause = this.buildWhere(model, where);
} else {
whereClause = buildWhere(where);
}
this.queryOne('SELECT count(*) as cnt FROM ' +
this.tableEscaped(model) + ' ' + buildWhere(where), function (err, res) {
this.tableEscaped(model) + ' ' + whereClause, function (err, res) {
if (err) {
return callback(err);
}

View File

@ -993,7 +993,7 @@ describe('Model with scopes', function () {
});
describe('DataAccessObject', function () {
var ds, model, where;
var ds, model, where, error;
before(function () {
ds = new DataSource('memory');
@ -1007,6 +1007,10 @@ describe('DataAccessObject', function () {
});
});
beforeEach(function () {
error = null;
});
it('should be able to coerce where clause for string types', function () {
where = model._coerce({id: 1});
assert.deepEqual(where, {id: '1'});
@ -1069,6 +1073,139 @@ describe('DataAccessObject', function () {
});
it('should be able to coerce where clause with and operators', function () {
where = model._coerce({and: [{age: '10'}, {vip: 'true'}]});
assert.deepEqual(where, {and: [{age: 10}, {vip: true}]});
});
it('should be able to coerce where clause with or operators', function () {
where = model._coerce({or: [{age: '10'}, {vip: 'true'}]});
assert.deepEqual(where, {or: [{age: 10}, {vip: true}]});
});
it('should throw if the where property is not an object', function () {
try {
// The where clause has to be an object
model._coerce('abc');
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the where property is an array', function () {
try {
// The where clause cannot be an array
model._coerce([
{vip: true}
]);
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the and operator does not take an array', function () {
try {
// The and operator only takes an array of objects
model._coerce({and: {x: 1}});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the or operator does not take an array', function () {
try {
// The or operator only takes an array of objects
model._coerce({or: {x: 1}});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the or operator does not take an array of objects', function () {
try {
// The or operator only takes an array of objects
model._coerce({or: ['x']});
} catch(err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter property is not an object', function () {
var filter = null;
try {
// The filter clause has to be an object
filter = model._normalize('abc');
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.limit property is not a number', function () {
try {
// The limit param must be a valid number
filter = model._normalize({limit: 'x'});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.limit property is nagative', function () {
try {
// The limit param must be a valid number
filter = model._normalize({limit: -1});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.limit property is not an integer', function () {
try {
// The limit param must be a valid number
filter = model._normalize({limit: 5.8});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.offset property is not a number', function () {
try {
// The limit param must be a valid number
filter = model._normalize({offset: 'x'});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.skip property is not a number', function () {
try {
// The limit param must be a valid number
filter = model._normalize({skip: '_'});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should normalize limit/offset/skip', function () {
filter = model._normalize({limit: '10', skip: 5});
assert.deepEqual(filter, {limit: 10, offset: 5});
});
it('should set the default value for limit', function () {
filter = model._normalize({skip: 5});
assert.deepEqual(filter, {limit: 100, offset: 5});
});
it('should skip GeoPoint', function () {
where = model._coerce({location: {near: {lng: 10, lat: 20}, maxDistance: 20}});
assert.deepEqual(where, {location: {near: {lng: 10, lat: 20}, maxDistance: 20}});