Support options.filter in createChangeStream

Implement "options.filter" argument in Persisted.createChangeStream()
by leveraging loopback-filter module.
This commit is contained in:
Edward Choh 2017-11-02 03:01:52 -07:00 committed by Miroslav Bajtoš
parent 7c030c6900
commit 00169d2312
No known key found for this signature in database
GPG Key ID: 6F2304BA9361C7E3
3 changed files with 53 additions and 2 deletions

View File

@ -15,6 +15,8 @@ var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough;
var utils = require('./utils');
var filterNodes = require('loopback-filters');
var REPLICATION_CHUNK_SIZE = -1;
module.exports = function(registry) {
@ -1855,6 +1857,7 @@ module.exports = function(registry) {
cb = options;
options = undefined;
}
cb = cb || utils.createPromiseCallback();
var idName = this.getIdName();
var Model = this;
@ -1880,16 +1883,22 @@ module.exports = function(registry) {
Model.observe('after save', changeHandler);
Model.observe('after delete', deleteHandler);
return cb.promise;
function changeHandler(ctx, next) {
var change = createChangeObject(ctx, 'save');
changes.write(change);
if (change) {
changes.write(change);
}
next();
};
function deleteHandler(ctx, next) {
var change = createChangeObject(ctx, 'delete');
changes.write(change);
if (change) {
changes.write(change);
}
next();
};
@ -1911,6 +1920,15 @@ module.exports = function(registry) {
var hasTarget = target === 0 || !!target;
// apply filtering if options is set
if (options) {
var filtered = filterNodes([data], options);
if (filtered.length !== 1) {
return null;
}
data = filtered[0];
}
var change = {
target: target,
where: where,

View File

@ -50,6 +50,7 @@
"isemail": "^2.2.1",
"loopback-connector-remote": "^3.0.0",
"loopback-datasource-juggler": "^3.9.3",
"loopback-filters": "^1.0.0",
"loopback-phase": "^3.0.0",
"nodemailer": "^2.5.0",
"nodemailer-stub-transport": "^1.0.0",

View File

@ -70,6 +70,38 @@ describe('PersistedModel.createChangeStream()', function() {
});
});
it('should apply "where" and "fields" to create events', function() {
const Score = this.Score;
const data = [
{team: 'baz', player: 'baz', value: 1},
{team: 'bar', player: 'baz', value: 2},
{team: 'foo', player: 'bar', value: 3},
];
const options = {where: {player: 'bar'}, fields: ['team', 'value']};
const changes = [];
let changeStream;
return Score.createChangeStream(options)
.then(stream => {
changeStream = stream;
changeStream.on('data', function(change) {
changes.push(change);
});
return Score.create(data);
})
.then(scores => {
changeStream.destroy();
expect(changes).to.have.length(1);
expect(changes[0]).to.have.property('type', 'create');
expect(changes[0].data).to.eql({
'team': 'foo',
value: 3,
});
});
});
it('should not emit changes after destroy', function(done) {
var Score = this.Score;