Mongodb adapter

This commit is contained in:
Anatoliy Chakkaev 2012-03-11 08:48:38 +04:00
parent 310c3a7cde
commit 8bb855c88c
8 changed files with 288 additions and 109 deletions

165
lib/adapters/mongodb.js Normal file
View File

@ -0,0 +1,165 @@
var safeRequire = require('../utils').safeRequire;
/**
* Module dependencies
*/
var mongodb = safeRequire('mongodb');
var ObjectID = mongodb.ObjectID;
exports.initialize = function initializeSchema(schema, callback) {
if (!mongodb) return;
var s = schema.settings;
if (schema.settings.url) {
var url = require('url').parse(schema.settings.url);
s.host = url.hostname;
s.port = url.port;
s.database = url.path.replace(/^\//, '');
s.username = url.auth && url.auth.split(':')[0];
s.password = url.auth && url.auth.split(':')[1];
}
s.host = s.host || 'localhost';
s.port = parseInt(s.port || '27017', 10);
s.database = s.database || 'test';
var server = new mongodb.Server(s.host, s.port, {});
new mongodb.Db(s.database, server, {}).open(function (err, client) {
if (err) throw err;
schema.client = client;
schema.adapter = new MongoDB(client);
callback();
});
};
function MongoDB(client) {
this._models = {};
this.client = client;
this.collections = {};
}
MongoDB.prototype.define = function (descr) {
if (!descr.settings) descr.settings = {};
this._models[descr.model.modelName] = descr;
};
MongoDB.prototype.defineProperty = function (model, prop, params) {
this._models[model].properties[prop] = params;
};
MongoDB.prototype.collection = function (name) {
if (!this.collections[name]) {
this.collections[name] = new mongodb.Collection(this.client, name);
}
return this.collections[name];
};
MongoDB.prototype.create = function (model, data, callback) {
this.collection(model).insert(data, {}, function (err, m) {
callback(err, err ? null : m[0]._id.toString());
});
};
MongoDB.prototype.save = function (model, data, callback) {
this.collection(model).save({_id: new ObjectID(data.id)}, data, function (err) {
callback(err);
});
};
MongoDB.prototype.exists = function (model, id, callback) {
this.collection(model).findOne({_id: new ObjectID(id)}, function (err, data) {
callback(err, !err && data);
});
};
MongoDB.prototype.find = function find(model, id, callback) {
this.collection(model).findOne({_id: new ObjectID(id)}, function (err, data) {
if (data) data.id = id;
callback(err, data);
});
};
MongoDB.prototype.destroy = function destroy(model, id, callback) {
this.collection(model).remove({_id: new ObjectID(id)}, callback);
};
MongoDB.prototype.all = function all(model, filter, callback) {
if (!filter) {
filter = {};
}
var query = {};
if (filter.where) {
Object.keys(filter.where).forEach(function (k) {
var cond = filter.where[k];
var spec = false;
if (cond && cond.constructor.name === 'Object') {
spec = Object.keys(cond)[0];
cond = cond[spec];
}
if (spec) {
if (spec === 'between') {
query[k] = { $gte: cond[0], $lte: cond[1]};
} else {
query[k] = {};
query[k]['$' + spec] = cond;
}
} else {
if (cond === null) {
query[k] = {$type: 10};
} else {
query[k] = cond;
}
}
});
}
var cursor = this.collection(model).find(query);
if (filter.order) {
var m = filter.order.match(/\s+(A|DE)SC$/);
var key = filter.order;
var reverse = false;
if (m) {
key = key.replace(/\s+(A|DE)SC$/, '');
if (m[1] === 'DE') reverse = true;
}
if (reverse) {
cursor.sort([[key, 'desc']]);
} else {
cursor.sort(key);
}
}
if (filter.limit) {
cursor.limit(filter.limit);
}
if (filter.skip) {
cursor.skip(filter.skip);
} else if (filter.offset) {
cursor.skip(filter.offset);
}
cursor.toArray(function (err, data) {
if (err) return callback(err);
callback(null, data.map(function (o) { o.id = o._id.toString(); delete o._id; return o; }));
});
};
MongoDB.prototype.destroyAll = function destroyAll(model, callback) {
this.collection(model).remove({}, callback);
};
MongoDB.prototype.count = function count(model, callback, where) {
this.collection(model).count(where, function (err, count) {
callback(err, count);
});
};
MongoDB.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
this.collection(model).findAndModify({_id: new ObjectID(id)}, [['_id','asc']], {$set: data}, {}, function(err, object) {
cb(err, object);
});
};
MongoDB.prototype.disconnect = function () {
this.client.close();
};

View File

@ -36,7 +36,7 @@ MySQL.prototype.query = function (sql, callback) {
var log = this.log;
if (typeof callback !== 'function') throw new Error('callback should be a function');
this.client.query(sql, function (err, data) {
log(sql, time);
if (log) log(sql, time);
callback(err, data);
});
};

