diff --git a/lib/persisted-model.js b/lib/persisted-model.js index 964b03bf..e83c80f6 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -1851,79 +1851,84 @@ module.exports = function(registry) { var idName = this.getIdName(); var Model = this; var changes = new PassThrough({objectMode: true}); - var writeable = true; - changes.destroy = function() { - changes.removeAllListeners('error'); - changes.removeAllListeners('end'); - writeable = false; - changes = null; + changes._destroy = function() { + changes.end(); + changes.emit('end'); + changes.emit('close'); }; - changes.on('error', function() { - writeable = false; - }); - changes.on('end', function() { - writeable = false; - }); + changes.destroy = changes.destroy || changes._destroy; // node 8 compability + + changes.on('error', removeHandlers); + changes.on('close', removeHandlers); + changes.on('finish', removeHandlers); + changes.on('end', removeHandlers); process.nextTick(function() { cb(null, changes); }); - Model.observe('after save', createChangeHandler('save')); - Model.observe('after delete', createChangeHandler('delete')); + Model.observe('after save', changeHandler); + Model.observe('after delete', deleteHandler); - function createChangeHandler(type) { - return function(ctx, next) { - // since it might have set to null via destroy - if (!changes) { - return next(); - } + function changeHandler(ctx, next) { + var change = createChangeObject(ctx, 'save'); + changes.write(change); - var where = ctx.where; - var data = ctx.instance || ctx.data; - var whereId = where && where[idName]; + next(); + }; - // the data includes the id - // or the where includes the id - var target; + function deleteHandler(ctx, next) { + var change = createChangeObject(ctx, 'delete'); + changes.write(change); - if (data && (data[idName] || data[idName] === 0)) { - target = data[idName]; - } else if (where && (where[idName] || where[idName] === 0)) { - target = where[idName]; - } + next(); + }; - var hasTarget = target === 0 || !!target; + function createChangeObject(ctx, type) { + var where = ctx.where; + var data = ctx.instance || ctx.data; + var whereId = where && where[idName]; - var change = { - target: target, - where: where, - data: data, - }; + // the data includes the id + // or the where includes the id + var target; - switch (type) { - case 'save': - if (ctx.isNewInstance === undefined) { - change.type = hasTarget ? 'update' : 'create'; - } else { - change.type = ctx.isNewInstance ? 'create' : 'update'; - } + if (data && (data[idName] || data[idName] === 0)) { + target = data[idName]; + } else if (where && (where[idName] || where[idName] === 0)) { + target = where[idName]; + } - break; - case 'delete': - change.type = 'remove'; - break; - } + var hasTarget = target === 0 || !!target; - // TODO(ritch) this is ugly... maybe a ReadableStream would be better - if (writeable) { - changes.write(change); - } - - next(); + var change = { + target: target, + where: where, + data: data, }; + + switch (type) { + case 'save': + if (ctx.isNewInstance === undefined) { + change.type = hasTarget ? 'update' : 'create'; + } else { + change.type = ctx.isNewInstance ? 'create' : 'update'; + } + + break; + case 'delete': + change.type = 'remove'; + break; + } + + return change; + } + + function removeHandlers() { + Model.removeObserver('after save', changeHandler); + Model.removeObserver('after delete', deleteHandler); } }; diff --git a/test/change-stream.test.js b/test/change-stream.test.js index 391f28df..53695996 100644 --- a/test/change-stream.test.js +++ b/test/change-stream.test.js @@ -5,6 +5,7 @@ 'use strict'; var expect = require('./helpers/expect'); +var sinon = require('sinon'); var loopback = require('../'); describe('PersistedModel.createChangeStream()', function() { @@ -20,6 +21,8 @@ describe('PersistedModel.createChangeStream()', function() { }); }); + afterEach(verifyObserversRemoval); + it('should detect create', function(done) { var Score = this.Score; @@ -27,7 +30,6 @@ describe('PersistedModel.createChangeStream()', function() { changes.on('data', function(change) { expect(change.type).to.equal('create'); changes.destroy(); - done(); }); @@ -67,6 +69,32 @@ describe('PersistedModel.createChangeStream()', function() { }); }); }); + + it('should not emit changes after destroy', function(done) { + var Score = this.Score; + + var spy = sinon.spy(); + + Score.createChangeStream(function(err, changes) { + changes.on('data', function() { + spy(); + changes.destroy(); + }); + + Score.create({team: 'foo'}) + .then(() => Score.deleteAll()) + .then(() => { + expect(spy.calledOnce); + done(); + }); + }); + }); + + function verifyObserversRemoval() { + var Score = this.Score; + expect(Score._observers['after save']).to.be.empty(); + expect(Score._observers['after delete']).to.be.empty(); + } }); // TODO(ritch) implement multi-server support