kvao: add iterateKeys() and keys()
Add a core implementation of KVAO.iterateKeys() which returns an AsyncIterator, inspired by - https://github.com/tc39/proposal-async-iteration - https://www.npmjs.com/package/async-iterators This way we can safely iterate even large sets of data. Also add KVAO.keys(), a sugar API converting the result of iterateKeys() into a single array.
This commit is contained in:
parent
56aeeebfb0
commit
01ce7df60f
|
@ -71,7 +71,9 @@ KeyValueMemoryConnector.prototype._removeIfExpired = function(modelName, key) {
|
||||||
debug('Removing expired key', key);
|
debug('Removing expired key', key);
|
||||||
delete store[key];
|
delete store[key];
|
||||||
item = undefined;
|
item = undefined;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
KeyValueMemoryConnector.prototype.get =
|
KeyValueMemoryConnector.prototype.get =
|
||||||
|
@ -154,6 +156,25 @@ function(modelName, key, options, callback) {
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
KeyValueMemoryConnector.prototype.iterateKeys =
|
||||||
|
function(modelName, filter, options, callback) {
|
||||||
|
var store = this._getStoreForModel(modelName);
|
||||||
|
var self = this;
|
||||||
|
var keys = Object.keys(store).filter(function(key) {
|
||||||
|
return !self._removeIfExpired(modelName, key);
|
||||||
|
});
|
||||||
|
|
||||||
|
debug('ITERATE KEYS %j -> %s keys', modelName, keys.length);
|
||||||
|
|
||||||
|
var ix = 0;
|
||||||
|
return {
|
||||||
|
next: function(cb) {
|
||||||
|
var value = ix < keys.length ? keys[ix++] : undefined;
|
||||||
|
setImmediate(function() { cb(null, value); });
|
||||||
|
},
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
KeyValueMemoryConnector.prototype.disconnect = function(callback) {
|
KeyValueMemoryConnector.prototype.disconnect = function(callback) {
|
||||||
if (this._cleanupTimer)
|
if (this._cleanupTimer)
|
||||||
clearInterval(this._cleanupTimer);
|
clearInterval(this._cleanupTimer);
|
||||||
|
|
|
@ -9,6 +9,8 @@ KeyValueAccessObject.get = require('./get');
|
||||||
KeyValueAccessObject.set = require('./set');
|
KeyValueAccessObject.set = require('./set');
|
||||||
KeyValueAccessObject.expire = require('./expire');
|
KeyValueAccessObject.expire = require('./expire');
|
||||||
KeyValueAccessObject.ttl = require('./ttl');
|
KeyValueAccessObject.ttl = require('./ttl');
|
||||||
|
KeyValueAccessObject.iterateKeys = require('./iterate-keys');
|
||||||
|
KeyValueAccessObject.keys = require('./keys');
|
||||||
|
|
||||||
KeyValueAccessObject.getConnector = function() {
|
KeyValueAccessObject.getConnector = function() {
|
||||||
return this.getDataSource().connector;
|
return this.getDataSource().connector;
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
var assert = require('assert');
|
||||||
|
var utils = require('../utils');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asynchronously iterate all keys.
|
||||||
|
*
|
||||||
|
* @param {Object} filter An optional filter object with the following
|
||||||
|
* properties:
|
||||||
|
* - `match` - glob string to use to filter returned keys, e.g. 'userid.*'
|
||||||
|
* @param {Object} options
|
||||||
|
*
|
||||||
|
* @returns {AsyncIterator} An object implementing "next(cb) -> Promise"
|
||||||
|
* function that can be used to iterate all keys.
|
||||||
|
*
|
||||||
|
* @header KVAO.iterateKeys(filter)
|
||||||
|
*/
|
||||||
|
module.exports = function keyValueIterateKeys(filter, options) {
|
||||||
|
filter = filter || {};
|
||||||
|
options = options || {};
|
||||||
|
|
||||||
|
assert(typeof filter === 'object', 'filter must be an object');
|
||||||
|
assert(typeof options === 'object', 'options must be an object');
|
||||||
|
|
||||||
|
var iter = this.getConnector().iterateKeys(this.modelName, filter, options);
|
||||||
|
// promisify the returned iterator
|
||||||
|
return {
|
||||||
|
next: function(callback) {
|
||||||
|
callback = callback || utils.createPromiseCallback();
|
||||||
|
iter.next(callback);
|
||||||
|
return callback.promise;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
};
|
|
@ -0,0 +1,57 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
var assert = require('assert');
|
||||||
|
var utils = require('../utils');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all keys.
|
||||||
|
*
|
||||||
|
* **NOTE**
|
||||||
|
* Building an in-memory array of all keys may be expensive.
|
||||||
|
* Consider using `iterateKeys` instead.
|
||||||
|
*
|
||||||
|
* @param {Object} filter An optional filter object with the following
|
||||||
|
* properties:
|
||||||
|
* - `match` - glob string to use to filter returned keys, e.g. 'userid.*'
|
||||||
|
* @param {Object} options
|
||||||
|
* @callback callback
|
||||||
|
* @param {Error=} err
|
||||||
|
* @param {[String]} keys The list of keys.
|
||||||
|
*
|
||||||
|
* @promise
|
||||||
|
*
|
||||||
|
* @header KVAO.keys(filter, callback)
|
||||||
|
*/
|
||||||
|
module.exports = function keyValueKeys(filter, options, callback) {
|
||||||
|
if (callback === undefined) {
|
||||||
|
if (typeof options === 'function') {
|
||||||
|
callback = options;
|
||||||
|
options = undefined;
|
||||||
|
} else if (options === undefined && typeof filter === 'function') {
|
||||||
|
callback = filter;
|
||||||
|
filter = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filter = filter || {};
|
||||||
|
options = options || {};
|
||||||
|
|
||||||
|
assert(typeof filter === 'object', 'filter must be an object');
|
||||||
|
assert(typeof options === 'object', 'options must be an object');
|
||||||
|
|
||||||
|
callback = callback || utils.createPromiseCallback();
|
||||||
|
|
||||||
|
var iter = this.iterateKeys(filter, options);
|
||||||
|
var keys = [];
|
||||||
|
iter.next(onNextKey);
|
||||||
|
|
||||||
|
function onNextKey(err, key) {
|
||||||
|
if (err) return callback(err);
|
||||||
|
if (key === undefined) return callback(null, keys);
|
||||||
|
keys.push(key);
|
||||||
|
iter.next(onNextKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
return callback.promise;
|
||||||
|
};
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
"node >= 0.6"
|
"node >= 0.6"
|
||||||
],
|
],
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
"async-iterators": "^0.2.2",
|
||||||
"eslint": "^2.5.3",
|
"eslint": "^2.5.3",
|
||||||
"eslint-config-loopback": "^2.0.0",
|
"eslint-config-loopback": "^2.0.0",
|
||||||
"mocha": "^2.1.0",
|
"mocha": "^2.1.0",
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
|
var Promise = require('bluebird');
|
||||||
|
|
||||||
exports.givenCacheItem = function(dataSourceFactory) {
|
exports.givenCacheItem = function(dataSourceFactory) {
|
||||||
var dataSource = dataSourceFactory();
|
var dataSource = dataSourceFactory();
|
||||||
return dataSource.createModel('CacheItem', {
|
return dataSource.createModel('CacheItem', {
|
||||||
|
@ -7,3 +9,15 @@ exports.givenCacheItem = function(dataSourceFactory) {
|
||||||
value: 'any',
|
value: 'any',
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
exports.givenKeys = function(Model, keys, cb) {
|
||||||
|
var p = Promise.all(
|
||||||
|
keys.map(function(k) {
|
||||||
|
return Model.set(k, 'value-' + k);
|
||||||
|
})
|
||||||
|
);
|
||||||
|
if (cb) {
|
||||||
|
p = p.then(function(r) { cb(null, r); }, cb);
|
||||||
|
}
|
||||||
|
return p;
|
||||||
|
};
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
var asyncIterators = require('async-iterators');
|
||||||
|
var helpers = require('./_helpers');
|
||||||
|
var Promise = require('bluebird');
|
||||||
|
var should = require('should');
|
||||||
|
var toArray = Promise.promisify(asyncIterators.toArray);
|
||||||
|
|
||||||
|
module.exports = function(dataSourceFactory, connectorCapabilities) {
|
||||||
|
describe('iterateKeys', function() {
|
||||||
|
var CacheItem;
|
||||||
|
beforeEach(function unpackContext() {
|
||||||
|
CacheItem = helpers.givenCacheItem(dataSourceFactory);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns AsyncIterator covering all keys', function() {
|
||||||
|
return helpers.givenKeys(CacheItem, ['key1', 'key2'])
|
||||||
|
.then(function() {
|
||||||
|
var it = CacheItem.iterateKeys();
|
||||||
|
should(it).have.property('next');
|
||||||
|
return toArray(it);
|
||||||
|
})
|
||||||
|
.then(function(keys) {
|
||||||
|
keys.sort();
|
||||||
|
should(keys).eql(['key1', 'key2']);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns AsyncIterator supporting Promises', function() {
|
||||||
|
var iterator;
|
||||||
|
return helpers.givenKeys(CacheItem, ['key'])
|
||||||
|
.then(function() {
|
||||||
|
iterator = CacheItem.iterateKeys();
|
||||||
|
return iterator.next();
|
||||||
|
})
|
||||||
|
.then(function(key) {
|
||||||
|
should(key).equal('key');
|
||||||
|
return iterator.next();
|
||||||
|
})
|
||||||
|
.then(function(key) {
|
||||||
|
// Note: AsyncIterator contract requires `undefined` to signal
|
||||||
|
// the end of the sequence. Other false-y values like `null`
|
||||||
|
// don't work.
|
||||||
|
should(key).equal(undefined);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
|
@ -0,0 +1,68 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
var helpers = require('./_helpers');
|
||||||
|
var Promise = require('bluebird');
|
||||||
|
var should = require('should');
|
||||||
|
|
||||||
|
module.exports = function(dataSourceFactory, connectorCapabilities) {
|
||||||
|
describe('keys', function() {
|
||||||
|
var CacheItem;
|
||||||
|
beforeEach(function unpackContext() {
|
||||||
|
CacheItem = helpers.givenCacheItem(dataSourceFactory);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns all keys - Callback API', function(done) {
|
||||||
|
helpers.givenKeys(CacheItem, ['key1', 'key2'], function(err) {
|
||||||
|
if (err) return done(err);
|
||||||
|
CacheItem.keys(function(err, keys) {
|
||||||
|
if (err) return done(err);
|
||||||
|
keys.sort();
|
||||||
|
should(keys).eql(['key1', 'key2']);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns all keys - Promise API', function() {
|
||||||
|
return helpers.givenKeys(CacheItem, ['key1', 'key2'])
|
||||||
|
.then(function() {
|
||||||
|
return CacheItem.keys();
|
||||||
|
})
|
||||||
|
.then(function(keys) {
|
||||||
|
keys.sort();
|
||||||
|
should(keys).eql(['key1', 'key2']);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns keys of the given model only', function() {
|
||||||
|
var AnotherModel = CacheItem.dataSource.createModel('AnotherModel');
|
||||||
|
return helpers.givenKeys(CacheItem, ['key1', 'key2'])
|
||||||
|
.then(function() {
|
||||||
|
return helpers.givenKeys(AnotherModel, ['otherKey1', 'otherKey2']);
|
||||||
|
})
|
||||||
|
.then(function() {
|
||||||
|
return CacheItem.keys();
|
||||||
|
})
|
||||||
|
.then(function(keys) {
|
||||||
|
keys.sort();
|
||||||
|
should(keys).eql(['key1', 'key2']);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles large key set', function() {
|
||||||
|
var expectedKeys = [];
|
||||||
|
for (var ix = 0; ix < 1000; ix++)
|
||||||
|
expectedKeys.push('key-' + ix);
|
||||||
|
|
||||||
|
return helpers.givenKeys(CacheItem, expectedKeys)
|
||||||
|
.then(function() {
|
||||||
|
return CacheItem.keys();
|
||||||
|
})
|
||||||
|
.then(function(keys) {
|
||||||
|
keys.sort();
|
||||||
|
expectedKeys.sort();
|
||||||
|
should(keys).eql(expectedKeys);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
Loading…
Reference in New Issue