Merge pull request #1445 from strongloop/feature/subscribe

Add PersistedModel.subscribe()
This commit is contained in:
Ritchie Martori 2015-07-09 13:35:49 -07:00
commit cdddb08a67
3 changed files with 203 additions and 1 deletions

View File

@ -7,6 +7,7 @@ var assert = require('assert');
var async = require('async');
var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough;
module.exports = function(registry) {
var Model = registry.getModel('Model');
@ -761,6 +762,24 @@ module.exports = function(registry) {
http: {verb: 'post', path: '/:id/rectify-change'}
});
}
setRemoting(PersistedModel, 'createChangeStream', {
description: 'Create a change stream.',
accessType: 'READ',
http: [
{verb: 'post', path: '/change-stream'},
{verb: 'get', path: '/change-stream'}
],
accepts: {
arg: 'options',
type: 'object'
},
returns: {
arg: 'changes',
type: 'ReadableStream',
json: true
}
});
};
/**
@ -1527,6 +1546,101 @@ module.exports = function(registry) {
});
};
/**
* Create a change stream.
*
* @param {Object} options
* @param {Object} options.where Only changes to models matching this where filter will be included in the `ChangeStream`.
* @callback {Function} callback
* @param {Error} err
* @param {ChangeStream} changes
*/
PersistedModel.createChangeStream = function(options, cb) {
if (typeof options === 'function') {
cb = options;
options = undefined;
}
var idName = this.getIdName();
var Model = this;
var changes = new PassThrough({objectMode: true});
var writeable = true;
changes.destroy = function() {
changes.removeAllListeners('error');
changes.removeAllListeners('end');
writeable = false;
changes = null;
};
changes.on('error', function() {
writeable = false;
});
changes.on('end', function() {
writeable = false;
});
process.nextTick(function() {
cb(null, changes);
});
Model.observe('after save', createChangeHandler('save'));
Model.observe('after delete', createChangeHandler('delete'));
function createChangeHandler(type) {
return function(ctx, next) {
// since it might have set to null via destroy
if (!changes) {
return next();
}
var where = ctx.where;
var data = ctx.instance || ctx.data;
var whereId = where && where[idName];
// the data includes the id
// or the where includes the id
var target;
if (data && (data[idName] || data[idName] === 0)) {
target = data[idName];
} else if (where && (where[idName] || where[idName] === 0)) {
target = where[idName];
}
var hasTarget = target === 0 || !!target;
var change = {
target: target,
where: where,
data: data
};
switch (type) {
case 'save':
if (ctx.isNewInstance === undefined) {
change.type = hasTarget ? 'update' : 'create';
} else {
change.type = ctx.isNewInstance ? 'create' : 'update';
}
break;
case 'delete':
change.type = 'remove';
break;
}
// TODO(ritch) this is ugly... maybe a ReadableStream would be better
if (writeable) {
changes.write(change);
}
next();
};
}
};
PersistedModel.setup();
return PersistedModel;

View File

@ -0,0 +1,87 @@
describe('PersistedModel.createChangeStream()', function() {
describe('configured to source changes locally', function() {
before(function() {
var test = this;
var app = loopback({localRegistry: true});
var ds = app.dataSource('ds', {connector: 'memory'});
this.Score = app.model('Score', {
dataSource: 'ds',
changeDataSource: false // use only local observers
});
});
it('should detect create', function(done) {
var Score = this.Score;
Score.createChangeStream(function(err, changes) {
changes.on('data', function(change) {
expect(change.type).to.equal('create');
changes.destroy();
done();
});
Score.create({team: 'foo'});
});
});
it('should detect update', function(done) {
var Score = this.Score;
Score.create({team: 'foo'}, function(err, newScore) {
Score.createChangeStream(function(err, changes) {
changes.on('data', function(change) {
expect(change.type).to.equal('update');
changes.destroy();
done();
});
newScore.updateAttributes({
bat: 'baz'
});
});
});
});
it('should detect delete', function(done) {
var Score = this.Score;
Score.create({team: 'foo'}, function(err, newScore) {
Score.createChangeStream(function(err, changes) {
changes.on('data', function(change) {
expect(change.type).to.equal('remove');
changes.destroy();
done();
});
newScore.remove();
});
});
});
});
// TODO(ritch) implement multi-server support
describe.skip('configured to source changes using pubsub', function() {
before(function() {
var test = this;
var app = loopback({localRegistry: true});
var db = app.dataSource('ds', {connector: 'memory'});
var ps = app.dataSource('ps', {
host: 'localhost',
port: '12345',
connector: 'pubsub',
pubsubAdapter: 'mqtt'
});
this.Score = app.model('Score', {
dataSource: 'db',
changeDataSource: 'ps'
});
});
it('should detect a change', function(done) {
var Score = this.Score;
Score.createChangeStream(function(err, changes) {
changes.on('data', function(change) {
done();
});
});
});
});
});

View File

@ -614,7 +614,8 @@ describe.onServer('Remote Methods', function() {
'destroyById',
'removeById',
'count',
'prototype.updateAttributes'
'prototype.updateAttributes',
'createChangeStream'
]);
});
});