Merge pull request #3474 from bigcup/fixChangeStreamIssue#1569
Removed observers from a Model after finish of the stream
This commit is contained in:
commit
f7b88e3435
|
@ -1851,79 +1851,84 @@ module.exports = function(registry) {
|
||||||
var idName = this.getIdName();
|
var idName = this.getIdName();
|
||||||
var Model = this;
|
var Model = this;
|
||||||
var changes = new PassThrough({objectMode: true});
|
var changes = new PassThrough({objectMode: true});
|
||||||
var writeable = true;
|
|
||||||
|
|
||||||
changes.destroy = function() {
|
changes._destroy = function() {
|
||||||
changes.removeAllListeners('error');
|
changes.end();
|
||||||
changes.removeAllListeners('end');
|
changes.emit('end');
|
||||||
writeable = false;
|
changes.emit('close');
|
||||||
changes = null;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
changes.on('error', function() {
|
changes.destroy = changes.destroy || changes._destroy; // node 8 compability
|
||||||
writeable = false;
|
|
||||||
});
|
changes.on('error', removeHandlers);
|
||||||
changes.on('end', function() {
|
changes.on('close', removeHandlers);
|
||||||
writeable = false;
|
changes.on('finish', removeHandlers);
|
||||||
});
|
changes.on('end', removeHandlers);
|
||||||
|
|
||||||
process.nextTick(function() {
|
process.nextTick(function() {
|
||||||
cb(null, changes);
|
cb(null, changes);
|
||||||
});
|
});
|
||||||
|
|
||||||
Model.observe('after save', createChangeHandler('save'));
|
Model.observe('after save', changeHandler);
|
||||||
Model.observe('after delete', createChangeHandler('delete'));
|
Model.observe('after delete', deleteHandler);
|
||||||
|
|
||||||
function createChangeHandler(type) {
|
function changeHandler(ctx, next) {
|
||||||
return function(ctx, next) {
|
var change = createChangeObject(ctx, 'save');
|
||||||
// since it might have set to null via destroy
|
changes.write(change);
|
||||||
if (!changes) {
|
|
||||||
return next();
|
|
||||||
}
|
|
||||||
|
|
||||||
var where = ctx.where;
|
next();
|
||||||
var data = ctx.instance || ctx.data;
|
};
|
||||||
var whereId = where && where[idName];
|
|
||||||
|
|
||||||
// the data includes the id
|
function deleteHandler(ctx, next) {
|
||||||
// or the where includes the id
|
var change = createChangeObject(ctx, 'delete');
|
||||||
var target;
|
changes.write(change);
|
||||||
|
|
||||||
if (data && (data[idName] || data[idName] === 0)) {
|
next();
|
||||||
target = data[idName];
|
};
|
||||||
} else if (where && (where[idName] || where[idName] === 0)) {
|
|
||||||
target = where[idName];
|
|
||||||
}
|
|
||||||
|
|
||||||
var hasTarget = target === 0 || !!target;
|
function createChangeObject(ctx, type) {
|
||||||
|
var where = ctx.where;
|
||||||
|
var data = ctx.instance || ctx.data;
|
||||||
|
var whereId = where && where[idName];
|
||||||
|
|
||||||
var change = {
|
// the data includes the id
|
||||||
target: target,
|
// or the where includes the id
|
||||||
where: where,
|
var target;
|
||||||
data: data,
|
|
||||||
};
|
|
||||||
|
|
||||||
switch (type) {
|
if (data && (data[idName] || data[idName] === 0)) {
|
||||||
case 'save':
|
target = data[idName];
|
||||||
if (ctx.isNewInstance === undefined) {
|
} else if (where && (where[idName] || where[idName] === 0)) {
|
||||||
change.type = hasTarget ? 'update' : 'create';
|
target = where[idName];
|
||||||
} else {
|
}
|
||||||
change.type = ctx.isNewInstance ? 'create' : 'update';
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
var hasTarget = target === 0 || !!target;
|
||||||
case 'delete':
|
|
||||||
change.type = 'remove';
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(ritch) this is ugly... maybe a ReadableStream would be better
|
var change = {
|
||||||
if (writeable) {
|
target: target,
|
||||||
changes.write(change);
|
where: where,
|
||||||
}
|
data: data,
|
||||||
|
|
||||||
next();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case 'save':
|
||||||
|
if (ctx.isNewInstance === undefined) {
|
||||||
|
change.type = hasTarget ? 'update' : 'create';
|
||||||
|
} else {
|
||||||
|
change.type = ctx.isNewInstance ? 'create' : 'update';
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
case 'delete':
|
||||||
|
change.type = 'remove';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return change;
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeHandlers() {
|
||||||
|
Model.removeObserver('after save', changeHandler);
|
||||||
|
Model.removeObserver('after delete', deleteHandler);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
|
|
||||||
'use strict';
|
'use strict';
|
||||||
var expect = require('./helpers/expect');
|
var expect = require('./helpers/expect');
|
||||||
|
var sinon = require('sinon');
|
||||||
var loopback = require('../');
|
var loopback = require('../');
|
||||||
|
|
||||||
describe('PersistedModel.createChangeStream()', function() {
|
describe('PersistedModel.createChangeStream()', function() {
|
||||||
|
@ -20,6 +21,8 @@ describe('PersistedModel.createChangeStream()', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
afterEach(verifyObserversRemoval);
|
||||||
|
|
||||||
it('should detect create', function(done) {
|
it('should detect create', function(done) {
|
||||||
var Score = this.Score;
|
var Score = this.Score;
|
||||||
|
|
||||||
|
@ -27,7 +30,6 @@ describe('PersistedModel.createChangeStream()', function() {
|
||||||
changes.on('data', function(change) {
|
changes.on('data', function(change) {
|
||||||
expect(change.type).to.equal('create');
|
expect(change.type).to.equal('create');
|
||||||
changes.destroy();
|
changes.destroy();
|
||||||
|
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -67,6 +69,32 @@ describe('PersistedModel.createChangeStream()', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should not emit changes after destroy', function(done) {
|
||||||
|
var Score = this.Score;
|
||||||
|
|
||||||
|
var spy = sinon.spy();
|
||||||
|
|
||||||
|
Score.createChangeStream(function(err, changes) {
|
||||||
|
changes.on('data', function() {
|
||||||
|
spy();
|
||||||
|
changes.destroy();
|
||||||
|
});
|
||||||
|
|
||||||
|
Score.create({team: 'foo'})
|
||||||
|
.then(() => Score.deleteAll())
|
||||||
|
.then(() => {
|
||||||
|
expect(spy.calledOnce);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
function verifyObserversRemoval() {
|
||||||
|
var Score = this.Score;
|
||||||
|
expect(Score._observers['after save']).to.be.empty();
|
||||||
|
expect(Score._observers['after delete']).to.be.empty();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO(ritch) implement multi-server support
|
// TODO(ritch) implement multi-server support
|
||||||
|
|
Loading…
Reference in New Issue