Merge pull request #3683 from edwardchoh/master

Support options.filter in createChangeStream
This commit is contained in:
Miroslav Bajtoš 2017-12-14 14:59:14 +01:00 committed by GitHub
commit 0a4940e31e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 2 deletions

View File

@ -15,6 +15,8 @@ var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model'); var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough; var PassThrough = require('stream').PassThrough;
var utils = require('./utils'); var utils = require('./utils');
var filterNodes = require('loopback-filters');
var REPLICATION_CHUNK_SIZE = -1; var REPLICATION_CHUNK_SIZE = -1;
module.exports = function(registry) { module.exports = function(registry) {
@ -1855,6 +1857,7 @@ module.exports = function(registry) {
cb = options; cb = options;
options = undefined; options = undefined;
} }
cb = cb || utils.createPromiseCallback();
var idName = this.getIdName(); var idName = this.getIdName();
var Model = this; var Model = this;
@ -1880,16 +1883,22 @@ module.exports = function(registry) {
Model.observe('after save', changeHandler); Model.observe('after save', changeHandler);
Model.observe('after delete', deleteHandler); Model.observe('after delete', deleteHandler);
return cb.promise;
function changeHandler(ctx, next) { function changeHandler(ctx, next) {
var change = createChangeObject(ctx, 'save'); var change = createChangeObject(ctx, 'save');
changes.write(change); if (change) {
changes.write(change);
}
next(); next();
}; };
function deleteHandler(ctx, next) { function deleteHandler(ctx, next) {
var change = createChangeObject(ctx, 'delete'); var change = createChangeObject(ctx, 'delete');
changes.write(change); if (change) {
changes.write(change);
}
next(); next();
}; };
@ -1911,6 +1920,15 @@ module.exports = function(registry) {
var hasTarget = target === 0 || !!target; var hasTarget = target === 0 || !!target;
// apply filtering if options is set
if (options) {
var filtered = filterNodes([data], options);
if (filtered.length !== 1) {
return null;
}
data = filtered[0];
}
var change = { var change = {
target: target, target: target,
where: where, where: where,

View File

@ -50,6 +50,7 @@
"isemail": "^2.2.1", "isemail": "^2.2.1",
"loopback-connector-remote": "^3.0.0", "loopback-connector-remote": "^3.0.0",
"loopback-datasource-juggler": "^3.9.3", "loopback-datasource-juggler": "^3.9.3",
"loopback-filters": "^1.0.0",
"loopback-phase": "^3.0.0", "loopback-phase": "^3.0.0",
"nodemailer": "^2.5.0", "nodemailer": "^2.5.0",
"nodemailer-stub-transport": "^1.0.0", "nodemailer-stub-transport": "^1.0.0",

View File

@ -70,6 +70,38 @@ describe('PersistedModel.createChangeStream()', function() {
}); });
}); });
it('should apply "where" and "fields" to create events', function() {
const Score = this.Score;
const data = [
{team: 'baz', player: 'baz', value: 1},
{team: 'bar', player: 'baz', value: 2},
{team: 'foo', player: 'bar', value: 3},
];
const options = {where: {player: 'bar'}, fields: ['team', 'value']};
const changes = [];
let changeStream;
return Score.createChangeStream(options)
.then(stream => {
changeStream = stream;
changeStream.on('data', function(change) {
changes.push(change);
});
return Score.create(data);
})
.then(scores => {
changeStream.destroy();
expect(changes).to.have.length(1);
expect(changes[0]).to.have.property('type', 'create');
expect(changes[0].data).to.eql({
'team': 'foo',
value: 3,
});
});
});
it('should not emit changes after destroy', function(done) { it('should not emit changes after destroy', function(done) {
var Score = this.Score; var Score = this.Score;