/** * Module dependencies */ var redis = require('redis'); exports.initialize = function initializeSchema(schema, callback) { schema.client = redis.createClient( schema.settings.port, schema.settings.host, schema.settings.options ); schema.client.auth(schema.settings.password); schema.client.on('connect', callback); schema.adapter = new BridgeToRedis(schema.client); }; function BridgeToRedis(client) { this._models = {}; this.client = client; this.indexes = {}; } BridgeToRedis.prototype.define = function (descr) { var m = descr.model.modelName; this._models[m] = descr; this.indexes[m] = {}; Object.keys(descr.properties).forEach(function (prop) { if (descr.properties[prop].index) { this.indexes[m][prop] = descr.properties[prop].type; } }.bind(this)); }; BridgeToRedis.prototype.defineForeignKey = function (model, key, cb) { this.indexes[model][key] = Number; cb(null, Number); }; BridgeToRedis.prototype.save = function (model, data, callback) { deleteNulls(data); var log = this.logger('HMSET ' + model + ':' + data.id + ' ...'); this.client.hmset(model + ':' + data.id, data, function (err) { log(); if (err) return callback(err); this.updateIndexes(model, data.id, data, callback); }.bind(this)); }; BridgeToRedis.prototype.updateIndexes = function (model, id, data, callback) { var i = this.indexes[model]; var schedule = []; Object.keys(data).forEach(function (key) { if (i[key]) { schedule.push([ 'sadd', 'i:' + model + ':' + key + ':' + data[key], model + ':' + id ]); } }.bind(this)); if (schedule.length) { this.client.multi(schedule).exec(function (err) { callback(err); }); } else { callback(null); } }; BridgeToRedis.prototype.create = function (model, data, callback) { var log = this.logger('INCR id:' + model); this.client.incr('id:' + model, function (err, id) { log(); data.id = id; this.save(model, data, function (err) { if (callback) { callback(err, id); } }); // push the id to the list of user ids for sorting log('SADD s:' + model + ' ' + data.id); this.client.sadd("s:" + model, data.id); }.bind(this)); }; BridgeToRedis.prototype.exists = function (model, id, callback) { var log = this.logger('EXISTS ' + model + ':' + id); this.client.exists(model + ':' + id, function (err, exists) { log(); if (callback) { callback(err, exists); } }); }; BridgeToRedis.prototype.find = function find(model, id, callback) { var t1 = Date.now(); this.client.hgetall(model + ':' + id, function (err, data) { this.log('HGETALL ' + model + ':' + id, t1); if (data && data.id) { data.id = id; } else { data = null; } callback(err, data); }.bind(this)); }; BridgeToRedis.prototype.destroy = function destroy(model, id, callback) { var t1 = Date.now(); this.client.del(model + ':' + id, function (err) { this.log('DEL ' + model + ':' + id, t1); callback(err); }.bind(this)); this.log('SREM s:' + model, t1); this.client.srem("s:" + model, id); }; BridgeToRedis.prototype.possibleIndexes = function (model, filter) { if (!filter || Object.keys(filter.where || {}).length === 0) return false; var foundIndex = []; var noIndex = []; Object.keys(filter.where).forEach(function (key) { if (this.indexes[model][key] && (typeof filter.where[key] === 'string' || typeof filter.where[key] === 'number')) { foundIndex.push('i:' + model + ':' + key + ':' + filter.where[key]); } else { noIndex.push(key); } }.bind(this)); return [foundIndex, noIndex]; }; BridgeToRedis.prototype.all = function all(model, filter, callback) { var ts = Date.now(); var client = this.client; var log = this.log; var t1 = Date.now(); var cmd; var that = this; var sortCmd = []; var props = this._models[model].properties; var allNumeric = true; // TODO: we need strict mode when filtration only possible when we have indexes // WHERE if (filter && filter.where) { var pi = this.possibleIndexes(model, filter); var indexes = pi[0]; var noIndexes = pi[1]; if (indexes.length) { cmd = 'SINTER "' + indexes.join('" "') + '"'; if (noIndexes.length) { log(model + ': no indexes found for ', noIndexes.join(', '), 'slow sorting and filtering'); } indexes.push(noIndexes.length ? orderLimitStageBad : orderLimitStage); client.sinter.apply(client, indexes); } else { // filter manually cmd = 'KEYS ' + model + ':*'; client.keys(model + ':*', orderLimitStageBad); } } else { // no filtering, just sort/limit (if any) gotKeys('*'); } // bad case when we trying to filter on non-indexed fields // in bad case we need retrieve all data and filter/limit/sort manually function orderLimitStageBad(err, keys) { log(cmd, t1); var t2 = Date.now(); if (err) { return callback(err, []); } var query = keys.map(function (key) { return ['hgetall', key]; }); client.multi(query).exec(function (err, replies) { log(query, t2); gotFilteredData(err, replies.filter(applyFilter(filter))); }); function gotFilteredData(err, nodes) { if (err) return callback(null); if (filter.order) { var allNumeric = true; var orders = filter.order; if (typeof filter.order === "string") { orders = [filter.order]; } orders.forEach(function (key) { if (props[key].type.name !== 'Number' && props[key].type.name !== 'Date') { allNumeric = false; } }); if (allNumeric) { nodes = nodes.sort(numerically.bind(orders)); } else { nodes = nodes.sort(literally.bind(orders)); } } // LIMIT if (filter && filter.limit) { var from = (filter.offset || 0), to = from + filter.limit; callback(null, nodes.slice(from, to)); } else { callback(null, nodes); } } } function orderLimitStage(err, keys) { log(cmd, t1); var t2 = Date.now(); if (err) { return callback(err, []); } gotKeys(keys); } function gotKeys(keys) { // ORDER if (filter && filter.order) { var orders = filter.order; if (typeof filter.order === "string"){ orders = [filter.order]; } orders.forEach(function (key) { if (props[key].type.name !== 'Number' && props[key].type.name !== 'Date') { allNumeric = false; } sortCmd.push("BY", model + ":*->" + key); }); } // LIMIT if (keys === '*' && filter && filter.limit){ var from = (filter.offset || 0), to = from + filter.limit; sortCmd.push("LIMIT", from, to); } // we need ALPHA modifier when sorting string values // the only case it's not required - we sort numbers // TODO: check if we sort numbers if (!allNumeric) { sortCmd.push('ALPHA'); } if (sortCmd.length) { sortCmd.unshift("s:" + model); sortCmd.push("GET", "#"); cmd = "SORT " + sortCmd.join(" "); var ttt = Date.now(); sortCmd.push(function(err, ids){ if (err) { return callback(err, []); } log(cmd, ttt); var sortedKeys = ids.map(function (i) { return model + ":" + i; }); handleKeys(err, intersect(sortedKeys, keys)); }); client.sort.apply(client, sortCmd); } else { // no sorting or filtering: just get all keys if (keys === '*') { cmd = 'KEYS ' + model + ':*'; client.keys(model + ':*', handleKeys); } else { handleKeys(null, keys); } } } function handleKeys(err, keys) { var t2 = Date.now(); var query = keys.map(function (key) { return ['hgetall', key]; }); client.multi(query).exec(function (err, replies) { log(query, t2); // console.log('Redis time: %dms', Date.now() - ts); callback(err, filter ? replies.filter(applyFilter(filter)) : replies); }); } return; function numerically(a, b) { return a[this[0]] - b[this[0]]; } function literally(a, b) { return a[this[0]] > b[this[0]]; } // TODO: find better intersection method function intersect(sortedKeys, filteredKeys) { if (filteredKeys === '*') return sortedKeys; var index = {}; filteredKeys.forEach(function (x) { index[x] = true; }); return sortedKeys.filter(function (x) { return index[x]; }); } }; function applyFilter(filter) { if (typeof filter.where === 'function') { return filter.where; } var keys = Object.keys(filter.where || {}); return function (obj) { var pass = true; keys.forEach(function (key) { if (!test(filter.where[key], obj[key])) { pass = false; } }); return pass; } function test(example, value) { if (typeof value === 'string' && example && example.constructor.name === 'RegExp') { return value.match(example); } // not strict equality return example == value; } } BridgeToRedis.prototype.destroyAll = function destroyAll(model, callback) { var keysQuery = model + ':*'; var t1 = Date.now(); this.client.keys(keysQuery, function (err, keys) { this.log('KEYS ' + keysQuery, t1); if (err) { return callback(err, []); } var query = keys.map(function (key) { return ['del', key]; }); var t2 = Date.now(); this.client.multi(query).exec(function (err, replies) { this.log(query, t2); this.client.del('s:' + model, function () { callback(err); }); }.bind(this)); }.bind(this)); }; BridgeToRedis.prototype.count = function count(model, callback, where) { var keysQuery = model + ':*'; var t1 = Date.now(); if (where && Object.keys(where).length) { this.all(model, {where: where}, function (err, data) { callback(err, err ? null : data.length); }); } else { this.client.keys(keysQuery, function (err, keys) { this.log('KEYS ' + keysQuery, t1); callback(err, err ? null : keys.length); }.bind(this)); } }; BridgeToRedis.prototype.updateAttributes = function updateAttrs(model, id, data, cb) { var t1 = Date.now(); deleteNulls(data); this.client.hmset(model + ':' + id, data, function () { this.log('HMSET ' + model + ':' + id, t1); this.updateIndexes(model, id, data, cb); }.bind(this)); }; function deleteNulls(data) { Object.keys(data).forEach(function (key) { if (data[key] === null) delete data[key]; }); } BridgeToRedis.prototype.disconnect = function disconnect() { this.log('QUIT', Date.now()); this.client.quit(); };