View File

@ -20,14 +20,9 @@ exports.initialize = function initializeSchema(schema, callback) {
debug: s.debug
});
schema.adapter = new PG(schema.client);
schema.client.connect(function(err){
if(!err){
process.nextTick(callback);
}else{
console.error(err);
throw err;
}
});
if (s.autoconnect === false) return callback();
schema.adapter.connect(callback);
};
function PG(client) {
@ -37,6 +32,17 @@ function PG(client) {
require('util').inherits(PG, BaseSQL);
PG.prototype.connect = function (callback) {
this.client.connect(function (err) {
if (!err){
callback();
}else{
console.error(err);
throw err;
}
});
};
PG.prototype.query = function (sql, callback) {
var time = Date.now();
var log = this.log;

View File

@ -1,126 +1,110 @@
var safeRequire = require('../utils').safeRequire;
/**
* Module dependencies
*/
var riak= require('riak-js');
var uuid = require('node-uuid');
var riak = safeRequire('riak-js');
exports.initialize = function initializeSchema(schema, callback) {
var config = {
host = schema.settings.host || '127.0.0.1',
port = schema.settings.port || 8098
};
schema.client = riak_lib.getClient(config);
schema.adapter = new BridgeToRedis(schema.client);
schema.client = riak.getClient({
host: schema.settings.host || '127.0.0.1',
port: schema.settings.port || 8091
});
schema.adapter = new Riak(schema.client);
};
function BridgeToRedis(client) {
function Riak(client) {
this._models = {};
this.client = client;
}
BridgeToRedis.prototype.define = function (descr) {
Riak.prototype.define = function (descr) {
this._models[descr.model.modelName] = descr;
};
BridgeToRedis.prototype.save = function (model, data, callback) {
this.client.hmset(model + ':' + data.id, data, callback);
Riak.prototype.save = function (model, data, callback) {
this.client.save(model, data.id, data, callback);
};
BridgeToRedis.prototype.create = function (model, data, callback) {
this.client.incr(model + ':id', function (err, id) {
data.id = id;
Riak.prototype.create = function (model, data, callback) {
data.id = uuid();
this.save(model, data, function (err) {
if (callback) {
callback(err, id);
callback(err, data.id);
}
});
}.bind(this));
};
BridgeToRedis.prototype.exists = function (model, id, callback) {
this.client.exists(model + ':' + id, function (err, exists) {
Riak.prototype.exists = function (model, id, callback) {
this.client.exists(model, id, function (err, exists, meta) {
if (callback) {
callback(err, exists);
}
});
};
BridgeToRedis.prototype.find = function find(model, id, callback) {
this.client.hgetall(model + ':' + id, function (err, data) {
Riak.prototype.find = function find(model, id, callback) {
this.client.get(model, id, function (err, data, meta) {
if (data && data.id) {
data.id = id;
} else {
data = null;
}
callback(err, data);
if (typeof callback === 'function') callback(err, data);
});
};
BridgeToRedis.prototype.destroy = function destroy(model, id, callback) {
this.client.del(model + ':' + id, function (err) {
Riak.prototype.destroy = function destroy(model, id, callback) {
this.client.remove(model, id, function (err) {
callback(err);
});
};
BridgeToRedis.prototype.all = function all(model, filter, callback) {
this.client.keys(model + ':*', function (err, keys) {
if (err) {
return callback(err, []);
}
var query = keys.map(function (key) {
return ['hgetall', key];
});
this.client.multi(query).exec(function (err, replies) {
callback(err, filter ? replies.filter(applyFilter(filter)) : replies);
Riak.prototype.all = function all(model, filter, callback) {
var opts = {};
if (filter && filter.where) opts.where = filter.where;
this.client.getAll(model, function (err, result, meta) {
if (err) return callback(err, []);
/// return callback(err, result.map(function (x) { return {id: x}; }));
result = (result || []).map(function (row) {
var record = row.data;
record.id = row.meta.key;
console.log(record);
return record;
});
return callback(err, result);
}.bind(this));
};
function applyFilter(filter) {
if (typeof filter === 'function') {
return filter;
}
var keys = Object.keys(filter);
return function (obj) {
var pass = true;
keys.forEach(function (key) {
if (!test(filter[key], obj[key])) {
pass = false;
}
});
return pass;
Riak.prototype.destroyAll = function destroyAll(model, callback) {
var self = this;
this.all(model, {}, function (err, recs) {
if (err) callback(err);
removeOne();
function removeOne(error) {
err = err || error;
var rec = recs.pop();
if (!rec) return callback(err && err.statusCode != '404' ? err : null);
console.log(rec.id);
self.client.remove(model, rec.id, removeOne);
}
function test(example, value) {
if (typeof value === 'string' && example && example.constructor.name === 'RegExp') {
return value.match(example);
}
// not strict equality
return example == value;
}
}
});
BridgeToRedis.prototype.destroyAll = function destroyAll(model, callback) {
this.client.keys(model + ':*', function (err, keys) {
if (err) {
return callback(err, []);
}
var query = keys.map(function (key) {
return ['del', key];
});
this.client.multi(query).exec(function (err, replies) {
callback(err);
});
}.bind(this));
};
BridgeToRedis.prototype.count = function count(model, callback) {
Riak.prototype.count = function count(model, callback) {
this.client.keys(model + ':*', function (err, keys) {
callback(err, err ? null : keys.length);
});
};
BridgeToRedis.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
this.client.hmset(model + ':' + id, data, cb);
Riak.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
data.id = id;
this.save(model, data, cb);
};

View File

@ -47,7 +47,7 @@ SQLite3.prototype.query = function (method, args) {
var cb = args.pop();
if (typeof cb === 'function') {
args.push(function (err, data) {
log(args[0], time);
if (log) log(args[0], time);
cb.call(this, err, data);
});
} else {

View File

@ -47,9 +47,6 @@ function Schema(name, settings) {
}
adapter.initialize(this, function () {
this.connected = true;
this.emit('connected');
}.bind(this));
// we have an adaper now?
if (!this.adapter) {
@ -67,6 +64,11 @@ function Schema(name, settings) {
log(q || query, t1);
};
};
this.connected = true;
this.emit('connected');
}.bind(this));
};
util.inherits(Schema, require('events').EventEmitter);

View File

@ -14,6 +14,7 @@
"node >= 0.4.0"
],
"dependencies": {
"node-uuid": "*"
},
"devDependencies": {
"redis": ">= 0.6.7",
@ -24,6 +25,8 @@
"sqlite3": "*",
"nodeunit": ">= 0",
"coffee-script": ">= 0",
"neo4j": ">= 0"
"riak-js": ">= 0",
"neo4j": ">= 0",
"mongodb": "*"
}
}

View File

@ -4,9 +4,7 @@ var Text = Schema.Text;
require('./spec_helper').init(exports);
var schemas = {
/*
riak: {},
*/
// riak: {},
mysql: {
database: 'myapp_test',
username: 'root'
@ -20,6 +18,7 @@ var schemas = {
},
neo4j: { url: 'http://localhost:7474/' },
mongoose: { url: 'mongodb://travis:test@localhost:27017/myapp' },
mongodb: { url: 'mongodb://travis:test@localhost:27017/myapp' },
redis: {},
memory: {}
};
@ -30,7 +29,21 @@ Object.keys(schemas).forEach(function (schemaName) {
if (process.env.ONLY && process.env.ONLY !== schemaName) return;
if (process.env.EXCEPT && ~process.env.EXCEPT.indexOf(schemaName)) return;
context(schemaName, function () {
schemas[schemaName].autoconnect = false;
var schema = new Schema(schemaName, schemas[schemaName]);
it('should connect to database', function (test) {
console.log('Connecting:', schemaName);
if (schema.adapter && schema.adapter.connect) {
schema.adapter.connect(function () {
test.done();
});
} else if (!schema.adapter) {
setTimeout(test.done, 1000);
} else {
test.done()
}
});
schema.log = function (a) {
console.log(a);
};
@ -333,7 +346,7 @@ function testOrm(schema) {
Post.count(function (err, count) {
test.equal(countOfposts, count);
Post.count({title: 'title'}, function (err, count) {
test.equal(countOfpostsFiltered, count);
test.equal(countOfpostsFiltered, count, 'filtered count');
test.done();
});
});
@ -442,6 +455,7 @@ function testOrm(schema) {
Post.destroyAll(function (err) {
if (err) {
console.log('Error in destroyAll');
console.log(err);
throw err;
}
Post.all(function (err, posts) {
@ -495,7 +509,7 @@ function testOrm(schema) {
if (err) console.log(err);
test.equal(posts.length, 5);
titles.sort().forEach(function (t, i) {
test.equal(posts[i].title, t);
if (posts[i]) test.equal(posts[i].title, t);
});
finished();
});
@ -508,6 +522,7 @@ function testOrm(schema) {
test.equal(posts.length, 5);
dates.sort(numerically).forEach(function (d, i) {
// fix inappropriated tz convert
if (posts[i])
test.equal(posts[i].date.toString(), d.toString());
});
finished();
@ -691,10 +706,14 @@ function testOrm(schema) {
});
it('should query one record', function (test) {
test.expect(4);
Post.findOne(function (err, post) {
test.ok(post.id);
test.ok(post && post.id);
Post.findOne({ where: { title: 'hey' } }, function (err, post) {
if (err) throw err;
if (err) {
console.log(err);
return test.done();
}
test.equal(post.constructor.modelName, 'Post');
test.equal(post.title, 'hey');
Post.findOne({ where: { title: 'not exists' } }, function (err, post) {