diff --git a/docs/examples.md b/docs/examples.md index 9fa652a..6c6b21c 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -550,3 +550,76 @@ To test out this example, try: $ ldapsearch -H ldap://localhost:389 -x -D cn=demo,dc=example,dc=com \ -w demo -b "dc=example,dc=com" objectclass=* ``` + +# Multi-threaded Server + +This example demonstrates multi-threading via the `cluster` module utilizing a `net` server for initial socket receipt. An alternate example demonstrating use of the `connectionRouter` `serverOptions` hook is available in the `examples` directory. + +```js +const cluster = require('cluster'); +const ldap = require('ldapjs'); +const net = require('net'); +const os = require('os'); + +const threads = []; +threads.getNext = function () { + return (Math.floor(Math.random() * this.length)); +}; + +const serverOptions = { + port: 1389 +}; + +if (cluster.isMaster) { + const server = net.createServer(serverOptions, (socket) => { + socket.pause(); + console.log('ldapjs client requesting connection'); + let routeTo = threads.getNext(); + threads[routeTo].send({ type: 'connection' }, socket); + }); + + for (let i = 0; i < os.cpus().length; i++) { + let thread = cluster.fork({ + 'id': i + }); + thread.id = i; + thread.on('message', function (msg) { + + }); + threads.push(thread); + } + + server.listen(serverOptions.port, function () { + console.log('ldapjs listening at ldap://127.0.0.1:' + serverOptions.port); + }); +} else { + const server = ldap.createServer(serverOptions); + + let threadId = process.env.id; + + process.on('message', (msg, socket) => { + switch (msg.type) { + case 'connection': + server.newConnection(socket); + socket.resume(); + console.log('ldapjs client connection accepted on ' + threadId.toString()); + } + }); + + server.search('dc=example', function (req, res, next) { + console.log('ldapjs search initiated on ' + threadId.toString()); + var obj = { + dn: req.dn.toString(), + attributes: { + objectclass: ['organization', 'top'], + o: 'example' + } + }; + + if (req.filter.matches(obj.attributes)) + res.send(obj); + + res.end(); + }); +} +``` diff --git a/examples/cluster-threading-net-server.js b/examples/cluster-threading-net-server.js new file mode 100644 index 0000000..4d49089 --- /dev/null +++ b/examples/cluster-threading-net-server.js @@ -0,0 +1,65 @@ +const cluster = require('cluster') +const ldap = require('ldapjs') +const net = require('net') +const os = require('os') + +const threads = [] +threads.getNext = function () { + return (Math.floor(Math.random() * this.length)) +} + +const serverOptions = { + port: 1389 +} + +if (cluster.isMaster) { + const server = net.createServer(serverOptions, (socket) => { + socket.pause() + console.log('ldapjs client requesting connection') + const routeTo = threads.getNext() + threads[routeTo].send({ type: 'connection' }, socket) + }) + + for (let i = 0; i < os.cpus().length; i++) { + const thread = cluster.fork({ + id: i + }) + thread.id = i + thread.on('message', function () { + + }) + threads.push(thread) + } + + server.listen(serverOptions.port, function () { + console.log('ldapjs listening at ldap://127.0.0.1:' + serverOptions.port) + }) +} else { + const server = ldap.createServer(serverOptions) + + const threadId = process.env.id + + process.on('message', (msg, socket) => { + switch (msg.type) { + case 'connection': + server.newConnection(socket) + socket.resume() + console.log('ldapjs client connection accepted on ' + threadId.toString()) + } + }) + + server.search('dc=example', function (req, res) { + console.log('ldapjs search initiated on ' + threadId.toString()) + const obj = { + dn: req.dn.toString(), + attributes: { + objectclass: ['organization', 'top'], + o: 'example' + } + } + + if (req.filter.matches(obj.attributes)) { res.send(obj) } + + res.end() + }) +} diff --git a/examples/cluster-threading.js b/examples/cluster-threading.js new file mode 100644 index 0000000..8def06a --- /dev/null +++ b/examples/cluster-threading.js @@ -0,0 +1,65 @@ +const cluster = require('cluster') +const ldap = require('ldapjs') +const os = require('os') + +const threads = [] +threads.getNext = function () { + return (Math.floor(Math.random() * this.length)) +} + +const serverOptions = { + connectionRouter: (socket) => { + socket.pause() + console.log('ldapjs client requesting connection') + const routeTo = threads.getNext() + threads[routeTo].send({ type: 'connection' }, socket) + } +} + +const server = ldap.createServer(serverOptions) + +if (cluster.isMaster) { + for (let i = 0; i < os.cpus().length; i++) { + const thread = cluster.fork({ + id: i + }) + thread.id = i + thread.on('message', function () { + + }) + threads.push(thread) + } + + server.listen(1389, function () { + console.log('ldapjs listening at ' + server.url) + }) +} else { + const threadId = process.env.id + serverOptions.connectionRouter = () => { + console.log('should not be hit') + } + + process.on('message', (msg, socket) => { + switch (msg.type) { + case 'connection': + server.newConnection(socket) + socket.resume() + console.log('ldapjs client connection accepted on ' + threadId.toString()) + } + }) + + server.search('dc=example', function (req, res) { + console.log('ldapjs search initiated on ' + threadId.toString()) + const obj = { + dn: req.dn.toString(), + attributes: { + objectclass: ['organization', 'top'], + o: 'example' + } + } + + if (req.filter.matches(obj.attributes)) { res.send(obj) } + + res.end() + }) +} diff --git a/lib/server.js b/lib/server.js index 512c679..74f2a67 100644 --- a/lib/server.js +++ b/lib/server.js @@ -313,7 +313,9 @@ function Server (options) { return c } - function newConnection (conn) { + self.newConnection = function (conn) { + // TODO: make `newConnection` available on the `Server` prototype + // https://github.com/ldapjs/node-ldapjs/pull/727/files#r636572294 setupConnection(conn) log.trace('new connection from %s', conn.ldap.id) @@ -438,9 +440,9 @@ function Server (options) { this.routes = {} if ((options.cert || options.certificate) && options.key) { options.cert = options.cert || options.certificate - this.server = tls.createServer(options, newConnection) + this.server = tls.createServer(options, options.connectionRouter ? options.connectionRouter : self.newConnection) } else { - this.server = net.createServer(newConnection) + this.server = net.createServer(options.connectionRouter ? options.connectionRouter : self.newConnection) } this.server.log = options.log this.server.ldap = { diff --git a/test/server.test.js b/test/server.test.js index 949cc27..459cfa3 100644 --- a/test/server.test.js +++ b/test/server.test.js @@ -1,5 +1,6 @@ 'use strict' +const net = require('net') const tap = require('tap') const vasync = require('vasync') const { getSock } = require('./utils') @@ -331,3 +332,49 @@ tap.test('close passes error to callback', function (t) { t.end() }) }) + +tap.test('multithreading support via external server', function (t) { + const serverOptions = { } + const server = ldap.createServer(serverOptions) + const fauxServer = net.createServer(serverOptions, (connection) => { + server.newConnection(connection) + }) + fauxServer.log = serverOptions.log + fauxServer.ldap = { + config: serverOptions + } + t.ok(server) + fauxServer.listen(5555, 'localhost', function () { + t.ok(true, 'server listening on ' + server.url) + + t.ok(fauxServer) + const client = ldap.createClient({ url: 'ldap://127.0.0.1:5555' }) + client.on('connect', function () { + t.ok(client) + client.unbind() + fauxServer.close(() => t.end()) + }) + }) +}) + +tap.test('multithreading support via hook', function (t) { + const serverOptions = { + connectionRouter: (connection) => { + server.newConnection(connection) + } + } + const server = ldap.createServer(serverOptions) + const fauxServer = ldap.createServer(serverOptions) + t.ok(server) + fauxServer.listen(0, 'localhost', function () { + t.ok(true, 'server listening on ' + server.url) + + t.ok(fauxServer) + const client = ldap.createClient({ url: fauxServer.url }) + client.on('connect', function () { + t.ok(client) + client.unbind() + fauxServer.close(() => t.end()) + }) + }) +})