2012-01-30 19:34:30 +00:00
var safeRequire = require ( '../utils' ) . safeRequire ;
2011-10-01 15:51:51 +00:00
/**
 * Module dependencies
 */
2012-01-30 19:34:30 +00:00
var redis = safeRequire ( 'redis' ) ;
2011-10-01 15:51:51 +00:00
exports . initialize = function initializeSchema ( schema , callback ) {
2012-09-09 12:51:53 +00:00
console . log ( 'GOOD NEWS! This redis adapter version is deprecated, use redis2 instead. A lot of improvements, and new indexes incompatible with old (sorry about that): now we only store id and not ModelName:id in indexes. Also dates format in indexes changed to unix timestamp for better sorting and filtering performance' ) ;
2012-01-30 19:34:30 +00:00
if ( ! redis ) return ;
2012-04-10 13:47:11 +00:00
if ( schema . settings . url ) {
var url = require ( 'url' ) ;
var redisUrl = url . parse ( schema . settings . url ) ;
2012-04-10 14:30:55 +00:00
var redisAuth = ( redisUrl . auth || '' ) . split ( ':' ) ;
2012-04-10 13:47:11 +00:00
schema . settings . host = redisUrl . hostname ;
schema . settings . port = redisUrl . port ;
2012-04-10 14:30:55 +00:00
if ( redisAuth . length == 2 ) {
schema . settings . db = redisAuth [ 0 ] ;
schema . settings . password = redisAuth [ 1 ] ;
}
2012-04-10 13:47:11 +00:00
}
2011-10-01 15:51:51 +00:00
schema . client = redis . createClient (
schema . settings . port ,
schema . settings . host ,
schema . settings . options
) ;
2011-11-05 09:55:11 +00:00
schema . client . auth ( schema . settings . password ) ;
2012-08-28 09:59:40 +00:00
var callbackCalled = false ;
var database = schema . settings . hasOwnProperty ( 'database' ) && schema . settings . database ;
schema . client . on ( 'connect' , function ( ) {
if ( ! callbackCalled && database === false ) {
callbackCalled = true ;
callback ( ) ;
} else if ( database !== false ) {
if ( callbackCalled ) {
return schema . client . select ( schema . settings . database ) ;
} else {
callbackCalled = true ;
2012-09-04 13:23:57 +00:00
return schema . client . select ( schema . settings . database , callback ) ;
2012-08-28 09:59:40 +00:00
}
}
} ) ;
2011-10-01 15:51:51 +00:00
schema . adapter = new BridgeToRedis ( schema . client ) ;
} ;
/**
 * Adapter object that talks to Redis on behalf of a schema.
 *
 * @param {Object} client - redis client used for every command
 */
function BridgeToRedis(client) {
    // model name -> descriptor passed to define()
    this._models = {};
    this.client = client;
    // model name -> { property name: type } for indexed properties
    this.indexes = {};
}
/**
 * Registers a model descriptor and records which of its properties are indexed.
 *
 * @param {Object} descr - descriptor with `model.modelName` and `properties`;
 *   a property is indexed when its `index` attribute is truthy
 */
BridgeToRedis.prototype.define = function (descr) {
    var modelName = descr.model.modelName;
    var properties = descr.properties;
    var indexed = {};

    Object.keys(properties).forEach(function (prop) {
        if (properties[prop].index) {
            // The property's type is kept as the index marker.
            indexed[prop] = properties[prop].type;
        }
    });

    this._models[modelName] = descr;
    this.indexes[modelName] = indexed;
};
/**
 * Marks `key` of `model` as an indexed Number property and reports the
 * key type back to the caller.
 *
 * @param {string} model - model name; must already have an index table
 * @param {string} key - foreign key property name
 * @param {Function} cb - called as cb(null, Number)
 */
