From 8bb855c88c78ac39f6d6a14d443c38e1d92602b5 Mon Sep 17 00:00:00 2001 From: Anatoliy Chakkaev Date: Sun, 11 Mar 2012 08:48:38 +0400 Subject: [PATCH] Mongodb adapter --- lib/adapters/mongodb.js | 165 +++++++++++++++++++++++++++++++++++++++ lib/adapters/mysql.js | 2 +- lib/adapters/postgres.js | 22 ++++-- lib/adapters/riak.js | 132 ++++++++++++++----------------- lib/adapters/sqlite3.js | 2 +- lib/schema.js | 36 +++++---- package.json | 5 +- test/common_test.js | 33 ++++++-- 8 files changed, 288 insertions(+), 109 deletions(-) create mode 100644 lib/adapters/mongodb.js diff --git a/lib/adapters/mongodb.js b/lib/adapters/mongodb.js new file mode 100644 index 00000000..972bc378 --- /dev/null +++ b/lib/adapters/mongodb.js @@ -0,0 +1,165 @@ +var safeRequire = require('../utils').safeRequire; + +/** + * Module dependencies + */ +var mongodb = safeRequire('mongodb'); +var ObjectID = mongodb.ObjectID; + +exports.initialize = function initializeSchema(schema, callback) { + if (!mongodb) return; + + var s = schema.settings; + + if (schema.settings.url) { + var url = require('url').parse(schema.settings.url); + s.host = url.hostname; + s.port = url.port; + s.database = url.path.replace(/^\//, ''); + s.username = url.auth && url.auth.split(':')[0]; + s.password = url.auth && url.auth.split(':')[1]; + } + + s.host = s.host || 'localhost'; + s.port = parseInt(s.port || '27017', 10); + s.database = s.database || 'test'; + + var server = new mongodb.Server(s.host, s.port, {}); + new mongodb.Db(s.database, server, {}).open(function (err, client) { + if (err) throw err; + schema.client = client; + schema.adapter = new MongoDB(client); + callback(); + }); +}; + +function MongoDB(client) { + this._models = {}; + this.client = client; + this.collections = {}; +} + +MongoDB.prototype.define = function (descr) { + if (!descr.settings) descr.settings = {}; + this._models[descr.model.modelName] = descr; +}; + +MongoDB.prototype.defineProperty = function (model, prop, params) { + this._models[model].properties[prop] = params; +}; + +MongoDB.prototype.collection = function (name) { + if (!this.collections[name]) { + this.collections[name] = new mongodb.Collection(this.client, name); + } + return this.collections[name]; +}; + +MongoDB.prototype.create = function (model, data, callback) { + this.collection(model).insert(data, {}, function (err, m) { + callback(err, err ? null : m[0]._id.toString()); + }); +}; + +MongoDB.prototype.save = function (model, data, callback) { + this.collection(model).save({_id: new ObjectID(data.id)}, data, function (err) { + callback(err); + }); +}; + +MongoDB.prototype.exists = function (model, id, callback) { + this.collection(model).findOne({_id: new ObjectID(id)}, function (err, data) { + callback(err, !err && data); + }); +}; + +MongoDB.prototype.find = function find(model, id, callback) { + this.collection(model).findOne({_id: new ObjectID(id)}, function (err, data) { + if (data) data.id = id; + callback(err, data); + }); +}; + +MongoDB.prototype.destroy = function destroy(model, id, callback) { + this.collection(model).remove({_id: new ObjectID(id)}, callback); +}; + +MongoDB.prototype.all = function all(model, filter, callback) { + if (!filter) { + filter = {}; + } + var query = {}; + if (filter.where) { + Object.keys(filter.where).forEach(function (k) { + var cond = filter.where[k]; + var spec = false; + if (cond && cond.constructor.name === 'Object') { + spec = Object.keys(cond)[0]; + cond = cond[spec]; + } + if (spec) { + if (spec === 'between') { + query[k] = { $gte: cond[0], $lte: cond[1]}; + } else { + query[k] = {}; + query[k]['$' + spec] = cond; + } + } else { + if (cond === null) { + query[k] = {$type: 10}; + } else { + query[k] = cond; + } + } + }); + } + var cursor = this.collection(model).find(query); + + if (filter.order) { + var m = filter.order.match(/\s+(A|DE)SC$/); + var key = filter.order; + var reverse = false; + if (m) { + key = key.replace(/\s+(A|DE)SC$/, ''); + if (m[1] === 'DE') reverse = true; + } + if (reverse) { + cursor.sort([[key, 'desc']]); + } else { + cursor.sort(key); + } + } + if (filter.limit) { + cursor.limit(filter.limit); + } + if (filter.skip) { + cursor.skip(filter.skip); + } else if (filter.offset) { + cursor.skip(filter.offset); + } + cursor.toArray(function (err, data) { + if (err) return callback(err); + callback(null, data.map(function (o) { o.id = o._id.toString(); delete o._id; return o; })); + }); +}; + +MongoDB.prototype.destroyAll = function destroyAll(model, callback) { + this.collection(model).remove({}, callback); +}; + +MongoDB.prototype.count = function count(model, callback, where) { + this.collection(model).count(where, function (err, count) { + callback(err, count); + }); +}; + +MongoDB.prototype.updateAttributes = function updateAttrs(model, id, data, cb) { + this.collection(model).findAndModify({_id: new ObjectID(id)}, [['_id','asc']], {$set: data}, {}, function(err, object) { + cb(err, object); + }); +}; + +MongoDB.prototype.disconnect = function () { + this.client.close(); +}; + diff --git a/lib/adapters/mysql.js b/lib/adapters/mysql.js index e5a06421..65606a09 100644 --- a/lib/adapters/mysql.js +++ b/lib/adapters/mysql.js @@ -36,7 +36,7 @@ MySQL.prototype.query = function (sql, callback) { var log = this.log; if (typeof callback !== 'function') throw new Error('callback should be a function'); this.client.query(sql, function (err, data) { - log(sql, time); + if (log) log(sql, time); callback(err, data); }); }; diff --git a/lib/adapters/postgres.js b/lib/adapters/postgres.js index 69f3a940..5a4b1706 100644 --- a/lib/adapters/postgres.js +++ b/lib/adapters/postgres.js @@ -20,14 +20,9 @@ exports.initialize = function initializeSchema(schema, callback) { debug: s.debug }); schema.adapter = new PG(schema.client); - schema.client.connect(function(err){ - if(!err){ - process.nextTick(callback); - }else{ - console.error(err); - throw err; - } - }); + if (s.autoconnect === false) return callback(); + + schema.adapter.connect(callback); }; function PG(client) { @@ -37,6 +32,17 @@ function PG(client) { require('util').inherits(PG, BaseSQL); +PG.prototype.connect = function (callback) { + this.client.connect(function (err) { + if (!err){ + callback(); + }else{ + console.error(err); + throw err; + } + }); +}; + PG.prototype.query = function (sql, callback) { var time = Date.now(); var log = this.log; diff --git a/lib/adapters/riak.js b/lib/adapters/riak.js index 57cf41a9..b6fc2853 100644 --- a/lib/adapters/riak.js +++ b/lib/adapters/riak.js @@ -1,126 +1,110 @@ +var safeRequire = require('../utils').safeRequire; + /** * Module dependencies */ -var riak= require('riak-js'); +var uuid = require('node-uuid'); +var riak = safeRequire('riak-js'); exports.initialize = function initializeSchema(schema, callback) { - var config = { - host = schema.settings.host || '127.0.0.1', - port = schema.settings.port || 8098 - }; - - schema.client = riak_lib.getClient(config); - schema.adapter = new BridgeToRedis(schema.client); + schema.client = riak.getClient({ + host: schema.settings.host || '127.0.0.1', + port: schema.settings.port || 8091 + }); + schema.adapter = new Riak(schema.client); }; -function BridgeToRedis(client) { +function Riak(client) { this._models = {}; this.client = client; } -BridgeToRedis.prototype.define = function (descr) { +Riak.prototype.define = function (descr) { this._models[descr.model.modelName] = descr; }; -BridgeToRedis.prototype.save = function (model, data, callback) { - this.client.hmset(model + ':' + data.id, data, callback); +Riak.prototype.save = function (model, data, callback) { + this.client.save(model, data.id, data, callback); }; -BridgeToRedis.prototype.create = function (model, data, callback) { - this.client.incr(model + ':id', function (err, id) { - data.id = id; - this.save(model, data, function (err) { - if (callback) { - callback(err, id); - } - }); - }.bind(this)); +Riak.prototype.create = function (model, data, callback) { + data.id = uuid(); + this.save(model, data, function (err) { + if (callback) { + callback(err, data.id); + } + }); }; -BridgeToRedis.prototype.exists = function (model, id, callback) { - this.client.exists(model + ':' + id, function (err, exists) { +Riak.prototype.exists = function (model, id, callback) { + this.client.exists(model, id, function (err, exists, meta) { if (callback) { callback(err, exists); } }); }; -BridgeToRedis.prototype.find = function find(model, id, callback) { - this.client.hgetall(model + ':' + id, function (err, data) { +Riak.prototype.find = function find(model, id, callback) { + this.client.get(model, id, function (err, data, meta) { if (data && data.id) { data.id = id; } else { data = null; } - callback(err, data); + if (typeof callback === 'function') callback(err, data); }); }; -BridgeToRedis.prototype.destroy = function destroy(model, id, callback) { - this.client.del(model + ':' + id, function (err) { +Riak.prototype.destroy = function destroy(model, id, callback) { + this.client.remove(model, id, function (err) { callback(err); }); }; -BridgeToRedis.prototype.all = function all(model, filter, callback) { - this.client.keys(model + ':*', function (err, keys) { - if (err) { - return callback(err, []); - } - var query = keys.map(function (key) { - return ['hgetall', key]; - }); - this.client.multi(query).exec(function (err, replies) { - callback(err, filter ? replies.filter(applyFilter(filter)) : replies); +Riak.prototype.all = function all(model, filter, callback) { + var opts = {}; + if (filter && filter.where) opts.where = filter.where; + this.client.getAll(model, function (err, result, meta) { + if (err) return callback(err, []); + /// return callback(err, result.map(function (x) { return {id: x}; })); + result = (result || []).map(function (row) { + var record = row.data; + record.id = row.meta.key; + console.log(record); + return record; }); + + return callback(err, result); }.bind(this)); }; -function applyFilter(filter) { - if (typeof filter === 'function') { - return filter; - } - var keys = Object.keys(filter); - return function (obj) { - var pass = true; - keys.forEach(function (key) { - if (!test(filter[key], obj[key])) { - pass = false; - } - }); - return pass; - } +Riak.prototype.destroyAll = function destroyAll(model, callback) { + var self = this; + this.all(model, {}, function (err, recs) { + if (err) callback(err); - function test(example, value) { - if (typeof value === 'string' && example && example.constructor.name === 'RegExp') { - return value.match(example); - } - // not strict equality - return example == value; - } -} + removeOne(); -BridgeToRedis.prototype.destroyAll = function destroyAll(model, callback) { - this.client.keys(model + ':*', function (err, keys) { - if (err) { - return callback(err, []); + function removeOne(error) { + err = err || error; + var rec = recs.pop(); + if (!rec) return callback(err && err.statusCode != '404' ? err : null); + console.log(rec.id); + self.client.remove(model, rec.id, removeOne); } - var query = keys.map(function (key) { - return ['del', key]; - }); - this.client.multi(query).exec(function (err, replies) { - callback(err); - }); - }.bind(this)); + + }); + }; -BridgeToRedis.prototype.count = function count(model, callback) { +Riak.prototype.count = function count(model, callback) { this.client.keys(model + ':*', function (err, keys) { callback(err, err ? null : keys.length); }); }; -BridgeToRedis.prototype.updateAttributes = function updateAttrs(model, id, data, cb) { - this.client.hmset(model + ':' + id, data, cb); +Riak.prototype.updateAttributes = function updateAttrs(model, id, data, cb) { + data.id = id; + this.save(model, data, cb); }; diff --git a/lib/adapters/sqlite3.js b/lib/adapters/sqlite3.js index 4b5ea88f..387b2cdb 100644 --- a/lib/adapters/sqlite3.js +++ b/lib/adapters/sqlite3.js @@ -47,7 +47,7 @@ SQLite3.prototype.query = function (method, args) { var cb = args.pop(); if (typeof cb === 'function') { args.push(function (err, data) { - log(args[0], time); + if (log) log(args[0], time); cb.call(this, err, data); }); } else { diff --git a/lib/schema.js b/lib/schema.js index a0aeb7b6..564ca179 100644 --- a/lib/schema.js +++ b/lib/schema.js @@ -47,26 +47,28 @@ function Schema(name, settings) { } adapter.initialize(this, function () { + + // we have an adaper now? + if (!this.adapter) { + throw new Error('Adapter is not defined correctly: it should create `adapter` member of schema'); + } + + this.adapter.log = function (query, start) { + schema.log(query, start); + }; + + this.adapter.logger = function (query) { + var t1 = Date.now(); + var log = this.log; + return function (q) { + log(q || query, t1); + }; + }; + this.connected = true; this.emit('connected'); + }.bind(this)); - - // we have an adaper now? - if (!this.adapter) { - throw new Error('Adapter is not defined correctly: it should create `adapter` member of schema'); - } - - this.adapter.log = function (query, start) { - schema.log(query, start); - }; - - this.adapter.logger = function (query) { - var t1 = Date.now(); - var log = this.log; - return function (q) { - log(q || query, t1); - }; - }; }; util.inherits(Schema, require('events').EventEmitter); diff --git a/package.json b/package.json index 5b76b3ca..6ccfc7b3 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "node >= 0.4.0" ], "dependencies": { + "node-uuid": "*" }, "devDependencies": { "redis": ">= 0.6.7", @@ -24,6 +25,8 @@ "sqlite3": "*", "nodeunit": ">= 0", "coffee-script": ">= 0", - "neo4j": ">= 0" + "riak-js": ">= 0", + "neo4j": ">= 0", + "mongodb": "*" } } diff --git a/test/common_test.js b/test/common_test.js index 0e5e5fea..05965b37 100644 --- a/test/common_test.js +++ b/test/common_test.js @@ -4,9 +4,7 @@ var Text = Schema.Text; require('./spec_helper').init(exports); var schemas = { - /* - riak: {}, - */ + // riak: {}, mysql: { database: 'myapp_test', username: 'root' @@ -20,6 +18,7 @@ var schemas = { }, neo4j: { url: 'http://localhost:7474/' }, mongoose: { url: 'mongodb://travis:test@localhost:27017/myapp' }, + mongodb: { url: 'mongodb://travis:test@localhost:27017/myapp' }, redis: {}, memory: {} }; @@ -30,7 +29,21 @@ Object.keys(schemas).forEach(function (schemaName) { if (process.env.ONLY && process.env.ONLY !== schemaName) return; if (process.env.EXCEPT && ~process.env.EXCEPT.indexOf(schemaName)) return; context(schemaName, function () { + schemas[schemaName].autoconnect = false; var schema = new Schema(schemaName, schemas[schemaName]); + it('should connect to database', function (test) { + console.log('Connecting:', schemaName); + if (schema.adapter && schema.adapter.connect) { + schema.adapter.connect(function () { + test.done(); + }); + } else if (!schema.adapter) { + setTimeout(test.done, 1000); + } else { + test.done() + } + }); + schema.log = function (a) { console.log(a); }; @@ -333,7 +346,7 @@ function testOrm(schema) { Post.count(function (err, count) { test.equal(countOfposts, count); Post.count({title: 'title'}, function (err, count) { - test.equal(countOfpostsFiltered, count); + test.equal(countOfpostsFiltered, count, 'filtered count'); test.done(); }); }); @@ -442,6 +455,7 @@ function testOrm(schema) { Post.destroyAll(function (err) { if (err) { console.log('Error in destroyAll'); + console.log(err); throw err; } Post.all(function (err, posts) { @@ -495,7 +509,7 @@ function testOrm(schema) { if (err) console.log(err); test.equal(posts.length, 5); titles.sort().forEach(function (t, i) { - test.equal(posts[i].title, t); + if (posts[i]) test.equal(posts[i].title, t); }); finished(); }); @@ -508,6 +522,7 @@ function testOrm(schema) { test.equal(posts.length, 5); dates.sort(numerically).forEach(function (d, i) { // fix inappropriated tz convert + if (posts[i]) test.equal(posts[i].date.toString(), d.toString()); }); finished(); @@ -691,10 +706,14 @@ function testOrm(schema) { }); it('should query one record', function (test) { + test.expect(4); Post.findOne(function (err, post) { - test.ok(post.id); + test.ok(post && post.id); Post.findOne({ where: { title: 'hey' } }, function (err, post) { - if (err) throw err; + if (err) { + console.log(err); + return test.done(); + } test.equal(post.constructor.modelName, 'Post'); test.equal(post.title, 'hey'); Post.findOne({ where: { title: 'not exists' } }, function (err, post) {