Rework replication test

This commit is contained in:
Ritchie Martori 2014-05-15 17:27:02 -07:00
parent 344601cde4
commit 2f21f4ec1e
8 changed files with 412 additions and 131 deletions

1
.gitignore vendored
View File

@ -11,3 +11,4 @@
*.swp
*.swo
node_modules
dist

View File

@ -195,7 +195,8 @@ Change.prototype.rectify = function(cb) {
Change.prototype.currentRevision = function(cb) {
var model = this.getModelCtor();
model.findById(this.modelId, function(err, inst) {
var id = this.getModelId();
model.findById(id, function(err, inst) {
if(err) return Change.handleError(err, cb);
if(inst) {
cb(null, Change.revisionForInst(inst));
@ -254,16 +255,6 @@ Change.prototype.type = function() {
return Change.UNKNOWN;
}
/**
* Get the `Model` class for `change.modelName`.
* @return {Model}
*/
Change.prototype.getModelCtor = function() {
// todo - not sure if this works with multiple data sources
return loopback.getModel(this.modelName);
}
/**
* Compare two changes.
* @param {Change} change
@ -403,9 +394,18 @@ Change.handleError = function(err) {
}
}
/**
* Get the `Model` class for `change.modelName`.
* @return {Model}
*/
Change.prototype.getModelCtor = function() {
return this.constructor.settings.trackModel;
}
Change.prototype.getModelId = function() {
// TODO(ritch) get rid of the need to create an instance
var Model = this.constructor.settings.trackModel;
var Model = this.getModelCtor();
var id = this.modelId;
var m = new Model();
m.setId(id);

View File

@ -62,16 +62,31 @@ DataModel.setup = function setupDataModel() {
* @private
*/
function setRemoting(fn, options) {
options = options || {};
for (var opt in options) {
if (options.hasOwnProperty(opt)) {
fn[opt] = options[opt];
}
function setRemoting(target, name, options) {
var fn = target[name];
setupFunction(fn, options);
target[name] = createProxy(fn, options);
}
function createProxy(fn, options) {
var p = function proxy() {
return fn.apply(this, arguments);
}
fn.shared = true;
// allow connectors to override the function by marking as delegate
fn._delegate = true;
return setupFunction(fn, options);
}
function setupFunction(fn, options) {
options = options || {};
for (var opt in options) {
if (options.hasOwnProperty(opt)) {
fn[opt] = options[opt];
}
}
fn.shared = true;
// allow connectors to override the function by marking as delegate
fn._delegate = true;
return fn;
}
/*!
@ -425,28 +440,28 @@ DataModel.setupRemoting = function() {
// TODO(ritch) setRemoting should create its own function...
setRemoting(DataModel.create, {
setRemoting(DataModel, 'create', {
description: 'Create a new instance of the model and persist it into the data source',
accepts: {arg: 'data', type: 'object', description: 'Model instance data', http: {source: 'body'}},
returns: {arg: 'data', type: typeName, root: true},
http: {verb: 'post', path: '/'}
});
setRemoting(DataModel.upsert, {
setRemoting(DataModel, 'upsert', {
description: 'Update an existing model instance or insert a new one into the data source',
accepts: {arg: 'data', type: 'object', description: 'Model instance data', http: {source: 'body'}},
returns: {arg: 'data', type: typeName, root: true},
http: {verb: 'put', path: '/'}
});
setRemoting(DataModel.exists, {
setRemoting(DataModel, 'exists', {
description: 'Check whether a model instance exists in the data source',
accepts: {arg: 'id', type: 'any', description: 'Model id', required: true},
returns: {arg: 'exists', type: 'boolean'},
http: {verb: 'get', path: '/:id/exists'}
});
setRemoting(DataModel.findById, {
setRemoting(DataModel, 'findById', {
description: 'Find a model instance by id from the data source',
accepts: {
arg: 'id', type: 'any', description: 'Model id', required: true,
@ -457,42 +472,42 @@ DataModel.setupRemoting = function() {
rest: {after: convertNullToNotFoundError}
});
setRemoting(DataModel.find, {
setRemoting(DataModel, 'find', {
description: 'Find all instances of the model matched by filter from the data source',
accepts: {arg: 'filter', type: 'object', description: 'Filter defining fields, where, orderBy, offset, and limit'},
returns: {arg: 'data', type: [typeName], root: true},
http: {verb: 'get', path: '/'}
});
setRemoting(DataModel.findOne, {
setRemoting(DataModel, 'findOne', {
description: 'Find first instance of the model matched by filter from the data source',
accepts: {arg: 'filter', type: 'object', description: 'Filter defining fields, where, orderBy, offset, and limit'},
returns: {arg: 'data', type: typeName, root: true},
http: {verb: 'get', path: '/findOne'}
});
setRemoting(DataModel.destroyAll, {
setRemoting(DataModel, 'destroyAll', {
description: 'Delete all matching records',
accepts: {arg: 'where', type: 'object', description: 'filter.where object'},
http: {verb: 'del', path: '/'}
http: {verb: 'del', path: '/'},
shared: false
});
DataModel.destroyAll.shared = false;
setRemoting(DataModel.deleteById, {
setRemoting(DataModel, 'removeById', {
description: 'Delete a model instance by id from the data source',
accepts: {arg: 'id', type: 'any', description: 'Model id', required: true,
http: {source: 'path'}},
http: {verb: 'del', path: '/:id'}
});
setRemoting(DataModel.count, {
setRemoting(DataModel, 'count', {
description: 'Count instances of the model matched by where from the data source',
accepts: {arg: 'where', type: 'object', description: 'Criteria to match model instances'},
returns: {arg: 'count', type: 'number'},
http: {verb: 'get', path: '/count'}
});
setRemoting(DataModel.prototype.updateAttributes, {
setRemoting(DataModel.prototype, 'updateAttributes', {
description: 'Update attributes for a model instance and persist it into the data source',
accepts: {arg: 'data', type: 'object', http: {source: 'body'}, description: 'An object of model property name/value pairs'},
returns: {arg: 'data', type: typeName, root: true},
@ -500,7 +515,7 @@ DataModel.setupRemoting = function() {
});
if(options.trackChanges) {
setRemoting(DataModel.diff, {
setRemoting(DataModel, 'diff', {
description: 'Get a set of deltas and conflicts since the given checkpoint',
accepts: [
{arg: 'since', type: 'number', description: 'Find deltas since this checkpoint'},
@ -511,7 +526,7 @@ DataModel.setupRemoting = function() {
http: {verb: 'post', path: '/diff'}
});
setRemoting(DataModel.changes, {
setRemoting(DataModel, 'changes', {
description: 'Get the changes to a model since a given checkpoint.'
+ 'Provide a filter object to reduce the number of results returned.',
accepts: [
@ -522,37 +537,37 @@ DataModel.setupRemoting = function() {
http: {verb: 'get', path: '/changes'}
});
setRemoting(DataModel.checkpoint, {
setRemoting(DataModel, 'checkpoint', {
description: 'Create a checkpoint.',
returns: {arg: 'checkpoint', type: 'object', root: true},
http: {verb: 'post', path: '/checkpoint'}
});
setRemoting(DataModel.currentCheckpoint, {
setRemoting(DataModel, 'currentCheckpoint', {
description: 'Get the current checkpoint.',
returns: {arg: 'checkpoint', type: 'object', root: true},
http: {verb: 'get', path: '/checkpoint'}
});
setRemoting(DataModel.createUpdates, {
setRemoting(DataModel, 'createUpdates', {
description: 'Create an update list from a delta list',
accepts: {arg: 'deltas', type: 'array', http: {source: 'body'}},
returns: {arg: 'updates', type: 'array', root: true},
http: {verb: 'post', path: '/create-updates'}
});
setRemoting(DataModel.bulkUpdate, {
setRemoting(DataModel, 'bulkUpdate', {
description: 'Run multiple updates at once. Note: this is not atomic.',
accepts: {arg: 'updates', type: 'array'},
http: {verb: 'post', path: '/bulk-update'}
});
setRemoting(DataModel.rectifyAllChanges, {
setRemoting(DataModel, 'rectifyAllChanges', {
description: 'Rectify all Model changes.',
http: {verb: 'post', path: '/rectify-all'}
});
setRemoting(DataModel.rectifyChange, {
setRemoting(DataModel, 'rectifyChange', {
description: 'Tell loopback that a change to the model with the given id has occurred.',
accepts: {arg: 'id', type: 'any', http: {source: 'path'}},
http: {verb: 'post', path: '/:id/rectify-change'}
@ -889,8 +904,6 @@ DataModel.getSourceId = function(cb) {
*/
DataModel.enableChangeTracking = function() {
// console.log('THIS SHOULD NOT RUN ON A MODEL CONNECTED TO A REMOTE DATASOURCE');
var Model = this;
var Change = this.Change || this._defineChangeModel();
var cleanupInterval = Model.settings.changeCleanupInterval || 30000;

View File

@ -6,12 +6,12 @@ describe('Change', function(){
var memory = loopback.createDataSource({
connector: loopback.Memory
});
Change = loopback.Change.extend('change');
Change.attachTo(memory);
TestModel = loopback.DataModel.extend('chtest');
TestModel = loopback.DataModel.extend('chtest', {}, {
trackChanges: true
});
this.modelName = TestModel.modelName;
TestModel.attachTo(memory);
Change = TestModel.getChangeModel();
});
beforeEach(function(done) {
@ -46,16 +46,16 @@ describe('Change', function(){
describe('using an existing untracked model', function () {
beforeEach(function(done) {
var test = this;
Change.rectifyModelChanges(this.modelName, [this.modelId], function(err, trakedChagnes) {
Change.rectifyModelChanges(this.modelName, [this.modelId], function(err, trackedChanges) {
if(err) return done(err);
test.trakedChagnes = trakedChagnes;
test.trackedChanges = trackedChanges;
done();
});
});
it('should create an entry', function () {
assert(Array.isArray(this.trakedChagnes));
assert.equal(this.trakedChagnes[0].modelId, this.modelId);
assert(Array.isArray(this.trackedChanges));
assert.equal(this.trackedChanges[0].modelId, this.modelId);
});
it('should only create one change', function (done) {

View File

@ -3,7 +3,7 @@ var path = require('path');
var app = module.exports = loopback();
var models = require('./models');
var TestModel = models.TestModel;
var explorer = require('loopback-explorer');
// var explorer = require('loopback-explorer');
app.use(loopback.cookieParser({secret: app.get('cookieSecret')}));
var apiPath = '/api';
@ -13,7 +13,7 @@ TestModel.attachTo(loopback.memory());
app.model(TestModel);
app.model(TestModel.getChangeModel());
app.use('/explorer', explorer(app, {basePath: apiPath}));
// app.use('/explorer', explorer(app, {basePath: apiPath}));
app.use(loopback.static(path.join(__dirname, 'public')));
app.use(loopback.urlNotFound());

View File

@ -479,7 +479,7 @@ describe.onServer('Remote Methods', function(){
var result;
var current;
async.parallel(tasks, function(err) {
async.series(tasks, function(err) {
if(err) return done(err);
assert.equal(result, current + 1);
@ -495,88 +495,13 @@ describe.onServer('Remote Methods', function(){
function checkpoint(cb) {
User.checkpoint(function(err, cp) {
result = cp.id;
result = cp.seq;
cb(err);
});
}
});
});
describe('Replication / Change APIs', function() {
beforeEach(function(done) {
var test = this;
this.dataSource = dataSource;
var SourceModel = this.SourceModel = DataModel.extend('SourceModel', {}, {
trackChanges: true
});
SourceModel.attachTo(dataSource);
var TargetModel = this.TargetModel = DataModel.extend('TargetModel', {}, {
trackChanges: true
});
TargetModel.attachTo(dataSource);
var createOne = SourceModel.create.bind(SourceModel, {
name: 'baz'
});
async.parallel([
createOne,
function(cb) {
SourceModel.currentCheckpoint(function(err, id) {
if(err) return cb(err);
test.startingCheckpoint = id;
cb();
});
}
], process.nextTick.bind(process, done));
});
describe('Model.changes(since, filter, callback)', function() {
it('Get changes since the given checkpoint', function (done) {
this.SourceModel.changes(this.startingCheckpoint, {}, function(err, changes) {
assert.equal(changes.length, 1);
done();
});
});
});
describe.skip('Model.replicate(since, targetModel, options, callback)', function() {
it('Replicate data using the target model', function (done) {
var test = this;
var options = {};
var sourceData;
var targetData;
this.SourceModel.replicate(this.startingCheckpoint, this.TargetModel,
options, function(err, conflicts) {
assert(conflicts.length === 0);
async.parallel([
function(cb) {
test.SourceModel.find(function(err, result) {
if(err) return cb(err);
sourceData = result;
cb();
});
},
function(cb) {
test.TargetModel.find(function(err, result) {
if(err) return cb(err);
targetData = result;
cb();
});
}
], function(err) {
if(err) return done(err);
assert.deepEqual(sourceData, targetData);
done();
});
});
});
});
});
describe('Model._getACLModel()', function() {
it('should return the subclass of ACL', function() {
var Model = require('../').Model;

View File

@ -62,7 +62,6 @@ describe('RemoteConnector', function() {
ServerModel.setupRemoting();
var m = new RemoteModel({foo: 'bar'});
console.log(m.save.toString());
m.save(function(err, inst) {
assert(inst instanceof RemoteModel);
assert(calledServerCreate);

343
test/replication.test.js Normal file
View File

@ -0,0 +1,343 @@
var async = require('async');
var loopback = require('../');
var ACL = loopback.ACL;
var Change = loopback.Change;
var defineModelTestsWithDataSource = require('./util/model-tests');
var DataModel = loopback.DataModel;
describe('Replication / Change APIs', function() {
beforeEach(function() {
var test = this;
var dataSource = this.dataSource = loopback.createDataSource({
connector: loopback.Memory
});
var SourceModel = this.SourceModel = DataModel.extend('SourceModel', {}, {
trackChanges: true
});
SourceModel.attachTo(dataSource);
var TargetModel = this.TargetModel = DataModel.extend('TargetModel', {}, {
trackChanges: true
});
TargetModel.attachTo(dataSource);
this.createInitalData = function(cb) {
SourceModel.create({name: 'foo'}, function(err, inst) {
if(err) return cb(err);
test.model = inst;
// give loopback a chance to register the change
// TODO(ritch) get rid of this...
setTimeout(function() {
SourceModel.replicate(TargetModel, cb);
}, 100);
});
};
});
describe('Model.changes(since, filter, callback)', function() {
it('Get changes since the given checkpoint', function (done) {
var test = this;
this.SourceModel.create({name: 'foo'}, function(err) {
if(err) return done(err);
setTimeout(function() {
test.SourceModel.changes(test.startingCheckpoint, {}, function(err, changes) {
assert.equal(changes.length, 1);
done();
});
}, 1);
});
});
});
describe('Model.replicate(since, targetModel, options, callback)', function() {
it('Replicate data using the target model', function (done) {
var test = this;
var options = {};
var sourceData;
var targetData;
this.SourceModel.create({name: 'foo'}, function(err) {
setTimeout(replicate, 100);
});
function replicate() {
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
assert(conflicts.length === 0);
async.parallel([
function(cb) {
test.SourceModel.find(function(err, result) {
if(err) return cb(err);
sourceData = result;
cb();
});
},
function(cb) {
test.TargetModel.find(function(err, result) {
if(err) return cb(err);
targetData = result;
cb();
});
}
], function(err) {
if(err) return done(err);
assert.deepEqual(sourceData, targetData);
done();
});
});
}
});
});
describe('conflict detection - both updated', function() {
beforeEach(function(done) {
var SourceModel = this.SourceModel;
var TargetModel = this.TargetModel;
var test = this;
test.createInitalData(createConflict);
function createConflict(err, conflicts) {
async.parallel([
function(cb) {
SourceModel.findOne(function(err, inst) {
if(err) return cb(err);
inst.name = 'source update';
inst.save(cb);
});
},
function(cb) {
TargetModel.findOne(function(err, inst) {
if(err) return cb(err);
inst.name = 'target update';
inst.save(cb);
});
}
], function(err) {
if(err) return done(err);
SourceModel.replicate(TargetModel, function(err, conflicts) {
if(err) return done(err);
test.conflicts = conflicts;
test.conflict = conflicts[0];
done();
});
});
}
});
it('should detect a single conflict', function() {
assert.equal(this.conflicts.length, 1);
assert(this.conflict);
});
it('type should be UPDATE', function(done) {
this.conflict.type(function(err, type) {
assert.equal(type, Change.UPDATE);
done();
});
});
it('conflict.changes()', function(done) {
var test = this;
this.conflict.changes(function(err, sourceChange, targetChange) {
assert.equal(typeof sourceChange.id, 'string');
assert.equal(typeof targetChange.id, 'string');
assert.equal(test.model.getId(), sourceChange.getModelId());
assert.equal(sourceChange.type(), Change.UPDATE);
assert.equal(targetChange.type(), Change.UPDATE);
done();
});
});
it('conflict.models()', function(done) {
var test = this;
this.conflict.models(function(err, source, target) {
assert.deepEqual(source.toJSON(), {
id: 1,
name: 'source update'
});
assert.deepEqual(target.toJSON(), {
id: 1,
name: 'target update'
});
done();
});
});
});
describe('conflict detection - source deleted', function() {
beforeEach(function(done) {
var SourceModel = this.SourceModel;
var TargetModel = this.TargetModel;
var test = this;
test.createInitalData(createConflict);
function createConflict() {
async.parallel([
function(cb) {
SourceModel.findOne(function(err, inst) {
if(err) return cb(err);
test.model = inst;
inst.remove(cb);
});
},
function(cb) {
TargetModel.findOne(function(err, inst) {
if(err) return cb(err);
inst.name = 'target update';
inst.save(cb);
});
}
], function(err) {
if(err) return done(err);
SourceModel.replicate(TargetModel, function(err, conflicts) {
if(err) return done(err);
test.conflicts = conflicts;
test.conflict = conflicts[0];
done();
});
});
}
});
it('should detect a single conflict', function() {
assert.equal(this.conflicts.length, 1);
assert(this.conflict);
});
it('type should be DELETE', function(done) {
this.conflict.type(function(err, type) {
assert.equal(type, Change.DELETE);
done();
});
});
it('conflict.changes()', function(done) {
var test = this;
this.conflict.changes(function(err, sourceChange, targetChange) {
assert.equal(typeof sourceChange.id, 'string');
assert.equal(typeof targetChange.id, 'string');
assert.equal(test.model.getId(), sourceChange.getModelId());
assert.equal(sourceChange.type(), Change.DELETE);
assert.equal(targetChange.type(), Change.UPDATE);
done();
});
});
it('conflict.models()', function(done) {
var test = this;
this.conflict.models(function(err, source, target) {
assert.equal(source, null);
assert.deepEqual(target.toJSON(), {
id: 1,
name: 'target update'
});
done();
});
});
});
describe('conflict detection - target deleted', function() {
beforeEach(function(done) {
var SourceModel = this.SourceModel;
var TargetModel = this.TargetModel;
var test = this;
test.createInitalData(createConflict);
function createConflict() {
async.parallel([
function(cb) {
SourceModel.findOne(function(err, inst) {
if(err) return cb(err);
test.model = inst;
inst.name = 'source update';
inst.save(cb);
});
},
function(cb) {
TargetModel.findOne(function(err, inst) {
if(err) return cb(err);
inst.remove(cb);
});
}
], function(err) {
if(err) return done(err);
SourceModel.replicate(TargetModel, function(err, conflicts) {
if(err) return done(err);
test.conflicts = conflicts;
test.conflict = conflicts[0];
done();
});
});
}
});
it('should detect a single conflict', function() {
assert.equal(this.conflicts.length, 1);
assert(this.conflict);
});
it('type should be DELETE', function(done) {
this.conflict.type(function(err, type) {
assert.equal(type, Change.DELETE);
done();
});
});
it('conflict.changes()', function(done) {
var test = this;
this.conflict.changes(function(err, sourceChange, targetChange) {
assert.equal(typeof sourceChange.id, 'string');
assert.equal(typeof targetChange.id, 'string');
assert.equal(test.model.getId(), sourceChange.getModelId());
assert.equal(sourceChange.type(), Change.UPDATE);
assert.equal(targetChange.type(), Change.DELETE);
done();
});
});
it('conflict.models()', function(done) {
var test = this;
this.conflict.models(function(err, source, target) {
assert.equal(target, null);
assert.deepEqual(source.toJSON(), {
id: 1,
name: 'source update'
});
done();
});
});
});
describe('conflict detection - both deleted', function() {
beforeEach(function(done) {
var SourceModel = this.SourceModel;
var TargetModel = this.TargetModel;
var test = this;
test.createInitalData(createConflict);
function createConflict() {
async.parallel([
function(cb) {
SourceModel.findOne(function(err, inst) {
if(err) return cb(err);
test.model = inst;
inst.remove(cb);
});
},
function(cb) {
TargetModel.findOne(function(err, inst) {
if(err) return cb(err);
inst.remove(cb);
});
}
], function(err) {
if(err) return done(err);
SourceModel.replicate(TargetModel, function(err, conflicts) {
if(err) return done(err);
test.conflicts = conflicts;
test.conflict = conflicts[0];
done();
});
});
}
});
it('should not detect a conflict', function() {
assert.equal(this.conflicts.length, 0);
assert(!this.conflict);
});
});
});