BridgeToRedis.prototype.defineForeignKey = function (model, key, cb) {
    var keyType = Number;
    this.indexes[model][key] = keyType;
    cb(null, keyType);
};
/**
 * Writes `data` to the hash `<model>:<data.id>` and then refreshes the
 * index sets for it.
 *
 * Null-valued properties are removed from `data` (in place) before writing.
 *
 * @param {string} model - model name
 * @param {Object} data - record to store; `data.id` selects the hash key
 * @param {Function} callback - receives the HMSET error, or whatever
 *   updateIndexes passes on
 */
BridgeToRedis.prototype.save = function (model, data, callback) {
    var self = this;
    var key = model + ':' + data.id;

    deleteNulls(data);
    // `this.logger` is defined outside this file view; it is used here as a
    // factory whose result is called once the command completes.
    var done = this.logger('HMSET ' + key + ' ...');

    this.client.hmset(key, data, function (err) {
        done();
        if (err) return callback(err);
        self.updateIndexes(model, data.id, data, callback);
    });
};
2011-10-19 17:17:48 +00:00
/**
 * Adds the record to the model's id set and to one index set per indexed
 * property present in `data`, all in a single MULTI.
 *
 * Commands issued:
 *   SADD s:<model> <id>
 *   SADD i:<model>:<key>:<value> <model>:<id>   (for each indexed key)
 *
 * NOTE(review): index sets for a property's previous value are not cleaned
 * up here, so an updated record stays in its old index set.
 *
 * @param {string} model - model name
 * @param {*} id - record id
 * @param {Object} data - properties being written
 * @param {Function} callback - called as callback(err, data)
 */
BridgeToRedis.prototype.updateIndexes = function (model, id, data, callback) {
    // A model without an index table simply has no indexed properties.
    var indexed = this.indexes[model] || {};
    var commands = [['sadd', 's:' + model, id]];

    Object.keys(data).forEach(function (key) {
        // Own-property check keeps inherited names such as "constructor"
        // from being treated as indexes.
        if (Object.prototype.hasOwnProperty.call(indexed, key) && indexed[key]) {
            commands.push(['sadd', 'i:' + model + ':' + key + ':' + data[key], model + ':' + id]);
        }
    });

    // `commands` always holds at least the s:<model> entry, so the batch is
    // always executed.
    this.client.multi(commands).exec(function (err) {
        callback(err, data);
    });
};
/**
 * Stores a new record, allocating an id with INCR id:<model> unless the
 * caller already supplied `data.id`.
 *
 * Fixes over the previous version:
 *  - with a caller-supplied id the logger variable was still undefined and
 *    calling it threw after save() had been started;
 *  - SADD s:<model> was given the whole data object instead of the id;
 *  - an INCR failure was ignored and an undefined id was saved.
 *
 * @param {string} model - model name
 * @param {Object} data - record to store; `data.id` is set in place
 * @param {Function} [callback] - called as callback(err, id)
 */
BridgeToRedis.prototype.create = function (model, data, callback) {
    var self = this;

    if (data.id) return persist(data.id);

    var logIncr = this.logger('INCR id:' + model);
    this.client.incr('id:' + model, function (err, id) {
        logIncr();
        if (err) {
            if (callback) callback(err);
            return;
        }
        persist(id);
    });

    // Saves the record under `id` and registers the id in s:<model>.
    function persist(id) {
        data.id = id;
        self.save(model, data, function (err) {
            if (callback) {
                callback(err, id);
            }
        });
        // push the id to the list of ids used for sorting
        self.log('SADD s:' + model + ' ' + id, Date.now());
        self.client.sadd('s:' + model, id);
    }
};
2012-03-22 20:33:09 +00:00
/**
 * Saves `data` when it already carries an id; otherwise delegates to
 * create() and returns whatever create() returns.
 *
 * @param {string} model - model name
 * @param {Object} data - record to store
 * @param {Function} callback - forwarded unchanged to save() or create()
 */
BridgeToRedis.prototype.updateOrCreate = function (model, data, callback) {
    if (data.id) {
        this.save(model, data, callback);
        return;
    }
    return this.create(model, data, callback);
};
2011-10-01 15:51:51 +00:00
/**
 * Asks Redis whether the key `<model>:<id>` exists.
 *
 * @param {string} model - model name
 * @param {*} id - record id
 * @param {Function} [callback] - called as callback(err, exists) with the
 *   raw EXISTS reply
 */
