diff --git a/lib/persisted-model.js b/lib/persisted-model.js index d5f405f8..db066e1d 100644 --- a/lib/persisted-model.js +++ b/lib/persisted-model.js @@ -15,6 +15,7 @@ var deprecated = require('depd')('loopback'); var debug = require('debug')('loopback:persisted-model'); var PassThrough = require('stream').PassThrough; var utils = require('./utils'); +var REPLICATION_CHUNK_SIZE = -1; module.exports = function(registry) { var Model = registry.getModel('Model'); @@ -1192,6 +1193,11 @@ module.exports = function(registry) { var Change = sourceModel.getChangeModel(); var TargetChange = targetModel.getChangeModel(); var changeTrackingEnabled = Change && TargetChange; + var replicationChunkSize = REPLICATION_CHUNK_SIZE; + + if (sourceModel.settings && sourceModel.settings.replicationChunkSize) { + replicationChunkSize = sourceModel.settings.replicationChunkSize; + } assert( changeTrackingEnabled, @@ -1211,7 +1217,13 @@ module.exports = function(registry) { async.waterfall(tasks, done); function getSourceChanges(cb) { - sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb); + utils.downloadInChunks( + options.filter, + replicationChunkSize, + function(filter, pagingCallback) { + sourceModel.changes(since.source, filter, pagingCallback); + }, + debug.enabled ? log : cb); function log(err, result) { if (err) return cb(err); @@ -1222,7 +1234,13 @@ module.exports = function(registry) { } function getDiffFromTarget(sourceChanges, cb) { - targetModel.diff(since.target, sourceChanges, debug.enabled ? log : cb); + utils.uploadInChunks( + sourceChanges, + replicationChunkSize, + function(smallArray, chunkCallback) { + return targetModel.diff(since.target, smallArray, chunkCallback); + }, + debug.enabled ? 
log : cb); function log(err, result) { if (err) return cb(err); @@ -1241,9 +1259,16 @@ module.exports = function(registry) { function createSourceUpdates(_diff, cb) { diff = _diff; diff.conflicts = diff.conflicts || []; + if (diff && diff.deltas && diff.deltas.length) { debug('\tbuilding a list of updates'); - sourceModel.createUpdates(diff.deltas, cb); + utils.uploadInChunks( + diff.deltas, + replicationChunkSize, + function(smallArray, chunkCallback) { + return sourceModel.createUpdates(smallArray, chunkCallback); + }, + cb); } else { // nothing to replicate done(); @@ -1253,20 +1278,29 @@ module.exports = function(registry) { function bulkUpdate(_updates, cb) { debug('\tstarting bulk update'); updates = _updates; - targetModel.bulkUpdate(updates, options, function(err) { - var conflicts = err && err.details && err.details.conflicts; - if (conflicts && err.statusCode == 409) { - diff.conflicts = conflicts; - // filter out updates that were not applied - updates = updates.filter(function(u) { - return conflicts - .filter(function(d) { return d.modelId === u.change.modelId; }) - .length === 0; + utils.uploadInChunks( + updates, + replicationChunkSize, + function(smallArray, chunkCallback) { + return targetModel.bulkUpdate(smallArray, options, function(err) { + // bulk update is a special case where we want to process all chunks and aggregate all errors + chunkCallback(null, err); }); - return cb(); - } - cb(err); - }); + }, + function(notUsed, err) { + var conflicts = err && err.details && err.details.conflicts; + if (conflicts && err.statusCode == 409) { + diff.conflicts = conflicts; + // filter out updates that were not applied + updates = updates.filter(function(u) { + return conflicts + .filter(function(d) { return d.modelId === u.change.modelId; }) + .length === 0; + }); + return cb(); + } + cb(err); + }); } function checkpoints() { diff --git a/lib/utils.js b/lib/utils.js index 6e1272d9..462b2ae4 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -4,8 +4,14 @@ 
/**
 * Throw a descriptive error when no ES6 Promise implementation is available.
 * NOTE(review): this helper is not referenced by any code visible in this
 * patch — presumably used elsewhere in lib/utils.js; confirm before removing.
 */
function throwPromiseNotDefined() {
  throw new Error(
    'Your Node runtime does not support ES6 Promises. ' +
    'Set "global.Promise" to your preferred implementation of promises.');
}

/**
 * Divide an async call over a large array into multiple sequential calls,
 * each receiving a smaller chunk. Results from every chunk are merged with
 * concatResults and passed to the final callback.
 * @param {Array} largeArray - the large array to be chunked
 * @param {Number} chunkSize - maximum size of each chunk; a falsy value or a
 *   value < 1 disables chunking (single pass-through call)
 * @param {Function} processFunction - fn(chunk, callback) invoked once per chunk
 * @param {Function} cb - final callback(err, aggregatedResults)
 */
function uploadInChunks(largeArray, chunkSize, processFunction, cb) {
  // Chunking not required: hand the whole array through in one call.
  if (!chunkSize || chunkSize < 1 || largeArray.length <= chunkSize) {
    return processFunction(largeArray, cb);
  }

  // Work on a copy so splice() does not mutate the caller's array.
  var remaining = largeArray.slice();
  var chunks = [];
  while (remaining.length > 0) {
    chunks.push(remaining.splice(0, chunkSize));
  }

  var aggregated; // stays undefined until a chunk produces results
  var index = 0;

  // Process chunks strictly one after another. This replaces the original
  // async.waterfall pipeline (and its fragile variadic first-task handling),
  // removing the dependency on the `async` package for this helper.
  function processNext() {
    if (index >= chunks.length) return cb(null, aggregated);

    processFunction(chunks[index++], function(err, results) {
      if (err) return cb(err);

      if (aggregated === undefined || aggregated === null) {
        // First chunk that produced anything (even a falsy value).
        aggregated = results;
      } else if (results) {
        aggregated = concatResults(aggregated, results);
      }
      processNext();
    });
  }
  processNext();
}

/**
 * Page an async download into multiple calls using skip/limit windows.
 * Fetching stops as soon as a page comes back smaller than chunkSize.
 * @param {Object} filter - base filter; deep-copied so the caller's object is
 *   never mutated, then augmented with skip/limit
 * @param {Number} chunkSize - page size; a falsy value or a value < 1
 *   disables paging (single pass-through call)
 * @param {Function} processFunction - fn(filter, callback) fetching one page
 * @param {Function} cb - final callback(err, allResults)
 */
function downloadInChunks(filter, chunkSize, processFunction, cb) {
  var results = [];
  // Deep copy via JSON round-trip; filters are assumed JSON-serializable.
  filter = filter ? JSON.parse(JSON.stringify(filter)) : {};

  if (!chunkSize || chunkSize < 1) {
    // Paging not required.
    return processFunction(filter, cb);
  }

  filter.skip = 0;
  filter.limit = chunkSize;

  // Each call receives its own copy so later skip updates cannot leak in.
  processFunction(JSON.parse(JSON.stringify(filter)), pageAndConcatResults);

  function pageAndConcatResults(err, pagedResults) {
    if (err) return cb(err);

    results = concatResults(results, pagedResults);
    if (pagedResults.length >= chunkSize) {
      // A full page may mean more data: advance the window and fetch again.
      filter.skip += pagedResults.length;
      processFunction(JSON.parse(JSON.stringify(filter)), pageAndConcatResults);
    } else {
      cb(null, results);
    }
  }
}

/**
 * Merge current results into previous results.
 * Arrays are concatenated; objects are merged key-by-key (recursively);
 * any other value simply replaces the previous one.
 * Assumes previous and current results are homogeneous in shape.
 * @param {Object|Array} previousResults
 * @param {Object|Array} currentResults
 * @returns {Object|Array} the merged results
 */
function concatResults(previousResults, currentResults) {
  if (Array.isArray(currentResults)) {
    previousResults = previousResults.concat(currentResults);
  } else if (typeof currentResults === 'object') {
    Object.keys(currentResults).forEach(function(key) {
      previousResults[key] = concatResults(previousResults[key], currentResults[key]);
    });
  } else {
    previousResults = currentResults;
  }

  return previousResults;
}
index c57fd5ec..191f4186 100644 --- a/test/replication.test.js +++ b/test/replication.test.js @@ -297,37 +297,6 @@ describe('Replication / Change APIs', function() { }); describe('Model.replicate(since, targetModel, options, callback)', function() { - function assertTargetModelEqualsSourceModel(conflicts, sourceModel, - targetModel, done) { - var sourceData, targetData; - - assert(conflicts.length === 0); - async.parallel([ - function(cb) { - sourceModel.find(function(err, result) { - if (err) return cb(err); - - sourceData = result; - cb(); - }); - }, - function(cb) { - targetModel.find(function(err, result) { - if (err) return cb(err); - - targetData = result; - cb(); - }); - }, - ], function(err) { - if (err) return done(err); - - assert.deepEqual(sourceData, targetData); - - done(); - }); - } - it('Replicate data using the target model', function(done) { var test = this; var options = {}; @@ -1695,6 +1664,111 @@ describe('Replication / Change APIs', function() { }); }); + describe('Replication with chunking', function() { + beforeEach(function() { + var test = this; + SourceModel = this.SourceModel = PersistedModel.extend( + 'SourceModel-' + tid, + {id: {id: true, type: String, defaultFn: 'guid'}}, + {trackChanges: true, replicationChunkSize: 1}); + + SourceModel.attachTo(dataSource); + + TargetModel = this.TargetModel = PersistedModel.extend( + 'TargetModel-' + tid, + {id: {id: true, type: String, defaultFn: 'guid'}}, + {trackChanges: true, replicationChunkSize: 1}); + + var TargetChange = TargetModel.Change; + TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint'); + TargetChange.Checkpoint.attachTo(dataSource); + + TargetModel.attachTo(dataSource); + + test.startingCheckpoint = -1; + }); + + describe('Model.replicate(since, targetModel, options, callback)', function() { + it('calls bulkUpdate multiple times', function(done) { + var test = this; + var options = {}; + var calls = mockBulkUpdate(TargetModel); + + SourceModel.create([{name: 
/**
 * Replace `modelToMock.bulkUpdate` with a spy that records every invocation
 * before delegating to the original implementation.
 * @param {Object} modelToMock - model whose bulkUpdate should be instrumented
 * @returns {Array} live array that gains one 'bulkUpdate' entry per call
 */
function mockBulkUpdate(modelToMock) {
  var calls = [];
  var originalBulkUpdate = modelToMock.bulkUpdate;

  // The original spy named these parameters (since, filter), but bulkUpdate
  // actually receives (updates, options, callback); renamed for clarity.
  modelToMock.bulkUpdate = function(updates, options, callback) {
    calls.push('bulkUpdate');
    originalBulkUpdate.call(this, updates, options, callback);
  };

  return calls;
}

/**
 * Assert that replication produced no conflicts and that the source and
 * target models now hold identical data.
 * @param {Array} conflicts - conflicts reported by Model.replicate
 * @param {Object} sourceModel - replication source
 * @param {Object} targetModel - replication target
 * @param {Function} done - mocha completion callback
 */
function assertTargetModelEqualsSourceModel(conflicts, sourceModel,
    targetModel, done) {
  assert(conflicts.length === 0);

  // Fetch source then target sequentially; this drops the async.parallel
  // dependency while producing the same assertion on the same data.
  sourceModel.find(function(err, sourceData) {
    if (err) return done(err);

    targetModel.find(function(err, targetData) {
      if (err) return done(err);

      assert.deepEqual(sourceData, targetData);
      done();
    });
  });
}
// Copyright IBM Corp. 2013,2016. All Rights Reserved.
// Node module: loopback
// This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT

'use strict';

var utils = require('../lib/utils');
var assert = require('assert');

// Unit tests for the chunked upload/download helpers in lib/utils.js.
describe('Utils', function() {
  describe('uploadInChunks', function() {
    it('calls process function for each chunk', function(done) {
      var items = ['item1', 'item2', 'item3'];
      var seenChunks = [];

      utils.uploadInChunks(items, 1, function(chunk, next) {
        seenChunks.push(chunk);
        next();
      }, function(err) {
        if (err) return done(err);
        assert.deepEqual(seenChunks, [['item1'], ['item2'], ['item3']]);
        done();
      });
    });

    it('calls process function only once when array is smaller than chunk size', function(done) {
      var items = ['item1', 'item2'];
      var seenChunks = [];

      utils.uploadInChunks(items, 3, function(chunk, next) {
        seenChunks.push(chunk);
        next();
      }, function(err) {
        if (err) return done(err);
        assert.deepEqual(seenChunks, [['item1', 'item2']]);
        done();
      });
    });

    it('concats results from each call to the process function', function(done) {
      var items = ['item1', 'item2', 'item3', 'item4'];

      // Each chunk echoes itself back; the helper must stitch them together.
      utils.uploadInChunks(items, 2, function(chunk, next) {
        next(null, chunk);
      }, function(err, combined) {
        if (err) return done(err);
        assert.deepEqual(combined, ['item1', 'item2', 'item3', 'item4']);
        done();
      });
    });
  });

  describe('downloadInChunks', function() {
    var sourceItems, recordedFilters, pageSize, cursor;

    beforeEach(function() {
      sourceItems = ['item1', 'item2', 'item3'];
      recordedFilters = [];
      pageSize = 2;
      cursor = 0;
    });

    // Serves sourceItems one page at a time and records each filter received.
    function fetchPage(filter, cb) {
      recordedFilters.push(Object.assign({}, filter));

      var page = sourceItems.slice(cursor, cursor + pageSize);
      cursor += pageSize;
      cb(null, page);
    }

    it('calls process function with the correct filter', function(done) {
      var expectedFilters = [
        {skip: 0, limit: pageSize},
        {skip: pageSize, limit: pageSize},
      ];

      utils.downloadInChunks({}, pageSize, fetchPage, function(err) {
        if (err) return done(err);
        assert.deepEqual(recordedFilters, expectedFilters);
        done();
      });
    });

    it('concats the results of all calls of the process function', function(done) {
      utils.downloadInChunks({}, pageSize, fetchPage, function(err, combined) {
        if (err) return done(err);
        assert.deepEqual(combined, sourceItems);
        done();
      });
    });
  });

  describe('concatResults', function() {
    it('concats regular arrays', function() {
      var first = ['item1', 'item2'];
      var second = ['item3', 'item4'];

      assert.deepEqual(utils.concatResults(first, second),
        ['item1', 'item2', 'item3', 'item4']);
    });

    it('concats objects containing arrays', function() {
      var first = {deltas: [{change: 'change 1'}], conflict: []};
      var second = {
        deltas: [{change: 'change 2'}],
        conflict: [{conflict: 'conflict 1'}],
      };

      assert.deepEqual(utils.concatResults(first, second), {
        deltas: [{change: 'change 1'}, {change: 'change 2'}],
        conflict: [{conflict: 'conflict 1'}],
      });
    });
  });
});