From 8deec2e89aeea65dfbd8a9f43e2dc15a11faef15 Mon Sep 17 00:00:00 2001 From: Amir Jafarian Date: Thu, 17 Dec 2015 12:02:35 -0500 Subject: [PATCH] Checkpoint speedup --- common/models/checkpoint.js | 64 ++++++++++++++++---------------- lib/persisted-model.js | 7 +--- test/checkpoint.test.js | 74 ++++++++++++++++++++++++++++++++----- 3 files changed, 99 insertions(+), 46 deletions(-) diff --git a/common/models/checkpoint.js b/common/models/checkpoint.js index 304f8314..59cc33de 100644 --- a/common/models/checkpoint.js +++ b/common/models/checkpoint.js @@ -27,43 +27,45 @@ module.exports = function(Checkpoint) { * Get the current checkpoint id * @callback {Function} callback * @param {Error} err - * @param {Number} checkpointId The current checkpoint id + * @param {Number} checkpoint The current checkpoint seq */ - Checkpoint.current = function(cb) { var Checkpoint = this; - this.find({ - limit: 1, - order: 'seq DESC' - }, function(err, checkpoints) { - if (err) return cb(err); - var checkpoint = checkpoints[0]; - if (checkpoint) { - cb(null, checkpoint.seq); - } else { - Checkpoint.create({ seq: 1 }, function(err, checkpoint) { - if (err) return cb(err); - cb(null, checkpoint.seq); - }); - } + Checkpoint._getSingleton(function(err, cp) { + cb(err, cp.seq); }); }; - Checkpoint.observe('before save', function(ctx, next) { - if (!ctx.instance) { - // Example: Checkpoint.updateAll() and Checkpoint.updateOrCreate() - return next(new Error('Checkpoint does not support partial updates.')); - } + Checkpoint._getSingleton = function(cb) { + var query = {limit: 1}; // match all instances, return only one + var initialData = {seq: 1}; + this.findOrCreate(query, initialData, cb); + }; - var model = ctx.instance; - if (!model.getId() && model.seq === undefined) { - model.constructor.current(function(err, seq) { - if (err) return next(err); - model.seq = seq + 1; - next(); + /** + * Increase the current checkpoint if it already exists otherwise initialize it + * @callback {Function} callback + * @param {Error} err + * @param {Object} checkpoint The current checkpoint + */ + Checkpoint.bumpLastSeq = function(cb) { + var Checkpoint = this; + Checkpoint._getSingleton(function(err, cp) { + if (err) return cb(err); + var originalSeq = cp.seq; + cp.seq++; + // Update the checkpoint but only if it was not changed under our hands + Checkpoint.updateAll({id: cp.id, seq: originalSeq}, {seq: cp.seq}, function(err, info) { + if (err) return cb(err); + // possible outcomes + // 1) seq was updated to seq+1 - exactly what we wanted! + // 2) somebody else already updated seq to seq+1 and our call was a no-op. + // That should be ok, checkpoints are time based, so we reuse the one created just now + // 3) seq was bumped more than once, so we will be using a value that is behind the latest seq. + // @bajtos is not entirely sure if this is ok, but since it wasn't handled by the current implementation either, + // he thinks we can keep it this way. + cb(null, cp); }); - } else { - next(); - } - }); + }); + }; }; diff --git a/lib/persisted-model.js b/lib/persisted-model.js index ab17ef16..bcac8e18 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -898,12 +898,7 @@ module.exports = function(registry) { PersistedModel.checkpoint = function(cb) { var Checkpoint = this.getChangeModel().getCheckpointModel(); - this.getSourceId(function(err, sourceId) { - if (err) return cb(err); - Checkpoint.create({ - sourceId: sourceId - }, cb); - }); + Checkpoint.bumpLastSeq(cb); }; /** diff --git a/test/checkpoint.test.js b/test/checkpoint.test.js index 98e248ef..c824d02e 100644 --- a/test/checkpoint.test.js +++ b/test/checkpoint.test.js @@ -1,20 +1,22 @@ var async = require('async'); var loopback = require('../'); +var expect = require('chai').expect; -// create a unique Checkpoint model var Checkpoint = loopback.Checkpoint.extend('TestCheckpoint'); -var memory = loopback.createDataSource({ - connector: loopback.Memory -}); -Checkpoint.attachTo(memory); - describe('Checkpoint', function() { - describe('current()', function() { + describe('bumpLastSeq() and current()', function() { + beforeEach(function() { + var memory = loopback.createDataSource({ + connector: loopback.Memory + }); + Checkpoint.attachTo(memory); + }); + it('returns the highest `seq` value', function(done) { async.series([ - Checkpoint.create.bind(Checkpoint), - Checkpoint.create.bind(Checkpoint), + Checkpoint.bumpLastSeq.bind(Checkpoint), + Checkpoint.bumpLastSeq.bind(Checkpoint), function(next) { Checkpoint.current(function(err, seq) { if (err) next(err); @@ -24,5 +26,59 @@ describe('Checkpoint', function() { } ], done); }); + + it('Should be no race condition for current() when calling in parallel', function(done) { + async.parallel([ + function(next) { Checkpoint.current(next); }, + function(next) { Checkpoint.current(next); } + ], function(err, list) { + if (err) return done(err); + Checkpoint.find(function(err, data) { + if (err) return done(err); + expect(data).to.have.length(1); + done(); + }); + }); + }); + + it('Should be no race condition for bumpLastSeq() when calling in parallel', function(done) { + async.parallel([ + function(next) { Checkpoint.bumpLastSeq(next); }, + function(next) { Checkpoint.bumpLastSeq(next); } + ], function(err, list) { + if (err) return done(err); + Checkpoint.find(function(err, data) { + if (err) return done(err); + // The invariant "we have at most 1 checkpoint instance" is preserved + // even when multiple calls are made in parallel + expect(data).to.have.length(1); + // There is a race condition here, we could end up with both 2 or 3 as the "seq". + // The current implementation of the memory connector always yields 2 though. + expect(data[0].seq).to.equal(2); + // In this particular case, since the new last seq is always 2, both results + // should be 2. + expect(list.map(function(it) {return it.seq;})) + .to.eql([2, 2]); + done(); + }); + }); + }); + + it('Checkpoint.current() for non existing checkpoint should initialize checkpoint', function(done) { + Checkpoint.current(function(err, seq) { + expect(seq).to.equal(1); + done(err); + }); + }); + + it('bumpLastSeq() works when singleton instance does not exists yet', function(done) { + Checkpoint.bumpLastSeq(function(err, cp) { + // We expect `seq` to be 2 since `checkpoint` does not exist and + // `bumpLastSeq` for the first time not only initializes it to one, + // but also increments the initialized value by one. + expect(cp.seq).to.equal(2); + done(err); + }); + }); }); });