This commit is contained in:
Esco Obong 2015-03-25 16:45:58 -04:00
commit 1993338c0b
7 changed files with 772 additions and 113 deletions

View File

@ -83,7 +83,7 @@ See [loopback-example](https://github.com/strongloop/loopback-example) for detai
## Contributing
See https://github.com/strongloop/loopback/wiki/Contributing
See https://github.com/strongloop/loopback/wiki/Contributing-code
## Issues

View File

@ -126,7 +126,7 @@ module.exports = function(Change) {
modelId: modelId
});
ch.debug('creating change');
ch.save(callback);
Change.updateOrCreate(ch, callback);
}
});
};
@ -248,6 +248,7 @@ module.exports = function(Change) {
*/
Change.revisionForInst = function(inst) {
assert(inst, 'Change.revisionForInst() requires an instance object.');
return this.hash(CJSON.stringify(inst));
};
@ -370,15 +371,18 @@ module.exports = function(Change) {
this.find({
where: {
modelName: modelName,
modelId: {inq: modelIds},
checkpoint: {gte: since}
modelId: {inq: modelIds}
}
}, function(err, localChanges) {
}, function(err, allLocalChanges) {
if (err) return callback(err);
var deltas = [];
var conflicts = [];
var localModelIds = [];
var localChanges = allLocalChanges.filter(function(c) {
return c.checkpoint >= since;
});
localChanges.forEach(function(localChange) {
localChange = new Change(localChange);
localModelIds.push(localChange.modelId);
@ -396,9 +400,20 @@ module.exports = function(Change) {
});
modelIds.forEach(function(id) {
if (localModelIds.indexOf(id) === -1) {
deltas.push(remoteChangeIndex[id]);
if (localModelIds.indexOf(id) !== -1) return;
var d = remoteChangeIndex[id];
var oldChange = allLocalChanges.filter(function(c) {
return c.modelId === id;
})[0];
if (oldChange) {
d.prev = oldChange.rev;
} else {
d.prev = null;
}
deltas.push(d);
});
callback(null, {
@ -601,6 +616,13 @@ module.exports = function(Change) {
/**
* Resolve the conflict.
*
* Set the source change's previous revision to the current revision of the
* (conflicting) target change. Since the changes are no longer conflicting
* and appear as if the source change was based on the target, they will be
* replicated normally as part of the next replicate() call.
*
* This is effectively resolving the conflict using the source version.
*
* @callback {Function} callback
* @param {Error} err
*/
@ -614,6 +636,74 @@ module.exports = function(Change) {
});
};
/**
* Resolve the conflict using the instance data in the source model.
*
* @callback {Function} callback
* @param {Error} err
*/
Conflict.prototype.resolveUsingSource = function(cb) {
this.resolve(function(err) {
// don't forward any cb arguments from resolve()
cb(err);
});
};
/**
* Resolve the conflict using the instance data in the target model.
*
* @callback {Function} callback
* @param {Error} err
*/
Conflict.prototype.resolveUsingTarget = function(cb) {
var conflict = this;
conflict.models(function(err, source, target) {
if (err) return done(err);
if (target === null) {
return conflict.SourceModel.deleteById(conflict.modelId, done);
}
var inst = new conflict.SourceModel(target);
inst.save(done);
});
function done(err) {
// don't forward any cb arguments from internal calls
cb(err);
}
};
/**
* Resolve the conflict using the supplied instance data.
*
* @param {Object} data The set of changes to apply on the model
* instance. Use `null` value to delete the source instance instead.
* @callback {Function} callback
* @param {Error} err
*/
Conflict.prototype.resolveManually = function(data, cb) {
var conflict = this;
if (!data) {
return conflict.SourceModel.deleteById(conflict.modelId, done);
}
conflict.models(function(err, source, target) {
if (err) return done(err);
var inst = source || new conflict.SourceModel(target);
inst.setAttributes(data);
inst.save(function(err) {
if (err) return done(err);
conflict.resolve(done);
});
});
function done(err) {
// don't forward any cb arguments from internal calls
cb(err);
}
};
/**
* Determine the conflict type.
*

View File

@ -303,11 +303,12 @@ module.exports = function(User) {
*
* ```js
* var options = {
* type: 'email',
* to: user.email,
* template: 'verify.ejs',
* redirect: '/'
* };
* type: 'email',
* to: user.email,
* template: 'verify.ejs',
* redirect: '/',
* tokenGenerator: function (user, cb) { cb("random-token"); }
* };
*
* user.verify(options, next);
* ```
@ -323,6 +324,11 @@ module.exports = function(User) {
* page, for example, `'verify.ejs'.
* @property {String} redirect Page to which user will be redirected after
* they verify their email, for example `'/'` for root URI.
* @property {Function} generateVerificationToken A function to be used to
* generate the verification token. It must accept the user object and a
* callback function. This function should NOT add the token to the user
* object, instead simply execute the callback with the token! User saving
* and email sending will be handled in the `verify()` method.
*/
User.prototype.verify = function(options, fn) {
@ -360,19 +366,20 @@ module.exports = function(User) {
// Email model
var Email = options.mailer || this.constructor.email || loopback.getModelByType(loopback.Email);
crypto.randomBytes(64, function(err, buf) {
if (err) {
fn(err);
} else {
user.verificationToken = buf.toString('hex');
user.save(function(err) {
if (err) {
fn(err);
} else {
sendEmail(user);
}
});
}
// Set a default token generation function if one is not provided
var tokenGenerator = options.generateVerificationToken || User.generateVerificationToken;
tokenGenerator(user, function(err, token) {
if (err) { return fn(err); }
user.verificationToken = token;
user.save(function(err) {
if (err) {
fn(err);
} else {
sendEmail(user);
}
});
});
// TODO - support more verification types
@ -401,6 +408,22 @@ module.exports = function(User) {
}
};
/**
* A default verification token generator which accepts the user the token is
* being generated for and a callback function to indicate completion.
* This one uses the crypto library and 64 random bytes (converted to hex)
* for the token. When used in combination with the user.verify() method this
* function will be called with the `user` object as it's context (`this`).
*
* @param {object} user The User this token is being generated for.
* @param {Function} cb The generator must pass back the new token with this function call
*/
User.generateVerificationToken = function(user, cb) {
crypto.randomBytes(64, function(err, buf) {
cb(err, buf && buf.toString('hex'));
});
};
/**
* Confirm the user's identity.
*

View File

@ -952,7 +952,20 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) {
function bulkUpdate(_updates, cb) {
debug('\tstarting bulk update');
updates = _updates;
targetModel.bulkUpdate(updates, cb);
targetModel.bulkUpdate(updates, function(err) {
var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts;
// filter out updates that were not applied
updates = updates.filter(function(u) {
return conflicts
.filter(function(d) { return d.modelId === u.change.modelId; })
.length === 0;
});
return cb();
}
cb(err);
});
}
function checkpoints() {
@ -974,7 +987,7 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) {
debug('\treplication finished');
debug('\t\t%s conflict(s) detected', diff.conflicts.length);
debug('\t\t%s change(s) applied', updates && updates.length);
debug('\t\t%s change(s) applied', updates ? updates.length : 0);
debug('\t\tnew checkpoints: { source: %j, target: %j }',
newSourceCp, newTargetCp);
@ -1058,31 +1071,197 @@ PersistedModel.createUpdates = function(deltas, cb) {
PersistedModel.bulkUpdate = function(updates, callback) {
var tasks = [];
var Model = this;
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var conflicts = [];
updates.forEach(function(update) {
switch (update.type) {
case Change.UPDATE:
case Change.CREATE:
// var model = new Model(update.data);
// tasks.push(model.save.bind(model));
tasks.push(function(cb) {
var model = new Model(update.data);
model.save(cb);
});
break;
case Change.DELETE:
var data = {};
data[idName] = update.change.modelId;
var model = new Model(data);
tasks.push(model.destroy.bind(model));
break;
buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
if (err) return callback(err);
updates.forEach(function(update) {
var id = update.change.modelId;
var current = currentMap[id];
switch (update.type) {
case Change.UPDATE:
tasks.push(function(cb) {
applyUpdate(Model, id, current, update.data, update.change, conflicts, cb);
});
break;
case Change.CREATE:
tasks.push(function(cb) {
applyCreate(Model, id, current, update.data, update.change, conflicts, cb);
});
break;
case Change.DELETE:
tasks.push(function(cb) {
applyDelete(Model, id, current, update.change, conflicts, cb);
});
break;
}
});
async.parallel(tasks, function(err) {
if (err) return callback(err);
if (conflicts.length) {
err = new Error('Conflict');
err.statusCode = 409;
err.details = { conflicts: conflicts };
return callback(err);
}
callback();
});
});
};
function buildLookupOfAffectedModelData(Model, updates, callback) {
var idName = Model.dataSource.idName(Model.modelName);
var affectedIds = updates.map(function(u) { return u.change.modelId; });
var whereAffected = {};
whereAffected[idName] = { inq: affectedIds };
Model.find({ where: whereAffected }, function(err, affectedList) {
if (err) return callback(err);
var dataLookup = {};
affectedList.forEach(function(it) {
dataLookup[it[idName]] = it;
});
callback(null, dataLookup);
});
}
function applyUpdate(Model, id, current, data, change, conflicts, cb) {
var Change = Model.getChangeModel();
var rev = current ? Change.revisionForInst(current) : null;
if (rev !== change.prev) {
debug('Detected non-rectified change of %s %j',
Model.modelName, id);
debug('\tExpected revision: %s', change.rev);
debug('\tActual revision: %s', rev);
conflicts.push(change);
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}
// TODO(bajtos) modify `data` so that it instructs
// the connector to remove any properties included in "inst"
// but not included in `data`
// See https://github.com/strongloop/loopback/issues/1215
Model.updateAll(current.toObject(), data, function(err, result) {
if (err) return cb(err);
var count = result && result.count;
switch (count) {
case 1:
// The happy path, exactly one record was updated
return cb();
case 0:
debug('UpdateAll detected non-rectified change of %s %j',
Model.modelName, id);
conflicts.push(change);
// NOTE(bajtos) updateAll triggers change rectification
// for all model instances, even when no records were updated,
// thus we don't need to rectify explicitly ourselves
return cb();
case undefined:
case null:
return cb(new Error(
'Cannot apply bulk updates, ' +
'the connector does not correctly report ' +
'the number of updated records.'));
default:
debug('%s.updateAll modified unexpected number of instances: %j',
Model.modelName, count);
return cb(new Error(
'Bulk update failed, the connector has modified unexpected ' +
'number of records: ' + JSON.stringify(count)));
}
});
}
async.parallel(tasks, callback);
};
function applyCreate(Model, id, current, data, change, conflicts, cb) {
Model.create(data, function(createErr) {
if (!createErr) return cb();
// We don't have a reliable way how to detect the situation
// where he model was not create because of a duplicate id
// The workaround is to query the DB to check if the model already exists
Model.findById(id, function(findErr, inst) {
if (findErr || !inst) {
// There isn't any instance with the same id, thus there isn't
// any conflict and we just report back the original error.
return cb(createErr);
}
return conflict();
});
});
function conflict() {
// The instance already exists - report a conflict
debug('Detected non-rectified new instance of %s %j',
Model.modelName, id);
conflicts.push(change);
var Change = Model.getChangeModel();
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}
}
function applyDelete(Model, id, current, change, conflicts, cb) {
if (!current) {
// The instance was either already deleted or not created at all,
// we are done.
return cb();
}
var Change = Model.getChangeModel();
var rev = Change.revisionForInst(current);
if (rev !== change.prev) {
debug('Detected non-rectified change of %s %j',
Model.modelName, id);
debug('\tExpected revision: %s', change.rev);
debug('\tActual revision: %s', rev);
conflicts.push(change);
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}
Model.deleteAll(current.toObject(), function(err, result) {
if (err) return cb(err);
var count = result && result.count;
switch (count) {
case 1:
// The happy path, exactly one record was updated
return cb();
case 0:
debug('DeleteAll detected non-rectified change of %s %j',
Model.modelName, id);
conflicts.push(change);
// NOTE(bajtos) deleteAll triggers change rectification
// for all model instances, even when no records were updated,
// thus we don't need to rectify explicitly ourselves
return cb();
case undefined:
case null:
return cb(new Error(
'Cannot apply bulk updates, ' +
'the connector does not correctly report ' +
'the number of deleted records.'));
default:
debug('%s.deleteAll modified unexpected number of instances: %j',
Model.modelName, count);
return cb(new Error(
'Bulk update failed, the connector has deleted unexpected ' +
'number of records: ' + JSON.stringify(count)));
}
});
}
/**
* Get the `Change` model.
@ -1183,7 +1362,9 @@ function rectifyOnSave(ctx, next) {
}
function rectifyOnDelete(ctx, next) {
var id = getIdFromWhereByModelId(ctx.Model, ctx.where);
var id = ctx.instance ? ctx.instance.getId() :
getIdFromWhereByModelId(ctx.Model, ctx.where);
if (id) {
ctx.Model.rectifyChange(id, reportErrorAndNext);
} else {

View File

@ -1,4 +1,5 @@
var async = require('async');
var expect = require('chai').expect;
var Change;
var TestModel;
@ -134,11 +135,16 @@ describe('Change', function() {
describe('change.rectify(callback)', function() {
var change;
beforeEach(function() {
change = new Change({
modelName: this.modelName,
modelId: this.modelId
});
beforeEach(function(done) {
Change.findOrCreate(
{
modelName: this.modelName,
modelId: this.modelId
},
function(err, ch) {
change = ch;
done(err);
});
});
it('should create a new change with the correct revision', function(done) {
@ -344,10 +350,89 @@ describe('Change', function() {
];
Change.diff(this.modelName, 0, remoteChanges, function(err, diff) {
if (err) return done(err);
assert.equal(diff.deltas.length, 1);
assert.equal(diff.conflicts.length, 1);
done();
});
});
it('should set "prev" to local revision in non-conflicting delta', function(done) {
var updateRecord = {
rev: 'foo-new',
prev: 'foo',
modelName: this.modelName,
modelId: '9',
checkpoint: 2
};
Change.diff(this.modelName, 0, [updateRecord], function(err, diff) {
if (err) return done(err);
expect(diff.conflicts, 'conflicts').to.have.length(0);
expect(diff.deltas, 'deltas').to.have.length(1);
var actual = diff.deltas[0].toObject();
delete actual.id;
expect(actual).to.eql({
checkpoint: 2,
modelId: '9',
modelName: updateRecord.modelName,
prev: 'foo', // this is the current local revision
rev: 'foo-new',
});
done();
});
});
it('should set "prev" to local revision in remote-only delta', function(done) {
var updateRecord = {
rev: 'foo-new',
prev: 'foo-prev',
modelName: this.modelName,
modelId: '9',
checkpoint: 2
};
// IMPORTANT: the diff call excludes the local change
// with rev=foo CP=1
Change.diff(this.modelName, 2, [updateRecord], function(err, diff) {
if (err) return done(err);
expect(diff.conflicts, 'conflicts').to.have.length(0);
expect(diff.deltas, 'deltas').to.have.length(1);
var actual = diff.deltas[0].toObject();
delete actual.id;
expect(actual).to.eql({
checkpoint: 2,
modelId: '9',
modelName: updateRecord.modelName,
prev: 'foo', // this is the current local revision
rev: 'foo-new',
});
done();
});
});
it('should set "prev" to null for a new instance', function(done) {
var updateRecord = {
rev: 'new-rev',
prev: 'new-prev',
modelName: this.modelName,
modelId: 'new-id',
checkpoint: 2
};
Change.diff(this.modelName, 0, [updateRecord], function(err, diff) {
if (err) return done(err);
expect(diff.conflicts).to.have.length(0);
expect(diff.deltas).to.have.length(1);
var actual = diff.deltas[0].toObject();
delete actual.id;
expect(actual).to.eql({
checkpoint: 2,
modelId: 'new-id',
modelName: updateRecord.modelName,
prev: null, // this is the current local revision
rev: 'new-rev',
});
done();
});
});
});
});

View File

@ -88,12 +88,10 @@ describe('Replication / Change APIs', function() {
var targetData;
this.SourceModel.create({name: 'foo'}, function(err) {
setTimeout(replicate, 100);
});
function replicate() {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assert(conflicts.length === 0);
async.parallel([
function(cb) {
@ -117,7 +115,7 @@ describe('Replication / Change APIs', function() {
done();
});
});
}
});
});
it('applies "since" filter on source changes', function(done) {
@ -179,18 +177,11 @@ describe('Replication / Change APIs', function() {
});
it('picks up changes made during replication', function(done) {
var bulkUpdate = TargetModel.bulkUpdate;
TargetModel.bulkUpdate = function(data, cb) {
var self = this;
setupRaceConditionInReplication(function(cb) {
// simulate the situation when another model is created
// while a replication run is in progress
SourceModel.create({ id: 'racer' }, function(err) {
if (err) return cb(err);
bulkUpdate.call(self, data, cb);
});
// create the new model only once
TargetModel.bulkUpdate = bulkUpdate;
};
SourceModel.create({ id: 'racer' }, cb);
});
var lastCp;
async.series([
@ -291,6 +282,128 @@ describe('Replication / Change APIs', function() {
}
], done);
});
describe('with 3rd-party changes', function() {
it('detects UPDATE made during UPDATE', function(done) {
async.series([
createModel(SourceModel, { id: '1' }),
replicateExpectingSuccess(),
function updateModel(next) {
SourceModel.updateAll({ id: '1' }, { name: 'source' }, next);
},
function replicateWith3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.updateAttributes(
TargetModel.modelName,
'1',
{ name: '3rd-party' },
cb);
});
SourceModel.replicate(
TargetModel,
function(err, conflicts, cps, updates) {
if (err) return next(err);
var conflictedIds = getPropValue(conflicts || [], 'modelId');
expect(conflictedIds).to.eql(['1']);
// resolve the conflict using ours
conflicts[0].resolve(next);
});
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
it('detects CREATE made during CREATE', function(done) {
async.series([
// FIXME(bajtos) Remove the 'name' property once the implementation
// of UPDATE is fixed to correctly remove properties
createModel(SourceModel, { id: '1', name: 'source' }),
function replicateWith3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.create(
TargetModel.modelName,
{ id: '1', name: '3rd-party' },
cb);
});
SourceModel.replicate(
TargetModel,
function(err, conflicts, cps, updates) {
if (err) return next(err);
var conflictedIds = getPropValue(conflicts || [], 'modelId');
expect(conflictedIds).to.eql(['1']);
// resolve the conflict using ours
conflicts[0].resolve(next);
});
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
it('detects UPDATE made during DELETE', function(done) {
async.series([
createModel(SourceModel, { id: '1' }),
replicateExpectingSuccess(),
function deleteModel(next) {
SourceModel.deleteById('1', next);
},
function replicateWith3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.updateAttributes(
TargetModel.modelName,
'1',
{ name: '3rd-party' },
cb);
});
SourceModel.replicate(
TargetModel,
function(err, conflicts, cps, updates) {
if (err) return next(err);
var conflictedIds = getPropValue(conflicts || [], 'modelId');
expect(conflictedIds).to.eql(['1']);
// resolve the conflict using ours
conflicts[0].resolve(next);
});
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
it('handles DELETE made during DELETE', function(done) {
async.series([
createModel(SourceModel, { id: '1' }),
replicateExpectingSuccess(),
function deleteModel(next) {
SourceModel.deleteById('1', next);
},
function setup3rdPartyModifyingData(next) {
setupRaceConditionInReplication(function(cb) {
TargetModel.dataSource.connector.destroy(
TargetModel.modelName,
'1',
cb);
});
next();
},
replicateExpectingSuccess(),
verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
], done);
});
});
});
describe('conflict detection - both updated', function() {
@ -850,54 +963,74 @@ describe('Replication / Change APIs', function() {
], done);
});
it('handles conflict resolved using "ours"', function(done) {
testResolvedConflictIsHandledWithNoMoreConflicts(
it('handles UPDATE conflict resolved using "ours"', function(done) {
testUpdateConflictIsResolved(
function resolveUsingOurs(conflict, cb) {
conflict.resolve(cb);
conflict.resolveUsingSource(cb);
},
done);
});
it('handles conflict resolved using "theirs"', function(done) {
testResolvedConflictIsHandledWithNoMoreConflicts(
it('handles UPDATE conflict resolved using "theirs"', function(done) {
testUpdateConflictIsResolved(
function resolveUsingTheirs(conflict, cb) {
conflict.models(function(err, source, target) {
if (err) return cb(err);
// We sync ClientA->Server first
expect(conflict.SourceModel.modelName)
.to.equal(ClientB.modelName);
var m = new conflict.SourceModel(target);
m.save(cb);
});
// We sync ClientA->Server first
expect(conflict.SourceModel.modelName)
.to.equal(ClientB.modelName);
conflict.resolveUsingTarget(cb);
},
done);
});
it('handles conflict resolved manually', function(done) {
testResolvedConflictIsHandledWithNoMoreConflicts(
it('handles UPDATE conflict resolved manually', function(done) {
testUpdateConflictIsResolved(
function resolveManually(conflict, cb) {
conflict.models(function(err, source, target) {
if (err) return cb(err);
var m = new conflict.SourceModel(source || target);
m.name = 'manual';
m.save(function(err) {
if (err) return cb(err);
conflict.resolve(function(err) {
if (err) return cb(err);
cb();
});
});
});
conflict.resolveManually({ name: 'manual' }, cb);
},
done);
});
it('handles DELETE conflict resolved using "ours"', function(done) {
testDeleteConflictIsResolved(
function resolveUsingOurs(conflict, cb) {
conflict.resolveUsingSource(cb);
},
done);
});
it('handles DELETE conflict resolved using "theirs"', function(done) {
testDeleteConflictIsResolved(
function resolveUsingTheirs(conflict, cb) {
// We sync ClientA->Server first
expect(conflict.SourceModel.modelName)
.to.equal(ClientB.modelName);
conflict.resolveUsingTarget(cb);
},
done);
});
it('handles DELETE conflict resolved as manual delete', function(done) {
testDeleteConflictIsResolved(
function resolveManually(conflict, cb) {
conflict.resolveManually(null, cb);
},
done);
});
it('handles DELETE conflict resolved manually', function(done) {
testDeleteConflictIsResolved(
function resolveManually(conflict, cb) {
conflict.resolveManually({ name: 'manual' }, cb);
},
done);
});
});
function testResolvedConflictIsHandledWithNoMoreConflicts(resolver, cb) {
function testUpdateConflictIsResolved(resolver, cb) {
async.series([
// sync the new model to ClientB
sync(ClientB, Server),
verifyInstanceWasReplicated(ClientA, ClientB),
verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),
// ClientA makes a change
updateSourceInstanceNameTo('a'),
@ -924,7 +1057,45 @@ describe('Replication / Change APIs', function() {
// and sync back to ClientA too
sync(ClientA, Server),
verifyInstanceWasReplicated(ClientB, ClientA)
verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
], cb);
}
function testDeleteConflictIsResolved(resolver, cb) {
async.series([
// sync the new model to ClientB
sync(ClientB, Server),
verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),
// ClientA makes a change
function deleteInstanceOnClientA(next) {
ClientA.deleteById(sourceInstanceId, next);
},
sync(ClientA, Server),
// ClientB changes the same instance
updateClientB('b'),
function syncAndResolveConflict(next) {
replicate(ClientB, Server, function(err, conflicts, cps) {
if (err) return next(err);
expect(conflicts).to.have.length(1);
expect(conflicts[0].SourceModel.modelName)
.to.equal(ClientB.modelName);
debug('Resolving the conflict %j', conflicts[0]);
resolver(conflicts[0], next);
});
},
// repeat the last sync, it should pass now
sync(ClientB, Server),
// and sync back to ClientA too
sync(ClientA, Server),
verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
], cb);
}
@ -953,6 +1124,7 @@ describe('Replication / Change APIs', function() {
function updateSourceInstanceNameTo(value) {
return function updateInstance(next) {
debug('update source instance name to %j', value);
sourceInstance.name = value;
sourceInstance.save(next);
};
@ -960,6 +1132,7 @@ describe('Replication / Change APIs', function() {
function deleteSourceInstance(value) {
return function deleteInstance(next) {
debug('delete source instance', value);
sourceInstance.remove(function(err) {
sourceInstance = null;
next(err);
@ -978,21 +1151,6 @@ describe('Replication / Change APIs', function() {
});
};
}
function verifyInstanceWasReplicated(source, target) {
return function verify(next) {
source.findById(sourceInstanceId, function(err, expected) {
if (err) return next(err);
target.findById(sourceInstanceId, function(err, actual) {
if (err) return next(err);
expect(actual && actual.toObject())
.to.eql(expected && expected.toObject());
debug('replicated instance: %j', actual);
next();
});
});
};
}
});
var _since = {};
@ -1021,6 +1179,12 @@ describe('Replication / Change APIs', function() {
});
}
function createModel(Model, data) {
return function create(next) {
Model.create(data, next);
};
}
function replicateExpectingSuccess(source, target, since) {
if (!source) source = SourceModel;
if (!target) target = TargetModel;
@ -1037,6 +1201,37 @@ describe('Replication / Change APIs', function() {
};
}
function setupRaceConditionInReplication(fn) {
var bulkUpdate = TargetModel.bulkUpdate;
TargetModel.bulkUpdate = function(data, cb) {
// simulate the situation when a 3rd party modifies the database
// while a replication run is in progress
var self = this;
fn(function(err) {
if (err) return cb(err);
bulkUpdate.call(self, data, cb);
});
// apply the 3rd party modification only once
TargetModel.bulkUpdate = bulkUpdate;
};
}
function verifyInstanceWasReplicated(source, target, id) {
return function verify(next) {
source.findById(id, function(err, expected) {
if (err) return next(err);
target.findById(id, function(err, actual) {
if (err) return next(err);
expect(actual && actual.toObject())
.to.eql(expected && expected.toObject());
debug('replicated instance: %j', actual);
next();
});
});
};
}
function spyAndStoreSinceArg(Model, methodName, store) {
var orig = Model[methodName];
Model[methodName] = function(since) {

View File

@ -842,6 +842,91 @@ describe('User', function() {
});
});
it('Verify a user\'s email address with custom token generator', function(done) {
User.afterRemote('create', function(ctx, user, next) {
assert(user, 'afterRemote should include result');
var options = {
type: 'email',
to: user.email,
from: 'noreply@myapp.org',
redirect: '/',
protocol: ctx.req.protocol,
host: ctx.req.get('host'),
generateVerificationToken: function(user, cb) {
assert(user);
assert.equal(user.email, 'bar@bat.com');
assert(cb);
assert.equal(typeof cb, 'function');
// let's ensure async execution works on this one
process.nextTick(function() {
cb(null, 'token-123456');
});
}
};
user.verify(options, function(err, result) {
assert(result.email);
assert(result.email.response);
assert(result.token);
assert.equal(result.token, 'token-123456');
var msg = result.email.response.toString('utf-8');
assert(~msg.indexOf('token-123456'));
done();
});
});
request(app)
.post('/users')
.expect('Content-Type', /json/)
.expect(200)
.send({email: 'bar@bat.com', password: 'bar'})
.end(function(err, res) {
if (err) {
return done(err);
}
});
});
it('Fails if custom token generator returns error', function(done) {
User.afterRemote('create', function(ctx, user, next) {
assert(user, 'afterRemote should include result');
var options = {
type: 'email',
to: user.email,
from: 'noreply@myapp.org',
redirect: '/',
protocol: ctx.req.protocol,
host: ctx.req.get('host'),
generateVerificationToken: function(user, cb) {
// let's ensure async execution works on this one
process.nextTick(function() {
cb(new Error('Fake error'));
});
}
};
user.verify(options, function(err, result) {
assert(err);
assert.equal(err.message, 'Fake error');
assert.equal(result, undefined);
done();
});
});
request(app)
.post('/users')
.expect('Content-Type', /json/)
.expect(200)
.send({email: 'bar@bat.com', password: 'bar'})
.end(function(err, res) {
if (err) {
return done(err);
}
});
});
});
describe('User.confirm(options, fn)', function() {