Partition by foreign key for pagination

See https://github.com/strongloop/loopback-datasource-juggler/issues/610
This commit is contained in:
Raymond Feng 2015-12-11 16:59:41 -08:00 committed by Simon Ho
parent 267d24de0a
commit 84da11f98e
2 changed files with 432 additions and 29 deletions

View File

@ -55,7 +55,7 @@ function IncludeScope(scope) {
} else { } else {
this._include = null; this._include = null;
} }
}; }
IncludeScope.prototype.conditions = function() { IncludeScope.prototype.conditions = function() {
return utils.deepMerge({}, this._scope); return utils.deepMerge({}, this._scope);
@ -174,12 +174,91 @@ Inclusion.include = function(objects, include, options, cb) {
include = normalizeInclude(include); include = normalizeInclude(include);
// Find the limit of items for `inq`
var inqLimit = 256;
if (self.dataSource && self.dataSource.settings &&
self.dataSource.settings.inqLimit) {
inqLimit = self.dataSource.settings.inqLimit;
}
async.each(include, function(item, callback) { async.each(include, function(item, callback) {
processIncludeItem(objects, item, options, callback); processIncludeItem(objects, item, options, callback);
}, function(err) { }, function(err) {
cb && cb(err, objects); cb && cb(err, objects);
}); });
/**
* Find related items with an array of foreign keys by page
* @param model The model class
* @param filter The query filter
* @param fkName The name of the foreign key property
* @param pageSize The size of page
* @param options Options
* @param cb
*/
function findWithForeignKeysByPage(model, filter, fkName, pageSize, options, cb) {
var foreignKeys = [];
if (filter.where[fkName]) {
foreignKeys = filter.where[fkName].inq;
} else if (filter.where.and) {
// The inq can be embedded inside 'and: []'. No or: [] is needed as
// include only uses and. We only deal with the generated inq for include.
for (var j in filter.where.and) {
if (filter.where.and[j][fkName] &&
Array.isArray(filter.where.and[j][fkName].inq)) {
foreignKeys = filter.where.and[j][fkName].inq;
break;
}
}
}
if (filter.limit || filter.skip || filter.offset) {
// Force the find to be performed per FK to honor the pagination
pageSize = 1;
}
var size = foreignKeys.length;
if (size > inqLimit && pageSize <= 0) {
pageSize = inqLimit;
}
if (pageSize <= 0) {
return model.find(filter, options, cb);
}
var listOfFKs = [];
for (var i = 0; i < size; i += pageSize) {
var end = i + pageSize;
if (end > size) {
end = size;
}
listOfFKs.push(foreignKeys.slice(i, end));
}
var items = [];
async.each(listOfFKs, function(foreignKeys, done) {
var newFilter = {};
for (var f in filter) {
newFilter[f] = filter[f];
}
if (filter.where) {
newFilter.where = {};
for (var w in filter.where) {
newFilter.where[w] = filter.where[w];
}
}
newFilter.where[fkName] = {
inq: foreignKeys,
};
model.find(newFilter, options, function(err, results) {
if (err) return done(err);
items = items.concat(results);
done();
});
}, function(err) {
if (err) return cb(err);
cb(null, items);
});
}
function processIncludeItem(objs, include, options, cb) { function processIncludeItem(objs, include, options, cb) {
var relations = self.relations; var relations = self.relations;
@ -332,7 +411,9 @@ Inclusion.include = function(objects, include, options, cb) {
/** /**
* 1st DB Call of 2 step process. Get through model objects first * 1st DB Call of 2 step process. Get through model objects first
*/ */
relation.modelThrough.find(throughFilter, options, throughFetchHandler); findWithForeignKeysByPage(relation.modelThrough, throughFilter,
relation.keyTo, 0, options, throughFetchHandler);
/** /**
* Handle the results of Through model objects and fetch the modelTo items * Handle the results of Through model objects and fetch the modelTo items
* @param err * @param err
@ -377,7 +458,10 @@ Inclusion.include = function(objects, include, options, cb) {
/** /**
* 2nd DB Call of 2 step process. Get modelTo (target) objects * 2nd DB Call of 2 step process. Get modelTo (target) objects
*/ */
relation.modelTo.find(filter, options, targetsFetchHandler); findWithForeignKeysByPage(relation.modelTo, filter,
modelToIdName, 0, options, targetsFetchHandler);
// relation.modelTo.find(filter, options, targetsFetchHandler);
function targetsFetchHandler(err, targets) { function targetsFetchHandler(err, targets) {
if (err) { if (err) {
return callback(err); return callback(err);
@ -452,7 +536,8 @@ Inclusion.include = function(objects, include, options, cb) {
/** /**
* Make the DB Call, fetch all target objects * Make the DB Call, fetch all target objects
*/ */
relation.modelTo.find(filter, options, targetFetchHandler); findWithForeignKeysByPage(relation.modelTo, filter,
relation.keyTo, 0, options, targetFetchHandler);
/** /**
* Handle the fetched target objects * Handle the fetched target objects
* @param err * @param err
@ -502,7 +587,9 @@ Inclusion.include = function(objects, include, options, cb) {
}; };
relation.applyScope(null, filter); relation.applyScope(null, filter);
relation.modelTo.find(filter, options, targetFetchHandler);
findWithForeignKeysByPage(relation.modelTo, filter,
relation.keyTo, 0, options, targetFetchHandler);
function targetFetchHandler(err, targets) { function targetFetchHandler(err, targets) {
if (err) { if (err) {
@ -544,7 +631,10 @@ Inclusion.include = function(objects, include, options, cb) {
}; };
relation.applyScope(null, filter); relation.applyScope(null, filter);
options.partitionBy = relation.keyTo; options.partitionBy = relation.keyTo;
relation.modelTo.find(filter, options, targetFetchHandler);
findWithForeignKeysByPage(relation.modelTo, filter,
relation.keyTo, 0, options, targetFetchHandler);
/** /**
* Process fetched related objects * Process fetched related objects
* @param err * @param err
@ -653,7 +743,10 @@ Inclusion.include = function(objects, include, options, cb) {
return; return;
} }
relation.applyScope(null, typeFilter); relation.applyScope(null, typeFilter);
Model.find(typeFilter, options, targetFetchHandler);
findWithForeignKeysByPage(Model, typeFilter,
relation.keyTo, 0, options, targetFetchHandler);
/** /**
* Process fetched related objects * Process fetched related objects
* @param err * @param err
@ -718,7 +811,10 @@ Inclusion.include = function(objects, include, options, cb) {
inq: uniq(sourceIds), inq: uniq(sourceIds),
}; };
relation.applyScope(null, filter); relation.applyScope(null, filter);
relation.modelTo.find(filter, options, targetFetchHandler);
findWithForeignKeysByPage(relation.modelTo, filter,
relation.keyTo, 0, options, targetFetchHandler);
/** /**
* Process fetched related objects * Process fetched related objects
* @param err * @param err
@ -785,7 +881,10 @@ Inclusion.include = function(objects, include, options, cb) {
inq: uniq(targetIds), inq: uniq(targetIds),
}; };
relation.applyScope(null, filter); relation.applyScope(null, filter);
relation.modelTo.find(filter, options, targetFetchHandler);
findWithForeignKeysByPage(relation.modelTo, filter,
relation.keyTo, 0, options, targetFetchHandler);
/** /**
* Process fetched related objects * Process fetched related objects
* @param err * @param err

View File

@ -3,7 +3,6 @@
// This file is licensed under the MIT License. // This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT // License text available at https://opensource.org/licenses/MIT
// This test written in mocha+should.js
var should = require('./init.js'); var should = require('./init.js');
var async = require('async'); var async = require('async');
var assert = require('assert'); var assert = require('assert');
@ -196,36 +195,341 @@ describe('include', function() {
relation: 'posts', scope: { relation: 'posts', scope: {
fields: ['title'], include: ['author'], fields: ['title'], include: ['author'],
order: 'title DESC', order: 'title DESC',
limit: 2, limit: 1,
}, },
}, },
}, },
limit: 1, limit: 2,
}, function(err, passports) { }, function(err, passports) {
if (err) return done(err); if (err) return done(err);
passports.length.should.equal(1);
passports[0].toJSON().owner.posts.length.should.equal(2);
done();
});
});
it('should fetch Users with include scope on Posts - belongsTo', function(done) { passports.length.should.equal(2);
Post.find({ var posts1 = passports[0].toJSON().owner.posts;
include: { relation: 'author', scope: { fields: ['name'] }}, posts1.length.should.equal(1);
}, function(err, posts) { posts1[0].title.should.equal('Post C');
should.not.exist(err); var posts2 = passports[1].toJSON().owner.posts;
should.exist(posts); posts2.length.should.equal(1);
posts.length.should.equal(5); posts2[0].title.should.equal('Post D');
var author = posts[0].author();
author.name.should.equal('User A');
author.should.have.property('id');
author.should.have.property('age', undefined);
done(); done();
}); });
}); });
describe('inq limit', function() {
before(function() {
Passport.dataSource.settings.inqLimit = 2;
});
after(function() {
delete Passport.dataSource.settings.inqLimit;
});
it('should support include by pagination', function(done) {
// `pagination` in this case is inside the implementation and set by
// `inqLimit = 2` in the before block. This will need to be reworked once
// we decouple `findWithForeignKeysByPage`.
//
// --superkhau
Passport.find({
include: {
owner: {
relation: 'posts',
scope: {
fields: ['title'], include: ['author'],
order: 'title ASC',
},
},
},
}, function(err, passports) {
if (err) return done(err);
passports.length.should.equal(4);
var posts1 = passports[0].toJSON().owner.posts;
posts1.length.should.equal(3);
posts1[0].title.should.equal('Post A');
var posts2 = passports[1].toJSON().owner.posts;
posts2.length.should.equal(1);
posts2[0].title.should.equal('Post D');
done();
});
});
});
describe('findWithForeignKeysByPage', function() {
context('filter', function() {
it('works when using a `where` with a foreign key', function(done) {
User.findOne({
include: {
relation: 'passports',
},
}, function(err, user) {
if (err) return done(err);
var passport = user.passports()[0];
passport.id.should.equal(1);
passport.ownerId.should.equal(1);
passport.number.should.equal('1');
done();
});
});
it('works when using a `where` with `and`', function(done) {
User.findOne({
include: {
relation: 'posts',
scope: {
where: {
and: [
{ id: 1 },
{ userId: 1 },
{ title: 'Post A' },
],
},
},
},
}, function(err, user) {
if (err) return done(err);
user.name.should.equal('User A');
user.age.should.equal(21);
user.id.should.equal(1);
var posts = user.posts();
posts.length.should.equal(1);
var post = posts[0];
post.title.should.equal('Post A');
post.userId.should.equal(1);
post.id.should.equal(1);
done();
});
});
it('works when using `where` with `limit`', function(done) {
User.findOne({
include: {
relation: 'posts',
scope: {
limit: 1,
},
},
}, function(err, user) {
if (err) return done(err);
user.posts().length.should.equal(1);
done();
});
});
it('works when using `where` with `skip`', function(done) {
User.findOne({
include: {
relation: 'posts',
scope: {
skip: 1,
},
},
}, function(err, user) {
if (err) return done(err);
var ids = user.posts().map(function(p) { return p.id; });
ids.should.eql([2, 3]);
done();
});
});
it('works when using `where` with `offset`', function(done) {
User.findOne({
include: {
relation: 'posts',
scope: {
offset: 1,
},
},
}, function(err, user) {
if (err) return done(err);
var ids = user.posts().map(function(p) { return p.id; });
ids.should.eql([2, 3]);
done();
});
});
it('works when using `where` without `limit`, `skip` or `offset`',
function(done) {
User.findOne({ include: { relation: 'posts' }}, function(err, user) {
if (err) return done(err);
var posts = user.posts();
var ids = posts.map(function(p) { return p.id; });
ids.should.eql([1, 2, 3]);
done();
});
});
});
context('pagination', function() {
it('works with the default page size (0) and `inqlimit` is exceeded',
function(done) {
// inqLimit modifies page size in the impl (there is no way to modify
// page size directly as it is hardcoded (once we decouple the func,
// we can use ctor injection to pass in whatever page size we want).
//
// --superkhau
Post.dataSource.settings.inqLimit = 2;
User.find({ include: { relation: 'posts' }}, function(err, users) {
if (err) return done(err);
users.length.should.equal(5);
delete Post.dataSource.settings.inqLimit;
done();
});
});
it('works when page size is set to 0', function(done) {
Post.dataSource.settings.inqLimit = 0;
User.find({ include: { relation: 'posts' }}, function(err, users) {
if (err) return done(err);
users.length.should.equal(5);
delete Post.dataSource.settings.inqLimit;
done();
});
});
});
context('relations', function() {
// WARNING
// The code paths for in this suite of tests were verified manually due to
// the tight coupling of the `findWithForeignKeys` in `include.js`.
//
// TODO
// Decouple the utility functions into their own modules and export each
// function individually to allow for unit testing via DI.
//
// --superkhau
it('works when hasOne is called', function(done) {
User.findOne({ include: { relation: 'profile' }}, function(err, user) {
if (err) return done(err);
user.name.should.equal('User A');
user.age.should.equal(21);
user.id.should.equal(1);
profile = user.profile();
profile.profileName.should.equal('Profile A');
profile.userId.should.equal(1);
profile.id.should.equal(1);
done();
});
});
it('works when hasMany is called', function(done) {
User.findOne({ include: { relation: 'posts' }}, function(err, user) {
if (err) return done();
user.name.should.equal('User A');
user.age.should.equal(21);
user.id.should.equal(1);
user.posts().length.should.equal(3);
done();
});
});
it('works when hasManyThrough is called', function(done) {
Physician = db.define('Physician', { name: String });
Patient = db.define('Patient', { name: String });
Appointment = db.define('Appointment', {
date: {
type: Date,
default: function() {
return new Date();
},
},
});
Address = db.define('Address', { name: String });
Physician.hasMany(Patient, { through: Appointment });
Patient.hasMany(Physician, { through: Appointment });
Patient.belongsTo(Address);
Appointment.belongsTo(Patient);
Appointment.belongsTo(Physician);
db.automigrate(['Physician', 'Patient', 'Appointment', 'Address'],
function() {
Physician.create(function(err, physician) {
physician.patients.create({ name: 'a' }, function(err, patient) {
Address.create({ name: 'z' }, function(err, address) {
patient.address(address);
patient.save(function() {
physician.patients({ include: 'address' },
function(err, posts) {
if (err) return done(err);
posts.should.be.an.instanceOf(Array).and.have.length(1);
var p = posts[0];
p.name.should.equal('a');
p.addressId.should.equal(1);
p.address().id.should.equal(1);
p.address().name.should.equal('z');
done();
});
});
});
});
});
});
});
it('works when belongsTo is called', function(done) {
Profile.findOne({ include: 'user' }, function(err, profile) {
if (err) return done(err);
profile.profileName.should.equal('Profile A');
profile.userId.should.equal(1);
profile.id.should.equal(1);
user = profile.user();
user.name.should.equal('User A');
user.age.should.equal(21);
user.id.should.equal(1);
done();
});
});
});
});
it('should fetch Users with include scope on Posts - belongsTo',
function(done) {
Post.find({ include: { relation: 'author', scope: { fields: ['name'] }}},
function(err, posts) {
should.not.exist(err);
should.exist(posts);
posts.length.should.equal(5);
var author = posts[0].author();
author.name.should.equal('User A');
author.should.have.property('id');
author.should.have.property('age', undefined);
done();
});
});
it('should fetch Users with include scope on Posts - hasMany', function(done) { it('should fetch Users with include scope on Posts - hasMany', function(done) {
User.find({ User.find({
include: { relation: 'posts', scope: { include: { relation: 'posts', scope: {