Merge pull request #1116 from strongloop/fix/change-detection
Fix change detection & tracking
This commit is contained in:
commit
3d977f3e68
1
.jscsrc
1
.jscsrc
|
@ -8,6 +8,7 @@
|
||||||
"try",
|
"try",
|
||||||
"catch"
|
"catch"
|
||||||
],
|
],
|
||||||
|
"disallowMultipleVarDecl": "exceptUndefined",
|
||||||
"disallowSpacesInsideObjectBrackets": null,
|
"disallowSpacesInsideObjectBrackets": null,
|
||||||
"maximumLineLength": {
|
"maximumLineLength": {
|
||||||
"value": 150,
|
"value": 150,
|
||||||
|
|
|
@ -193,6 +193,7 @@ module.exports = function(Change) {
|
||||||
function updateCheckpoint(cb) {
|
function updateCheckpoint(cb) {
|
||||||
change.constructor.getCheckpointModel().current(function(err, checkpoint) {
|
change.constructor.getCheckpointModel().current(function(err, checkpoint) {
|
||||||
if (err) return Change.handleError(err);
|
if (err) return Change.handleError(err);
|
||||||
|
debug('updated checkpoint to', checkpoint);
|
||||||
change.checkpoint = checkpoint;
|
change.checkpoint = checkpoint;
|
||||||
cb();
|
cb();
|
||||||
});
|
});
|
||||||
|
@ -412,9 +413,10 @@ module.exports = function(Change) {
|
||||||
// this should be optimized
|
// this should be optimized
|
||||||
this.find(function(err, changes) {
|
this.find(function(err, changes) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
changes.forEach(function(change) {
|
async.each(
|
||||||
change.rectify();
|
changes,
|
||||||
});
|
function(c, next) { c.rectify(next); },
|
||||||
|
cb);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -755,10 +755,10 @@ PersistedModel.changes = function(since, filter, callback) {
|
||||||
filter.fields[idName] = true;
|
filter.fields[idName] = true;
|
||||||
|
|
||||||
// TODO(ritch) this whole thing could be optimized a bit more
|
// TODO(ritch) this whole thing could be optimized a bit more
|
||||||
Change.find({
|
Change.find({ where: {
|
||||||
checkpoint: {gt: since},
|
checkpoint: {gt: since},
|
||||||
modelName: this.modelName
|
modelName: this.modelName
|
||||||
}, function(err, changes) {
|
}}, function(err, changes) {
|
||||||
if (err) return callback(err);
|
if (err) return callback(err);
|
||||||
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
|
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
|
||||||
var ids = changes.map(function(change) {
|
var ids = changes.map(function(change) {
|
||||||
|
@ -1044,15 +1044,9 @@ PersistedModel.enableChangeTracking = function() {
|
||||||
Change.attachTo(this.dataSource);
|
Change.attachTo(this.dataSource);
|
||||||
Change.getCheckpointModel().attachTo(this.dataSource);
|
Change.getCheckpointModel().attachTo(this.dataSource);
|
||||||
|
|
||||||
Model.afterSave = function afterSave(next) {
|
Model.observe('after save', rectifyOnSave);
|
||||||
Model.rectifyChange(this.getId(), next);
|
|
||||||
};
|
|
||||||
|
|
||||||
Model.afterDestroy = function afterDestroy(next) {
|
Model.observe('after delete', rectifyOnDelete);
|
||||||
Model.rectifyChange(this.getId(), next);
|
|
||||||
};
|
|
||||||
|
|
||||||
Model.on('deletedAll', cleanup);
|
|
||||||
|
|
||||||
if (runtime.isServer) {
|
if (runtime.isServer) {
|
||||||
// initial cleanup
|
// initial cleanup
|
||||||
|
@ -1072,6 +1066,54 @@ PersistedModel.enableChangeTracking = function() {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function rectifyOnSave(ctx, next) {
|
||||||
|
if (ctx.instance) {
|
||||||
|
ctx.Model.rectifyChange(ctx.instance.getId(), reportErrorAndNext);
|
||||||
|
} else {
|
||||||
|
ctx.Model.rectifyAllChanges(reportErrorAndNext);
|
||||||
|
}
|
||||||
|
|
||||||
|
function reportErrorAndNext(err) {
|
||||||
|
if (err) {
|
||||||
|
console.error(
|
||||||
|
ctx.Model.modelName + '.rectifyChange(s) after save failed:' + err);
|
||||||
|
}
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function rectifyOnDelete(ctx, next) {
|
||||||
|
var id = getIdFromWhereByModelId(ctx.Model, ctx.where);
|
||||||
|
if (id) {
|
||||||
|
ctx.Model.rectifyChange(id, reportErrorAndNext);
|
||||||
|
} else {
|
||||||
|
ctx.Model.rectifyAllChanges(reportErrorAndNext);
|
||||||
|
}
|
||||||
|
|
||||||
|
function reportErrorAndNext(err) {
|
||||||
|
if (err) {
|
||||||
|
console.error(
|
||||||
|
ctx.Model.modelName + '.rectifyChange(s) after delete failed:' + err);
|
||||||
|
}
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getIdFromWhereByModelId(Model, where) {
|
||||||
|
var whereKeys = Object.keys(where);
|
||||||
|
if (whereKeys.length != 1) return undefined;
|
||||||
|
|
||||||
|
var idName = Model.getIdName();
|
||||||
|
if (whereKeys[0] !== idName) return undefined;
|
||||||
|
|
||||||
|
var id = where[idName];
|
||||||
|
// TODO(bajtos) support object values that are not LB conditions
|
||||||
|
if (typeof id === 'string' || typeof id === 'number') {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
PersistedModel._defineChangeModel = function() {
|
PersistedModel._defineChangeModel = function() {
|
||||||
var BaseChangeModel = registry.getModel('Change');
|
var BaseChangeModel = registry.getModel('Change');
|
||||||
assert(BaseChangeModel,
|
assert(BaseChangeModel,
|
||||||
|
|
|
@ -4,23 +4,28 @@ var ACL = loopback.ACL;
|
||||||
var Change = loopback.Change;
|
var Change = loopback.Change;
|
||||||
var defineModelTestsWithDataSource = require('./util/model-tests');
|
var defineModelTestsWithDataSource = require('./util/model-tests');
|
||||||
var PersistedModel = loopback.PersistedModel;
|
var PersistedModel = loopback.PersistedModel;
|
||||||
|
var expect = require('chai').expect;
|
||||||
|
|
||||||
describe('Replication / Change APIs', function() {
|
describe('Replication / Change APIs', function() {
|
||||||
|
var dataSource, SourceModel, TargetModel;
|
||||||
|
|
||||||
beforeEach(function() {
|
beforeEach(function() {
|
||||||
var test = this;
|
var test = this;
|
||||||
var dataSource = this.dataSource = loopback.createDataSource({
|
dataSource = this.dataSource = loopback.createDataSource({
|
||||||
connector: loopback.Memory
|
connector: loopback.Memory
|
||||||
});
|
});
|
||||||
var SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', {}, {
|
SourceModel = this.SourceModel = PersistedModel.extend('SourceModel', {}, {
|
||||||
trackChanges: true
|
trackChanges: true
|
||||||
});
|
});
|
||||||
SourceModel.attachTo(dataSource);
|
SourceModel.attachTo(dataSource);
|
||||||
|
|
||||||
var TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', {}, {
|
TargetModel = this.TargetModel = PersistedModel.extend('TargetModel', {}, {
|
||||||
trackChanges: true
|
trackChanges: true
|
||||||
});
|
});
|
||||||
TargetModel.attachTo(dataSource);
|
TargetModel.attachTo(dataSource);
|
||||||
|
|
||||||
|
test.startingCheckpoint = -1;
|
||||||
|
|
||||||
this.createInitalData = function(cb) {
|
this.createInitalData = function(cb) {
|
||||||
SourceModel.create({name: 'foo'}, function(err, inst) {
|
SourceModel.create({name: 'foo'}, function(err, inst) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
@ -48,6 +53,19 @@ describe('Replication / Change APIs', function() {
|
||||||
}, 1);
|
}, 1);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('excludes changes from older checkpoints', function(done) {
|
||||||
|
var FUTURE_CHECKPOINT = 999;
|
||||||
|
|
||||||
|
SourceModel.create({ name: 'foo' }, function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
SourceModel.changes(FUTURE_CHECKPOINT, {}, function(err, changes) {
|
||||||
|
if (err) return done(err);
|
||||||
|
expect(changes).to.be.empty();
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('Model.replicate(since, targetModel, options, callback)', function() {
|
describe('Model.replicate(since, targetModel, options, callback)', function() {
|
||||||
|
@ -340,4 +358,137 @@ describe('Replication / Change APIs', function() {
|
||||||
assert(!this.conflict);
|
assert(!this.conflict);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('change detection', function() {
|
||||||
|
it('detects "create"', function(done) {
|
||||||
|
SourceModel.create({}, function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "updateOrCreate"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, created) {
|
||||||
|
if (err) return done(err);
|
||||||
|
var data = created.toObject();
|
||||||
|
created.name = 'updated';
|
||||||
|
SourceModel.updateOrCreate(created, function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "findOrCreate"', function(done) {
|
||||||
|
// make sure we bypass find+create and call the connector directly
|
||||||
|
SourceModel.dataSource.connector.findOrCreate =
|
||||||
|
function(model, query, data, callback) {
|
||||||
|
this.all(model, query, function(err, list) {
|
||||||
|
if (err || (list && list[0]))
|
||||||
|
return callback(err, list && list[0], false);
|
||||||
|
this.create(model, data, function(err) {
|
||||||
|
callback(err, data, true);
|
||||||
|
});
|
||||||
|
}.bind(this));
|
||||||
|
};
|
||||||
|
|
||||||
|
SourceModel.findOrCreate(
|
||||||
|
{ where: { name: 'does-not-exist' } },
|
||||||
|
{ name: 'created' },
|
||||||
|
function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "deleteById"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
SourceModel.deleteById(inst.id, function(err) {
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "deleteAll"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
SourceModel.deleteAll({ name: inst.name }, function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "updateAll"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
SourceModel.updateAll(
|
||||||
|
{ name: inst.name },
|
||||||
|
{ name: 'updated' },
|
||||||
|
function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "prototype.save"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
inst.name = 'updated';
|
||||||
|
inst.save(function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "prototype.updateAttributes"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
inst.updateAttributes({ name: 'updated' }, function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects "prototype.delete"', function(done) {
|
||||||
|
givenReplicatedInstance(function(err, inst) {
|
||||||
|
if (err) return done(err);
|
||||||
|
inst.delete(function(err) {
|
||||||
|
assertChangeRecordedForId(inst.id, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
function givenReplicatedInstance(cb) {
|
||||||
|
SourceModel.create({ name: 'a-name' }, function(err, inst) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
SourceModel.checkpoint(function(err) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
cb(null, inst);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function assertChangeRecordedForId(id, cb) {
|
||||||
|
SourceModel.getChangeModel().getCheckpointModel()
|
||||||
|
.current(function(err, cp) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
SourceModel.changes(cp - 1, {}, function(err, pendingChanges) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
expect(pendingChanges, 'list of changes').to.have.length(1);
|
||||||
|
var change = pendingChanges[0].toObject();
|
||||||
|
expect(change).to.have.property('checkpoint', cp); // sanity check
|
||||||
|
expect(change).to.have.property('modelName', SourceModel.modelName);
|
||||||
|
// NOTE(bajtos) Change.modelId is always String
|
||||||
|
// regardless of the type of the changed model's id property
|
||||||
|
expect(change).to.have.property('modelId', '' + id);
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue