From 40c5707a366d1f4f383afa7b5c3d74b9291abd39 Mon Sep 17 00:00:00 2001 From: Ritchie Martori Date: Thu, 9 Jul 2015 13:30:50 -0700 Subject: [PATCH] Add PersistedModel.createChangeStream() --- lib/persisted-model.js | 114 +++++++++++++++++++++++++++++++++++++ package.json | 2 +- test/change-stream.test.js | 87 ++++++++++++++++++++++++++++ test/model.test.js | 3 +- 4 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 test/change-stream.test.js diff --git a/lib/persisted-model.js b/lib/persisted-model.js index be1b2549..b3a1c0ff 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -7,6 +7,7 @@ var assert = require('assert'); var async = require('async'); var deprecated = require('depd')('loopback'); var debug = require('debug')('loopback:persisted-model'); +var PassThrough = require('stream').PassThrough; module.exports = function(registry) { var Model = registry.getModel('Model'); @@ -761,6 +762,24 @@ module.exports = function(registry) { http: {verb: 'post', path: '/:id/rectify-change'} }); } + + setRemoting(PersistedModel, 'createChangeStream', { + description: 'Create a change stream.', + accessType: 'READ', + http: [ + {verb: 'post', path: '/change-stream'}, + {verb: 'get', path: '/change-stream'} + ], + accepts: { + arg: 'options', + type: 'object' + }, + returns: { + arg: 'changes', + type: 'ReadableStream', + json: true + } + }); }; /** @@ -1527,6 +1546,101 @@ module.exports = function(registry) { }); }; + /** + * Create a change stream. + * + * @param {Object} options + * @param {Object} options.where Only changes to models matching this where filter will be included in the `ChangeStream`. + * @callback {Function} callback + * @param {Error} err + * @param {ChangeStream} changes + */ + + PersistedModel.createChangeStream = function(options, cb) { + if (typeof options === 'function') { + cb = options; + options = undefined; + } + + var idName = this.getIdName(); + var Model = this; + var changes = new PassThrough({objectMode: true}); + var writeable = true; + + changes.destroy = function() { + changes.removeAllListeners('error'); + changes.removeAllListeners('end'); + writeable = false; + changes = null; + }; + + changes.on('error', function() { + writeable = false; + }); + changes.on('end', function() { + writeable = false; + }); + + process.nextTick(function() { + cb(null, changes); + }); + + Model.observe('after save', createChangeHandler('save')); + Model.observe('after delete', createChangeHandler('delete')); + + function createChangeHandler(type) { + return function(ctx, next) { + // since it might have set to null via destroy + if (!changes) { + return next(); + } + + var where = ctx.where; + var data = ctx.instance || ctx.data; + var whereId = where && where[idName]; + + // the data includes the id + // or the where includes the id + var target; + + if (data && (data[idName] || data[idName] === 0)) { + target = data[idName]; + } else if (where && (where[idName] || where[idName] === 0)) { + target = where[idName]; + } + + var hasTarget = target === 0 || !!target; + + var change = { + target: target, + where: where, + data: data + }; + + switch (type) { + case 'save': + if (ctx.isNewInstance === undefined) { + change.type = hasTarget ? 'update' : 'create'; + } else { + change.type = ctx.isNewInstance ? 'create' : 'update'; + } + + break; + case 'delete': + change.type = 'remove'; + break; + } + + // TODO(ritch) this is ugly... maybe a ReadableStream would be better + if (writeable) { + changes.write(change); + } + + next(); + }; + } + }; + PersistedModel.setup(); return PersistedModel; diff --git a/package.json b/package.json index d25d48ce..36e67d70 100644 --- a/package.json +++ b/package.json @@ -81,7 +81,7 @@ "karma-script-launcher": "^0.1.0", "loopback-boot": "^2.7.0", "loopback-datasource-juggler": "^2.19.1", - "loopback-testing": "^1.1.0", + "loopback-testing": "~1.1.0", "mocha": "^2.1.0", "sinon": "^1.13.0", "strong-task-emitter": "^0.0.6", diff --git a/test/change-stream.test.js b/test/change-stream.test.js new file mode 100644 index 00000000..ab740521 --- /dev/null +++ b/test/change-stream.test.js @@ -0,0 +1,87 @@ +describe('PersistedModel.createChangeStream()', function() { + describe('configured to source changes locally', function() { + before(function() { + var test = this; + var app = loopback({localRegistry: true}); + var ds = app.dataSource('ds', {connector: 'memory'}); + this.Score = app.model('Score', { + dataSource: 'ds', + changeDataSource: false // use only local observers + }); + }); + + it('should detect create', function(done) { + var Score = this.Score; + + Score.createChangeStream(function(err, changes) { + changes.on('data', function(change) { + expect(change.type).to.equal('create'); + changes.destroy(); + done(); + }); + + Score.create({team: 'foo'}); + }); + }); + + it('should detect update', function(done) { + var Score = this.Score; + Score.create({team: 'foo'}, function(err, newScore) { + Score.createChangeStream(function(err, changes) { + changes.on('data', function(change) { + expect(change.type).to.equal('update'); + changes.destroy(); + done(); + }); + newScore.updateAttributes({ + bat: 'baz' + }); + }); + }); + }); + + it('should detect delete', function(done) { + var Score = this.Score; + Score.create({team: 'foo'}, function(err, newScore) { + Score.createChangeStream(function(err, changes) { + changes.on('data', function(change) { + expect(change.type).to.equal('remove'); + changes.destroy(); + done(); + }); + + newScore.remove(); + }); + }); + }); + }); + + // TODO(ritch) implement multi-server support + describe.skip('configured to source changes using pubsub', function() { + before(function() { + var test = this; + var app = loopback({localRegistry: true}); + var db = app.dataSource('ds', {connector: 'memory'}); + var ps = app.dataSource('ps', { + host: 'localhost', + port: '12345', + connector: 'pubsub', + pubsubAdapter: 'mqtt' + }); + this.Score = app.model('Score', { + dataSource: 'db', + changeDataSource: 'ps' + }); + }); + + it('should detect a change', function(done) { + var Score = this.Score; + + Score.createChangeStream(function(err, changes) { + changes.on('data', function(change) { + done(); + }); + }); + }); + }); +}); diff --git a/test/model.test.js b/test/model.test.js index 3ce499ab..b9a16f36 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -614,7 +614,8 @@ describe.onServer('Remote Methods', function() { 'destroyById', 'removeById', 'count', - 'prototype.updateAttributes' + 'prototype.updateAttributes', + 'createChangeStream' ]); }); });