BridgeToRedis.prototype.exists = function (model, id, callback) {
    var key = model + ':' + id;
    var done = this.logger('EXISTS ' + key);

    this.client.exists(key, function (err, exists) {
        done();
        if (!callback) return;
        callback(err, exists);
    });
};
/**
 * Loads the hash `<model>:<id>`.
 *
 * @param {string} model - model name
 * @param {*} id - record id
 * @param {Function} callback - called as callback(err, data); `data` is the
 *   hash with `id` set to the requested id, or null when the reply is
 *   missing or has no fields
 */
BridgeToRedis.prototype.find = function find(model, id, callback) {
    var self = this;
    var key = model + ':' + id;
    var startedAt = Date.now();

    this.client.hgetall(key, function (err, data) {
        self.log('HGETALL ' + key, startedAt);
        var found = Boolean(data) && Object.keys(data).length > 0;
        if (found) {
            data.id = id;
        }
        callback(err, found ? data : null);
    });
};
/**
 * Deletes the hash `<model>:<id>` and removes the id from `s:<model>`.
 *
 * The SREM is issued right after the DEL without waiting for it and its
 * outcome is not reported.
 *
 * NOTE(review): the i:<model>:... index sets are not touched here, so
 * they keep referencing the deleted key.
 *
 * @param {string} model - model name
 * @param {*} id - record id
 * @param {Function} callback - called with the DEL error, if any
 */
BridgeToRedis.prototype.destroy = function destroy(model, id, callback) {
    var self = this;
    var key = model + ':' + id;
    var startedAt = Date.now();

    this.client.del(key, function (err) {
        self.log('DEL ' + key, startedAt);
        callback(err);
    });

    this.log('SREM s:' + model, startedAt);
    this.client.srem('s:' + model, id);
};
2011-10-05 14:47:26 +00:00
/**
 * Splits the keys of `filter.where` into those answerable from index sets
 * and those that are not.
 *
 * A key is index-backed when the model has an index for it and the wanted
 * value is a string or a number.
 *
 * @param {string} model - model name
 * @param {Object} [filter] - query filter; only `where` is inspected
 * @returns {false|Array} false when there is no filter or `where` has no
 *   keys; otherwise [indexSetNames, nonIndexedKeys] where each index set
 *   name has the form i:<model>:<key>:<value>
 */
BridgeToRedis.prototype.possibleIndexes = function (model, filter) {
    if (!filter || Object.keys(filter.where || {}).length === 0) return false;

    // A model without an index table has no usable indexes.
    var indexed = this.indexes[model] || {};
    var where = filter.where;
    var foundIndex = [];
    var noIndex = [];

    Object.keys(where).forEach(function (key) {
        var wanted = where[key];
        var scalar = typeof wanted === 'string' || typeof wanted === 'number';
        // Own-property check keeps inherited names out of the index list.
        var hasIndex = Object.prototype.hasOwnProperty.call(indexed, key) && indexed[key];
        if (hasIndex && scalar) {
            foundIndex.push('i:' + model + ':' + key + ':' + wanted);
        } else {
            noIndex.push(key);
        }
    });

    return [foundIndex, noIndex];
};
2011-10-01 15:51:51 +00:00
/**
 * Fetches the records of `model` that satisfy `filter`.
 *
 * Strategy:
 *  - no `where`: ids come from SORT s:<model> (BY / LIMIT / ALPHA / DESC as
 *    needed), or from KEYS <model>:* when nothing has to be sorted or paged;
 *  - `where` fully covered by indexes: SINTER of the index sets, optionally
 *    ordered through SORT, then paged in memory;
 *  - `where` touching a non-indexed property: all candidate hashes are
 *    loaded, then filtered, sorted and paged in memory.
 *
 * Fixes over the previous version:
 *  - SORT LIMIT now receives (offset, count) instead of (offset, offset+limit);
 *  - errors from MULTI/EXEC and KEYS are reported instead of being swallowed
 *    or crashing on an undefined reply;
 *  - in-memory ordering uses the property name without its direction
 *    suffix, honours DESC, and uses a proper three-way comparator;
 *  - lowercase "desc" is honoured (the pattern was already case-insensitive);
 *  - limit/offset are applied to index-filtered results as well;
 *  - `this.log` is invoked with its receiver intact.
 *
 * @param {string} model - model name registered through define()
 * @param {Object} [filter] - optional `where`, `order` (string or array of
 *   "prop [ASC|DESC]"), `limit`, `offset`
 * @param {Function} callback - called as callback(err, records); on error
 *   `records` is an empty array
 */
