Checkpoint speedup

This commit is contained in:
Amir Jafarian 2015-12-17 12:02:35 -05:00
parent 5974c6afdf
commit 8deec2e89a
3 changed files with 99 additions and 46 deletions

View File

@ -27,43 +27,45 @@ module.exports = function(Checkpoint) {
* Get the current checkpoint id
* @callback {Function} callback
* @param {Error} err
* @param {Number} checkpointId The current checkpoint id
* @param {Number} checkpoint The current checkpoint seq
*/
Checkpoint.current = function(cb) {
var Checkpoint = this;
this.find({
limit: 1,
order: 'seq DESC'
}, function(err, checkpoints) {
if (err) return cb(err);
var checkpoint = checkpoints[0];
if (checkpoint) {
cb(null, checkpoint.seq);
} else {
Checkpoint.create({ seq: 1 }, function(err, checkpoint) {
if (err) return cb(err);
cb(null, checkpoint.seq);
});
}
Checkpoint._getSingleton(function(err, cp) {
cb(err, cp.seq);
});
};
Checkpoint.observe('before save', function(ctx, next) {
if (!ctx.instance) {
// Example: Checkpoint.updateAll() and Checkpoint.updateOrCreate()
return next(new Error('Checkpoint does not support partial updates.'));
}
Checkpoint._getSingleton = function(cb) {
var query = {limit: 1}; // match all instances, return only one
var initialData = {seq: 1};
this.findOrCreate(query, initialData, cb);
};
var model = ctx.instance;
if (!model.getId() && model.seq === undefined) {
model.constructor.current(function(err, seq) {
if (err) return next(err);
model.seq = seq + 1;
next();
/**
* Increase the current checkpoint if it already exists otherwise initialize it
* @callback {Function} callback
* @param {Error} err
* @param {Object} checkpoint The current checkpoint
*/
Checkpoint.bumpLastSeq = function(cb) {
var Checkpoint = this;
Checkpoint._getSingleton(function(err, cp) {
if (err) return cb(err);
var originalSeq = cp.seq;
cp.seq++;
// Update the checkpoint but only if it was not changed under our hands
Checkpoint.updateAll({id: cp.id, seq: originalSeq}, {seq: cp.seq}, function(err, info) {
if (err) return cb(err);
// possible outcomes
// 1) seq was updated to seq+1 - exactly what we wanted!
// 2) somebody else already updated seq to seq+1 and our call was a no-op.
// That should be ok, checkpoints are time based, so we reuse the one created just now
// 3) seq was bumped more than once, so we will be using a value that is behind the latest seq.
// @bajtos is not entirely sure if this is ok, but since it wasn't handled by the current implementation either,
// he thinks we can keep it this way.
cb(null, cp);
});
} else {
next();
}
});
});
};
};

View File

@ -898,12 +898,7 @@ module.exports = function(registry) {
PersistedModel.checkpoint = function(cb) {
var Checkpoint = this.getChangeModel().getCheckpointModel();
this.getSourceId(function(err, sourceId) {
if (err) return cb(err);
Checkpoint.create({
sourceId: sourceId
}, cb);
});
Checkpoint.bumpLastSeq(cb);
};
/**

View File

@ -1,20 +1,22 @@
var async = require('async');
var loopback = require('../');
var expect = require('chai').expect;
// create a unique Checkpoint model
var Checkpoint = loopback.Checkpoint.extend('TestCheckpoint');
var memory = loopback.createDataSource({
connector: loopback.Memory
});
Checkpoint.attachTo(memory);
describe('Checkpoint', function() {
describe('current()', function() {
describe('bumpLastSeq() and current()', function() {
beforeEach(function() {
var memory = loopback.createDataSource({
connector: loopback.Memory
});
Checkpoint.attachTo(memory);
});
it('returns the highest `seq` value', function(done) {
async.series([
Checkpoint.create.bind(Checkpoint),
Checkpoint.create.bind(Checkpoint),
Checkpoint.bumpLastSeq.bind(Checkpoint),
Checkpoint.bumpLastSeq.bind(Checkpoint),
function(next) {
Checkpoint.current(function(err, seq) {
if (err) next(err);
@ -24,5 +26,59 @@ describe('Checkpoint', function() {
}
], done);
});
it('Should be no race condition for current() when calling in parallel', function(done) {
async.parallel([
function(next) { Checkpoint.current(next); },
function(next) { Checkpoint.current(next); }
], function(err, list) {
if (err) return done(err);
Checkpoint.find(function(err, data) {
if (err) return done(err);
expect(data).to.have.length(1);
done();
});
});
});
it('Should be no race condition for bumpLastSeq() when calling in parallel', function(done) {
async.parallel([
function(next) { Checkpoint.bumpLastSeq(next); },
function(next) { Checkpoint.bumpLastSeq(next); }
], function(err, list) {
if (err) return done(err);
Checkpoint.find(function(err, data) {
if (err) return done(err);
// The invariant "we have at most 1 checkpoint instance" is preserved
// even when multiple calls are made in parallel
expect(data).to.have.length(1);
// There is a race condition here, we could end up with both 2 or 3 as the "seq".
// The current implementation of the memory connector always yields 2 though.
expect(data[0].seq).to.equal(2);
// In this particular case, since the new last seq is always 2, both results
// should be 2.
expect(list.map(function(it) {return it.seq;}))
.to.eql([2, 2]);
done();
});
});
});
it('Checkpoint.current() for non existing checkpoint should initialize checkpoint', function(done) {
Checkpoint.current(function(err, seq) {
expect(seq).to.equal(1);
done(err);
});
});
it('bumpLastSeq() works when singleton instance does not exists yet', function(done) {
Checkpoint.bumpLastSeq(function(err, cp) {
// We expect `seq` to be 2 since `checkpoint` does not exist and
// `bumpLastSeq` for the first time not only initializes it to one,
// but also increments the initialized value by one.
expect(cp.seq).to.equal(2);
done(err);
});
});
});
});