Optimise replication

Add a new model-level setting "replicationChunkSize" which allows
users to configure change replication algorithm to issue several
smaller requests to fetch changes and upload updates.
This commit is contained in:
kobaska 2016-11-21 19:48:43 +11:00 committed by Miroslav Bajtoš
parent 92317e811a
commit 7078c5d0e5
No known key found for this signature in database
GPG Key ID: 797723F23CE0A94A
4 changed files with 423 additions and 47 deletions

View File

@ -15,6 +15,7 @@ var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough;
var utils = require('./utils');
var REPLICATION_CHUNK_SIZE = -1;
module.exports = function(registry) {
var Model = registry.getModel('Model');
@ -1192,6 +1193,11 @@ module.exports = function(registry) {
var Change = sourceModel.getChangeModel();
var TargetChange = targetModel.getChangeModel();
var changeTrackingEnabled = Change && TargetChange;
var replicationChunkSize = REPLICATION_CHUNK_SIZE;
if (sourceModel.settings && sourceModel.settings.replicationChunkSize) {
replicationChunkSize = sourceModel.settings.replicationChunkSize;
}
assert(
changeTrackingEnabled,
@ -1211,7 +1217,13 @@ module.exports = function(registry) {
async.waterfall(tasks, done);
function getSourceChanges(cb) {
sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb);
utils.downloadInChunks(
options.filter,
replicationChunkSize,
function(filter, pagingCallback) {
sourceModel.changes(since.source, filter, pagingCallback);
},
debug.enabled ? log : cb);
function log(err, result) {
if (err) return cb(err);
@ -1222,7 +1234,13 @@ module.exports = function(registry) {
}
function getDiffFromTarget(sourceChanges, cb) {
targetModel.diff(since.target, sourceChanges, debug.enabled ? log : cb);
utils.uploadInChunks(
sourceChanges,
replicationChunkSize,
function(smallArray, chunkCallback) {
return targetModel.diff(since.target, smallArray, chunkCallback);
},
debug.enabled ? log : cb);
function log(err, result) {
if (err) return cb(err);
@ -1241,9 +1259,16 @@ module.exports = function(registry) {
function createSourceUpdates(_diff, cb) {
diff = _diff;
diff.conflicts = diff.conflicts || [];
if (diff && diff.deltas && diff.deltas.length) {
debug('\tbuilding a list of updates');
sourceModel.createUpdates(diff.deltas, cb);
utils.uploadInChunks(
diff.deltas,
replicationChunkSize,
function(smallArray, chunkCallback) {
return sourceModel.createUpdates(smallArray, chunkCallback);
},
cb);
} else {
// nothing to replicate
done();
@ -1253,20 +1278,29 @@ module.exports = function(registry) {
function bulkUpdate(_updates, cb) {
debug('\tstarting bulk update');
updates = _updates;
targetModel.bulkUpdate(updates, options, function(err) {
var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts;
// filter out updates that were not applied
updates = updates.filter(function(u) {
return conflicts
.filter(function(d) { return d.modelId === u.change.modelId; })
.length === 0;
utils.uploadInChunks(
updates,
replicationChunkSize,
function(smallArray, chunkCallback) {
return targetModel.bulkUpdate(smallArray, options, function(err) {
// bulk update is a special case where we want to process all chunks and aggregate all errors
chunkCallback(null, err);
});
return cb();
}
cb(err);
});
},
function(notUsed, err) {
var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts;
// filter out updates that were not applied
updates = updates.filter(function(u) {
return conflicts
.filter(function(d) { return d.modelId === u.change.modelId; })
.length === 0;
});
return cb();
}
cb(err);
});
}
function checkpoints() {

View File

@ -4,8 +4,14 @@
// License text available at https://opensource.org/licenses/MIT
'use strict';
exports.createPromiseCallback = createPromiseCallback;
exports.uploadInChunks = uploadInChunks;
exports.downloadInChunks = downloadInChunks;
exports.concatResults = concatResults;
var Promise = require('bluebird');
var async = require('async');
function createPromiseCallback() {
var cb;
@ -18,3 +24,117 @@ function createPromiseCallback() {
cb.promise = promise;
return cb;
}
function throwPromiseNotDefined() {
throw new Error(
'Your Node runtime does support ES6 Promises. ' +
'Set "global.Promise" to your preferred implementation of promises.');
}
/**
* Divide an async call with large array into multiple calls using smaller chunks
* @param {Array} largeArray - the large array to be chunked
* @param {Number} chunkSize - size of each chunks
* @param {Function} processFunction - the function to be called multiple times
* @param {Function} cb - the callback
*/
function uploadInChunks(largeArray, chunkSize, processFunction, cb) {
var chunkArrays = [];
if (!chunkSize || chunkSize < 1 || largeArray.length <= chunkSize) {
// if chunking not required
processFunction(largeArray, cb);
} else {
// copying so that the largeArray object does not get affected during splice
var copyOfLargeArray = [].concat(largeArray);
// chunking to smaller arrays
while (copyOfLargeArray.length > 0) {
chunkArrays.push(copyOfLargeArray.splice(0, chunkSize));
}
var tasks = chunkArrays.map(function(chunkArray) {
return function(previousResults, chunkCallback) {
var lastArg = arguments[arguments.length - 1];
if (typeof lastArg === 'function') {
chunkCallback = lastArg;
}
processFunction(chunkArray, function(err, results) {
if (err) {
return chunkCallback(err);
}
// if this is the first async waterfall call or if previous results was not defined
if (typeof previousResults === 'function' || typeof previousResults === 'undefined' ||
previousResults === null) {
previousResults = results;
} else if (results) {
previousResults = concatResults(previousResults, results);
}
chunkCallback(err, previousResults);
});
};
});
async.waterfall(tasks, cb);
}
}
/**
* Page async download calls
* @param {Object} filter - filter object used for the async call
* @param {Number} chunkSize - size of each chunks
* @param {Function} processFunction - the function to be called multiple times
* @param {Function} cb - the callback
*/
function downloadInChunks(filter, chunkSize, processFunction, cb) {
var results = [];
filter = filter ? JSON.parse(JSON.stringify(filter)) : {};
if (!chunkSize || chunkSize < 1) {
// if chunking not required
processFunction(filter, cb);
} else {
filter.skip = 0;
filter.limit = chunkSize;
processFunction(JSON.parse(JSON.stringify(filter)), pageAndConcatResults);
}
function pageAndConcatResults(err, pagedResults) {
if (err) {
return cb(err);
} else {
results = concatResults(results, pagedResults);
if (pagedResults.length >= chunkSize) {
filter.skip += pagedResults.length;
processFunction(JSON.parse(JSON.stringify(filter)), pageAndConcatResults);
} else {
cb(null, results);
}
}
}
}
/**
* Concat current results into previous results
* Assumption made here that the previous results and current results are homogeneous
* @param {Object|Array} previousResults
* @param {Object|Array} currentResults
*/
function concatResults(previousResults, currentResults) {
if (Array.isArray(currentResults)) {
previousResults = previousResults.concat(currentResults);
} else if (typeof currentResults === 'object') {
Object.keys(currentResults).forEach(function(key) {
previousResults[key] = concatResults(previousResults[key], currentResults[key]);
});
} else {
previousResults = currentResults;
}
return previousResults;
}

View File

@ -297,37 +297,6 @@ describe('Replication / Change APIs', function() {
});
describe('Model.replicate(since, targetModel, options, callback)', function() {
function assertTargetModelEqualsSourceModel(conflicts, sourceModel,
targetModel, done) {
var sourceData, targetData;
assert(conflicts.length === 0);
async.parallel([
function(cb) {
sourceModel.find(function(err, result) {
if (err) return cb(err);
sourceData = result;
cb();
});
},
function(cb) {
targetModel.find(function(err, result) {
if (err) return cb(err);
targetData = result;
cb();
});
},
], function(err) {
if (err) return done(err);
assert.deepEqual(sourceData, targetData);
done();
});
}
it('Replicate data using the target model', function(done) {
var test = this;
var options = {};
@ -1695,6 +1664,111 @@ describe('Replication / Change APIs', function() {
});
});
describe('Replication with chunking', function() {
beforeEach(function() {
var test = this;
SourceModel = this.SourceModel = PersistedModel.extend(
'SourceModel-' + tid,
{id: {id: true, type: String, defaultFn: 'guid'}},
{trackChanges: true, replicationChunkSize: 1});
SourceModel.attachTo(dataSource);
TargetModel = this.TargetModel = PersistedModel.extend(
'TargetModel-' + tid,
{id: {id: true, type: String, defaultFn: 'guid'}},
{trackChanges: true, replicationChunkSize: 1});
var TargetChange = TargetModel.Change;
TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
TargetChange.Checkpoint.attachTo(dataSource);
TargetModel.attachTo(dataSource);
test.startingCheckpoint = -1;
});
describe('Model.replicate(since, targetModel, options, callback)', function() {
it('calls bulkUpdate multiple times', function(done) {
var test = this;
var options = {};
var calls = mockBulkUpdate(TargetModel);
SourceModel.create([{name: 'foo'}, {name: 'bar'}], function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assertTargetModelEqualsSourceModel(conflicts, test.SourceModel,
test.TargetModel, done);
expect(calls.length).to.eql(2);
});
});
});
});
});
describe('Replication without chunking', function() {
beforeEach(function() {
var test = this;
SourceModel = this.SourceModel = PersistedModel.extend(
'SourceModel-' + tid,
{id: {id: true, type: String, defaultFn: 'guid'}},
{trackChanges: true});
SourceModel.attachTo(dataSource);
TargetModel = this.TargetModel = PersistedModel.extend(
'TargetModel-' + tid,
{id: {id: true, type: String, defaultFn: 'guid'}},
{trackChanges: true});
var TargetChange = TargetModel.Change;
TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
TargetChange.Checkpoint.attachTo(dataSource);
TargetModel.attachTo(dataSource);
test.startingCheckpoint = -1;
});
describe('Model.replicate(since, targetModel, options, callback)', function() {
it('calls bulkUpdate only once', function(done) {
var test = this;
var options = {};
var calls = mockBulkUpdate(TargetModel);
SourceModel.create([{name: 'foo'}, {name: 'bar'}], function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assertTargetModelEqualsSourceModel(conflicts, test.SourceModel,
test.TargetModel, done);
expect(calls.length).to.eql(1);
});
});
});
});
});
function mockBulkUpdate(modelToMock) {
var calls = [];
var originalBulkUpdateFunction = modelToMock.bulkUpdate;
modelToMock.bulkUpdate = function(since, filter, callback) {
calls.push('bulkUpdate');
originalBulkUpdateFunction.call(this, since, filter, callback);
};
return calls;
}
var _since = {};
function replicate(source, target, since, next) {
if (typeof since === 'function') {
@ -1799,4 +1873,35 @@ describe('Replication / Change APIs', function() {
function getIds(list) {
return getPropValue(list, 'id');
}
function assertTargetModelEqualsSourceModel(conflicts, sourceModel,
targetModel, done) {
var sourceData, targetData;
assert(conflicts.length === 0);
async.parallel([
function(cb) {
sourceModel.find(function(err, result) {
if (err) return cb(err);
sourceData = result;
cb();
});
},
function(cb) {
targetModel.find(function(err, result) {
if (err) return cb(err);
targetData = result;
cb();
});
},
], function(err) {
if (err) return done(err);
assert.deepEqual(sourceData, targetData);
done();
});
}
});

117
test/utils.test.js Normal file
View File

@ -0,0 +1,117 @@
// Copyright IBM Corp. 2013,2016. All Rights Reserved.
// Node module: loopback
// This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT
'use strict';
var utils = require('../lib/utils');
var assert = require('assert');
describe('Utils', function() {
describe('uploadInChunks', function() {
it('calls process function for each chunk', function(done) {
var largeArray = ['item1', 'item2', 'item3'];
var calls = [];
utils.uploadInChunks(largeArray, 1, function processFunction(array, cb) {
calls.push(array);
cb();
}, function finished(err) {
if (err) return done(err);
assert.deepEqual(calls, [['item1'], ['item2'], ['item3']]);
done();
});
});
it('calls process function only once when array is smaller than chunk size', function(done) {
var largeArray = ['item1', 'item2'];
var calls = [];
utils.uploadInChunks(largeArray, 3, function processFunction(array, cb) {
calls.push(array);
cb();
}, function finished(err) {
if (err) return done(err);
assert.deepEqual(calls, [['item1', 'item2']]);
done();
});
});
it('concats results from each call to the process function', function(done) {
var largeArray = ['item1', 'item2', 'item3', 'item4'];
utils.uploadInChunks(largeArray, 2, function processFunction(array, cb) {
cb(null, array);
}, function finished(err, results) {
if (err) return done(err);
assert.deepEqual(results, ['item1', 'item2', 'item3', 'item4']);
done();
});
});
});
describe('downloadInChunks', function() {
var largeArray, calls, chunkSize, skip;
beforeEach(function() {
largeArray = ['item1', 'item2', 'item3'];
calls = [];
chunkSize = 2;
skip = 0;
});
function processFunction(filter, cb) {
calls.push(Object.assign({}, filter));
var results = [];
for (var i = 0; i < chunkSize; i++) {
if (largeArray[skip + i]) {
results.push(largeArray[skip + i]);
}
}
skip += chunkSize;
cb(null, results);
}
it('calls process function with the correct filter', function(done) {
var expectedFilters = [{skip: 0, limit: chunkSize}, {skip: chunkSize, limit: chunkSize}];
utils.downloadInChunks({}, chunkSize, processFunction, function finished(err) {
if (err) return done(err);
assert.deepEqual(calls, expectedFilters);
done();
});
});
it('concats the results of all calls of the process function', function(done) {
utils.downloadInChunks({}, chunkSize, processFunction, function finished(err, results) {
if (err) return done(err);
assert.deepEqual(results, largeArray);
done();
});
});
});
describe('concatResults', function() {
it('concats regular arrays', function() {
var array1 = ['item1', 'item2'];
var array2 = ['item3', 'item4'];
var concatResults = utils.concatResults(array1, array2);
assert.deepEqual(concatResults, ['item1', 'item2', 'item3', 'item4']);
});
it('concats objects containing arrays', function() {
var object1 = {deltas: [{change: 'change 1'}], conflict: []};
var object2 = {deltas: [{change: 'change 2'}], conflict: [{conflict: 'conflict 1'}]};
var expectedResults = {
deltas: [{change: 'change 1'}, {change: 'change 2'}],
conflict: [{conflict: 'conflict 1'}],
};
var concatResults = utils.concatResults(object1, object2);
assert.deepEqual(concatResults, expectedResults);
});
});
});