BridgeToRedis.prototype.all = function all(model, filter, callback) {
    var self = this;
    var client = this.client;
    var props = this._models[model].properties;
    var t1 = Date.now();
    var cmd;

    // TODO: we need strict mode when filtration only possible when we have indexes
    // WHERE
    if (filter && filter.where) {
        // possibleIndexes() yields false for an empty `where`; treat that as
        // "no index usable".
        var pi = this.possibleIndexes(model, filter) || [[], []];
        var indexes = pi[0];
        var noIndexes = pi[1];
        if (indexes && indexes.length) {
            cmd = 'SINTER "' + indexes.join('" "') + '"';
            if (noIndexes.length) {
                log(model + ': no indexes found for ', noIndexes.join(', '),
                    'slow sorting and filtering');
            }
            client.sinter.apply(client, indexes.concat([noIndexes.length ? filterInMemory : sortInRedis]));
        } else {
            // filter manually
            cmd = 'KEYS ' + model + ':*';
            client.keys(model + ':*', filterInMemory);
        }
    } else {
        // no filtering, just sort/limit (if any)
        gotKeys('*');
    }
    return;

    // Calls this.log with the adapter as receiver.
    function log() {
        return self.log.apply(self, arguments);
    }

    // Turns filter.order into [{ key, desc, numeric }]; [] when unordered.
    function parseOrders() {
        if (!filter || !filter.order) return [];
        var raw = typeof filter.order === 'string' ? [filter.order] : filter.order;
        return raw.map(function (entry) {
            var direction = entry.match(/\s+(A|DE)SC$/i);
            var key = direction ? entry.replace(/\s+(A|DE)SC$/i, '') : entry;
            var prop = props[key];
            // `id` is treated as numeric; unknown properties sort as text.
            var typeName = key === 'id' ? 'Number' : (prop && prop.type ? prop.type.name : '');
            return {
                key: key,
                desc: Boolean(direction) && direction[1].toUpperCase() === 'DE',
                numeric: typeName === 'Number' || typeName === 'Date'
            };
        });
    }

    // Applies filter.offset / filter.limit to an array.
    function applyLimit(items) {
        if (!filter || !filter.limit) return items;
        var from = filter.offset || 0;
        return items.slice(from, from + filter.limit);
    }

    // Loads the hashes for `keys` in one MULTI and keeps those passing `filter`.
    function fetchFiltered(keys, done) {
        var t2 = Date.now();
        var query = keys.map(function (key) {
            return ['hgetall', key];
        });
        client.multi(query).exec(function (err, replies) {
            log(query, t2);
            if (err) return done(err);
            done(null, filter ? replies.filter(applyFilter(filter)) : replies);
        });
    }

    // bad case when we trying to filter on non-indexed fields
    // in bad case we need retrieve all data and filter/limit/sort manually
    function filterInMemory(err, keys) {
        log(cmd, t1);
        if (err) {
            return callback(err, []);
        }
        fetchFiltered(keys, function (err, nodes) {
            if (err) return callback(err, []);
            var orders = parseOrders();
            if (orders.length) {
                nodes.sort(compareBy(orders));
            }
            callback(null, applyLimit(nodes));
        });
    }

    // Index-only case: the SINTER reply is the candidate key list.
    function sortInRedis(err, keys) {
        log(cmd, t1);
        if (err) {
            return callback(err, []);
        }
        gotKeys(keys);
    }

    // Orders `keys` ('*' meaning every record) through SORT when needed,
    // then loads the records.
    function gotKeys(keys) {
        var everything = keys === '*';
        var sortCmd = [];
        var allNumeric = true;
        var reverse = false;

        // ORDER
        parseOrders().forEach(function (order) {
            if (!order.numeric) allNumeric = false;
            if (order.desc) reverse = true;
            sortCmd.push('BY', model + ':*->' + order.key);
        });
        // LIMIT - Redis expects "LIMIT offset count". Only usable when no
        // key filtering happens afterwards; filtered results are paged in
        // memory by finish().
        if (everything && filter && filter.limit) {
            sortCmd.push('LIMIT', filter.offset || 0, filter.limit);
        }
        // we need ALPHA modifier when sorting string values
        if (!allNumeric) {
            sortCmd.push('ALPHA');
        }
        if (reverse) {
            sortCmd.push('DESC');
        }

        if (!sortCmd.length) {
            // no sorting or paging through SORT
            if (!everything) return finish(null, keys, true);
            cmd = 'KEYS ' + model + ':*';
            return client.keys(model + ':*', finish);
        }

        sortCmd.unshift('s:' + model);
        sortCmd.push('GET', '#');
        cmd = 'SORT ' + sortCmd.join(' ');
        var t3 = Date.now();
        sortCmd.push(function (err, ids) {
            if (err) {
                return callback(err, []);
            }
            log(cmd, t3);
            var sortedKeys = ids.map(function (id) {
                return model + ':' + id;
            });
            finish(null, intersect(sortedKeys, keys), !everything);
        });
        client.sort.apply(client, sortCmd);
    }

    // Loads the final key list; `page` requests in-memory limit/offset.
    function finish(err, keys, page) {
        if (err) return callback(err, []);
        fetchFiltered(keys, function (err, nodes) {
            if (err) return callback(err, []);
            callback(null, page === true ? applyLimit(nodes) : nodes);
        });
    }

    // Builds a comparator over the parsed order list.
    function compareBy(orders) {
        return function (a, b) {
            for (var i = 0; i < orders.length; i++) {
                var order = orders[i];
                var left = a[order.key];
                var right = b[order.key];
                var diff;
                if (order.numeric) {
                    diff = toNumber(left) - toNumber(right);
                    if (isNaN(diff)) diff = 0;
                } else {
                    diff = left > right ? 1 : (left < right ? -1 : 0);
                }
                if (diff !== 0) return order.desc ? -diff : diff;
            }
            return 0;
        };
    }

    // Numeric view of a stored value; falls back to Date.parse for values
    // that are not plain numbers (assumes such values are date strings -
    // TODO confirm the stored date format).
    function toNumber(value) {
        var n = Number(value);
        return isNaN(n) ? Date.parse(value) : n;
    }

    // TODO: find better intersection method
    // Keeps the order of `sortedKeys`, dropping keys absent from `filteredKeys`.
    function intersect(sortedKeys, filteredKeys) {
        if (filteredKeys === '*') return sortedKeys;
        var index = {};
        filteredKeys.forEach(function (x) {
            index[x] = true;
        });
        return sortedKeys.filter(function (x) {
            return index[x];
        });
    }
};
/**
 * Builds a predicate for Array#filter from `filter.where`.
 *
 * - When `where` is a function it is returned as is.
 * - Otherwise the predicate rejects falsy records and requires every key of
 *   `where` to match: a RegExp is matched against string values, anything
 *   else is compared with loose equality (so the number 5 matches the
 *   string "5").
 *
 * The predicate stops at the first key that does not match and always
 * returns a boolean.
 *
 * @param {Object} filter - query filter; only `where` is used
 * @returns {Function} predicate taking a record
 */
