Normalize/validate the query filter object

This commit is contained in:
Raymond Feng 2014-06-01 23:31:51 -07:00
parent 3e8284d1ee
commit 42c1ad3dca
2 changed files with 239 additions and 24 deletions

View File

@ -18,6 +18,7 @@ var Memory = require('./connectors/memory').Memory;
var utils = require('./utils'); var utils = require('./utils');
var fieldsToArray = utils.fieldsToArray; var fieldsToArray = utils.fieldsToArray;
var removeUndefined = utils.removeUndefined; var removeUndefined = utils.removeUndefined;
var util = require('util');
/** /**
* Base class for all persistent objects. * Base class for all persistent objects.
@ -37,8 +38,6 @@ function DataAccessObject() {
} }
} }
function idName(m) { function idName(m) {
return m.getDataSource().idName return m.getDataSource().idName
? m.getDataSource().idName(m.modelName) : 'id'; ? m.getDataSource().idName(m.modelName) : 'id';
@ -400,13 +399,83 @@ var operators = {
nlike: 'NOT LIKE' nlike: 'NOT LIKE'
}; };
/*!
* Normalize the filter object and throw errors if invalid values are detected
* @param {Object} filter The query filter object
* @returns {Object} The normalized filter object
* @private
*/
DataAccessObject._normalize = function (filter) {
if (!filter) {
return undefined;
}
var err = null;
if ((typeof filter !== 'object') || Array.isArray(filter)) {
err = new Error(util.format('The query filter %j is not an object', filter));
err.statusCode = 400;
throw err;
}
if (filter.limit || filter.skip || filter.offset) {
var limit = Number(filter.limit || 100);
var offset = Number(filter.skip || filter.offset || 0);
if (isNaN(limit) || limit <= 0 || Math.ceil(limit) !== limit) {
err = new Error(util.format('The limit parameter %j is not valid',
filter.limit));
err.statusCode = 400;
throw err;
}
if (isNaN(offset) || offset < 0 || Math.ceil(offset) !== offset) {
err = new Error(util.format('The offset/skip parameter %j is not valid',
filter.skip || filter.offset));
err.statusCode = 400;
throw err;
}
filter.limit = limit;
filter.offset = offset;
delete filter.skip;
}
// normalize fields as array of included property names
if (filter.fields) {
filter.fields = fieldsToArray(filter.fields,
Object.keys(this.definition.properties));
}
filter = removeUndefined(filter);
this._coerce(filter.where);
return filter;
};
/*!
* Coerce values based the property types
* @param {Object} where The where clause
* @returns {Object} The coerced where clause
* @private
*/
DataAccessObject._coerce = function (where) { DataAccessObject._coerce = function (where) {
var self = this;
if (!where) { if (!where) {
return where; return where;
} }
var props = this.getDataSource().getModelDefinition(this.modelName).properties; if (typeof where !== 'object' || Array.isArray(where)) {
var err = new Error(util.format('The where clause %j is not an object', where));
err.statusCode = 400;
throw err;
}
var props = self.definition.properties;
for (var p in where) { for (var p in where) {
// Handle logical operators
if (p === 'and' || p === 'or' || p === 'nor') {
var clauses = where[p];
if (Array.isArray(clauses)) {
for (var i = 0; i < clauses.length; i++) {
self._coerce(clauses[i]);
}
}
return where;
}
var DataType = props[p] && props[p].type; var DataType = props[p] && props[p].type;
if (!DataType) { if (!DataType) {
continue; continue;
@ -556,24 +625,21 @@ DataAccessObject.find = function find(query, cb) {
cb = query; cb = query;
query = null; query = null;
} }
var constr = this; var self = this;
query = query || {}; query = query || {};
if (query.where) { try {
query.where = this._coerce(query.where); this._normalize(query);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
} }
var fields = query.fields;
var near = query && geo.nearFilter(query.where); var near = query && geo.nearFilter(query.where);
var supportsGeo = !!this.getDataSource().connector.buildNearFilter; var supportsGeo = !!this.getDataSource().connector.buildNearFilter;
// normalize fields as array of included property names
if (fields) {
query.fields = fieldsToArray(fields, Object.keys(this.definition.properties));
}
query = removeUndefined(query);
if (near) { if (near) {
if (supportsGeo) { if (supportsGeo) {
// convert it // convert it
@ -583,15 +649,15 @@ DataAccessObject.find = function find(query, cb) {
// using all documents // using all documents
this.getDataSource().connector.all(this.modelName, {}, function (err, data) { this.getDataSource().connector.all(this.modelName, {}, function (err, data) {
var memory = new Memory(); var memory = new Memory();
var modelName = constr.modelName; var modelName = self.modelName;
if (err) { if (err) {
cb(err); cb(err);
} else if (Array.isArray(data)) { } else if (Array.isArray(data)) {
memory.define({ memory.define({
properties: constr.dataSource.definitions[constr.modelName].properties, properties: self.dataSource.definitions[self.modelName].properties,
settings: constr.dataSource.definitions[constr.modelName].settings, settings: self.dataSource.definitions[self.modelName].settings,
model: constr model: self
}); });
data.forEach(function (obj) { data.forEach(function (obj) {
@ -614,7 +680,7 @@ DataAccessObject.find = function find(query, cb) {
this.getDataSource().connector.all(this.modelName, query, function (err, data) { this.getDataSource().connector.all(this.modelName, query, function (err, data) {
if (data && data.forEach) { if (data && data.forEach) {
data.forEach(function (d, i) { data.forEach(function (d, i) {
var obj = new constr(); var obj = new self();
obj._initProperties(d, {fields: query.fields}); obj._initProperties(d, {fields: query.fields});
@ -726,9 +792,15 @@ DataAccessObject.remove = DataAccessObject.deleteAll = DataAccessObject.destroyA
if(!err) Model.emit('deletedAll'); if(!err) Model.emit('deletedAll');
}.bind(this)); }.bind(this));
} else { } else {
try {
// Support an optional where object // Support an optional where object
where = removeUndefined(where); where = removeUndefined(where);
where = this._coerce(where); where = this._coerce(where);
} catch (err) {
return process.nextTick(function() {
cb && cb(err);
});
}
this.getDataSource().connector.destroyAll(this.modelName, where, function (err, data) { this.getDataSource().connector.destroyAll(this.modelName, where, function (err, data) {
cb && cb(err, data); cb && cb(err, data);
if(!err) Model.emit('deletedAll', where); if(!err) Model.emit('deletedAll', where);
@ -783,8 +855,14 @@ DataAccessObject.count = function (where, cb) {
cb = where; cb = where;
where = null; where = null;
} }
try {
where = removeUndefined(where); where = removeUndefined(where);
where = this._coerce(where); where = this._coerce(where);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
}
this.getDataSource().connector.count(this.modelName, cb, where); this.getDataSource().connector.count(this.modelName, cb, where);
}; };

View File

@ -993,7 +993,7 @@ describe('Model with scopes', function () {
}); });
describe('DataAccessObject', function () { describe('DataAccessObject', function () {
var ds, model, where; var ds, model, where, error;
before(function () { before(function () {
ds = new DataSource('memory'); ds = new DataSource('memory');
@ -1007,6 +1007,10 @@ describe('DataAccessObject', function () {
}); });
}); });
beforeEach(function () {
error = null;
});
it('should be able to coerce where clause for string types', function () { it('should be able to coerce where clause for string types', function () {
where = model._coerce({id: 1}); where = model._coerce({id: 1});
assert.deepEqual(where, {id: '1'}); assert.deepEqual(where, {id: '1'});
@ -1069,6 +1073,139 @@ describe('DataAccessObject', function () {
}); });
it('should be able to coerce where clause with and operators', function () {
where = model._coerce({and: [{age: '10'}, {vip: 'true'}]});
assert.deepEqual(where, {and: [{age: 10}, {vip: true}]});
});
it('should be able to coerce where clause with or operators', function () {
where = model._coerce({or: [{age: '10'}, {vip: 'true'}]});
assert.deepEqual(where, {or: [{age: 10}, {vip: true}]});
});
it('should throw if the where property is not an object', function () {
try {
// The where clause has to be an object
error = err;model._coerce('abc');
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the where property is an array', function () {
try {
// The where clause cannot be an array
error = err;model._coerce([
{vip: true}
]);
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the and operator does not take an array', function () {
try {
// The and operator only takes an array of objects
error = err;model._coerce({and: {x: 1}});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the or operator does not take an array', function () {
try {
// The or operator only takes an array of objects
error = err;model._coerce({or: {x: 1}});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if the or operator does not take an array of objects', function () {
try {
// The or operator only takes an array of objects
error = err;model._coerce({or: ['x']});
} catch(err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter property is not an object', function () {
var filter = null;
try {
// The filter clause has to be an object
filter = model._normalize('abc');
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.limit property is not a number', function () {
try {
// The limit param must be a valid number
filter = model._normalize({limit: 'x'});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.limit property is nagative', function () {
try {
// The limit param must be a valid number
filter = model._normalize({limit: -1});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.limit property is not an integer', function () {
try {
// The limit param must be a valid number
filter = model._normalize({limit: 5.8});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.offset property is not a number', function () {
try {
// The limit param must be a valid number
filter = model._normalize({offset: 'x'});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should throw if filter.skip property is not a number', function () {
try {
// The limit param must be a valid number
filter = model._normalize({skip: '_'});
} catch (err) {
error = err;
}
assert(error, 'An error should have been thrown');
});
it('should normalize limit/offset/skip', function () {
filter = model._normalize({limit: '10', skip: 5});
assert.deepEqual(filter, {limit: 10, offset: 5});
});
it('should set the default value for limit', function () {
filter = model._normalize({skip: 5});
assert.deepEqual(filter, {limit: 100, offset: 5});
});
it('should skip GeoPoint', function () { it('should skip GeoPoint', function () {
where = model._coerce({location: {near: {lng: 10, lat: 20}, maxDistance: 20}}); where = model._coerce({location: {near: {lng: 10, lat: 20}, maxDistance: 20}});
assert.deepEqual(where, {location: {near: {lng: 10, lat: 20}, maxDistance: 20}}); assert.deepEqual(where, {location: {near: {lng: 10, lat: 20}, maxDistance: 20}});