function applyFilter(filter) {
    if (typeof filter.where === 'function') {
        return filter.where;
    }
    var keys = Object.keys(filter.where || {});

    return function (obj) {
        if (!obj) return false;
        return keys.every(function (key) {
            return matches(filter.where[key], obj[key]);
        });
    };

    function matches(example, value) {
        // toString tag check: works for any RegExp and does not throw on
        // objects without a constructor.
        var isPattern = Object.prototype.toString.call(example) === '[object RegExp]';
        if (typeof value === 'string' && isPattern) {
            return value.match(example) !== null;
        }
        // not strict equality
        return example == value;
    }
}
/**
 * Deletes every key matching `<model>:*` and then the id set `s:<model>`.
 *
 * The callback receives the first error seen: from KEYS, from the batched
 * DELs, or from the final DEL of `s:<model>` (previously the last one was
 * dropped).
 *
 * NOTE(review): the i:<model>:... index sets and the id:<model> counter do
 * not match `<model>:*` and are left in place.
 *
 * @param {string} model - model name
 * @param {Function} callback - called with an error or null
 */
BridgeToRedis.prototype.destroyAll = function destroyAll(model, callback) {
    var self = this;
    var keysQuery = model + ':*';
    var t1 = Date.now();

    this.client.keys(keysQuery, function (err, keys) {
        self.log('KEYS ' + keysQuery, t1);
        if (err) {
            return callback(err, []);
        }
        var query = keys.map(function (key) {
            return ['del', key];
        });
        var t2 = Date.now();
        self.client.multi(query).exec(function (execErr) {
            self.log(query, t2);
            // The id set is removed even when the batch reported an error.
            self.client.del('s:' + model, function (delErr) {
                callback(execErr || delErr || null);
            });
        });
    });
};
2012-01-30 13:27:26 +00:00
/**
 * Counts the records of `model`.
 *
 * With a non-empty `where` the records are fetched through all() and
 * counted; otherwise the number of keys matching `<model>:*` is used.
 *
 * @param {string} model - model name
 * @param {Function} callback - called as callback(err, count); count is
 *   null when an error occurred
 * @param {Object} [where] - optional conditions
 */
BridgeToRedis.prototype.count = function count(model, callback, where) {
    var self = this;

    if (where && Object.keys(where).length) {
        this.all(model, { where: where }, function (err, data) {
            callback(err, err ? null : data.length);
        });
        return;
    }

    var keysQuery = model + ':*';
    var t1 = Date.now();
    this.client.keys(keysQuery, function (err, keys) {
        self.log('KEYS ' + keysQuery, t1);
        callback(err, err ? null : keys.length);
    });
};
/**
 * Writes the given attributes into the hash `<model>:<id>` and refreshes
 * the index sets for them.
 *
 * Null-valued attributes are removed from `data` (in place) before writing.
 * An HMSET failure is now reported to `cb` and the index update is skipped,
 * matching save(); previously the error was ignored.
 *
 * @param {string} model - model name
 * @param {*} id - record id
 * @param {Object} data - attributes to write
 * @param {Function} cb - receives the HMSET error, or whatever
 *   updateIndexes passes on
 */
BridgeToRedis.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
    var self = this;
    var key = model + ':' + id;
    var t1 = Date.now();

    deleteNulls(data);
    this.client.hmset(key, data, function (err) {
        self.log('HMSET ' + key, t1);
        if (err) return cb(err);
        self.updateIndexes(model, id, data, cb);
    });
};
2012-01-13 20:06:57 +00:00
/**
 * Removes, in place, every own enumerable property of `data` whose value
 * is exactly null. Other falsy values (undefined, 0, '') are kept.
 *
 * @param {Object} data - object to clean
 */
function deleteNulls(data) {
    var names = Object.keys(data);
    for (var i = 0; i < names.length; i++) {
        var name = names[i];
        if (data[name] === null) {
            delete data[name];
        }
    }
}
2011-10-21 12:46:09 +00:00
/**
 * Logs the QUIT and asks the redis client to quit.
 */
BridgeToRedis.prototype.disconnect = function disconnect() {
    var now = Date.now();
    this.log('QUIT', now);
    this.client.quit();
};