// Side-effect imports: `require-yaml` presumably lets `require()` load the
// `.yml` files used below, and `colors` presumably provides the `.blue` /
// `.yellow` string getters used in `debug()` — confirm against those packages.
require('require-yaml');
require('colors');
const fs = require('fs');
const path = require('path');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
const {cpus} = require('os');

// Queue implementations, keyed by the `mode` value of a queue's config.
const queues = {
  id: require('./lib/queue-id'),
  fk: require('./lib/queue-fk')
};

// Delay between restart attempts, in milliseconds.
// NOTE(review): 30 ms is a very tight retry loop — confirm it was not meant
// to be 30 seconds.
const RESTART_RETRY_DELAY_MS = 30;

/**
 * Process that connects to MySQL and AMQP and consumes the queues listed in
 * the configuration.
 */
class Consumer {
  /**
   * Loads the configuration, opens the connections and starts consuming.
   *
   * `config/consumer.yml` provides the base configuration; if
   * `config/consumer.local.yml` exists, its top-level keys override it
   * (shallow merge).
   *
   * @returns {Promise<void>}
   */
  async start() {
    const defaultConfig = require('./config/consumer.yml');
    const conf = this.conf = Object.assign({}, defaultConfig);
    const localPath = path.join(__dirname, 'config/consumer.local.yml');
    if (fs.existsSync(localPath)) {
      const localConfig = require(localPath);
      Object.assign(conf, localConfig);
    }

    if (conf.testMode) {
      console.log('Test mode enabled, just logging queries to console.');
    }

    console.log('Starting process.');
    await this.init();
    console.log('Process started.');
    await this.consumeQueues();
  }

  /**
   * Closes the connections opened by `init()`.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    console.log('Stopping process.');
    await this.end();
    console.log('Process stopped.');
  }

  /**
   * Opens the AMQP connection (`this.amqpConn`) and the MySQL pool
   * (`this.db`), and routes pool `error` events to `onError()`.
   *
   * @returns {Promise<void>} Rejects if either resource cannot be created.
   */
  async init() {
    const conf = this.conf;
    this.onErrorListener = err => this.onError(err);

    // Connect to AMQP before creating the pool: when the connection attempt
    // fails (the usual case while `tryRestart()` is looping) no pool has been
    // created yet, so failed attempts do not leave unclosed pools behind.
    this.amqpConn = await amqp.connect(conf.amqp);

    const dbConfig = Object.assign({
      connectionLimit: cpus().length,
      namedPlaceholders: true
    }, conf.db);
    this.db = await mysql.createPool(dbConfig);
    this.db.on('error', this.onErrorListener);
  }

  /**
   * Instantiates and starts one queue per name in `conf.pollQueues`.
   *
   * Each queue's settings are read from `queues/<name>.yml` and merged over
   * `conf.defaults`. The `mode` setting selects the implementation from the
   * module-level `queues` map; queues with an unknown mode are skipped with
   * a warning. Started queues are stored in `this.queues` by name.
   *
   * @returns {Promise<void>}
   */
  async consumeQueues() {
    const {conf} = this;
    const {defaults} = conf;

    this.queues = {};
    for (const queueName of conf.pollQueues) {
      const confFile = path.join(__dirname, 'queues', `${queueName}.yml`);
      const queueConf = Object.assign({}, defaults, require(confFile));

      const {mode} = queueConf;
      const QueueClass = queues[mode];
      if (!QueueClass) {
        console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`);
        continue;
      }

      const queue = new QueueClass(this, queueName, queueConf);
      this.queues[queueName] = queue;
      // Queues are started one at a time, in configuration order.
      await queue.consume();
    }
  }

  /**
   * Closes the AMQP connection and ends the MySQL pool.
   *
   * @param {boolean} [silent] When truthy, an error raised while ending the
   *   pool is not logged.
   * @returns {Promise<void>} Rejects if closing the AMQP connection fails;
   *   the pool is torn down regardless.
   */
  async end(silent) {
    try {
      await this.amqpConn.close();
    } finally {
      // Always release the pool, even when the AMQP close above failed;
      // otherwise the pool would stay open while a restart creates a new one.
      this.db.off('error', this.onErrorListener);
      // FIXME: mysql2/promise bug, db.end() ends process
      this.db.on('error', () => {});
      try {
        await this.db.end();
      } catch (err) {
        if (!silent) {
          console.error(err);
        }
      }
    }
  }

  /**
   * Re-runs `init()` and, on failure, schedules another attempt after
   * `RESTART_RETRY_DELAY_MS`. Never rejects.
   *
   * NOTE(review): `consumeQueues()` is not called again after a successful
   * restart — confirm whether the existing queue objects recover on their own.
   *
   * @returns {Promise<void>}
   */
  async tryRestart() {
    try {
      await this.init();
      console.log('Process restarted.');
    } catch (err) {
      // The failure is intentionally not logged on every attempt; the retry
      // keeps going until `init()` succeeds.
      setTimeout(() => this.tryRestart(), RESTART_RETRY_DELAY_MS);
    }
  }

  /**
   * Handles an error emitted by the MySQL pool: tears everything down, then
   * restarts on connection-loss errors or exits the process otherwise.
   *
   * @param {Error} err Error carrying a `code` property.
   * @returns {Promise<void>}
   */
  async onError(err) {
    console.log(`Error: ${err.code}: ${err.message}`);

    try {
      await this.end(true);
    } catch (e) {
      // Best-effort teardown: the connections may already be broken.
    }

    switch (err.code) {
      case 'PROTOCOL_CONNECTION_LOST':
      case 'ECONNRESET':
        console.log('Trying to restart process.');
        await this.tryRestart();
        break;
      default:
        // Unrecoverable error: exit with a failure status.
        process.exit(1);
    }
  }

  /**
   * Prints a colored debug line when `conf.debug` is enabled.
   *
   * @param {string} namespace Prefix identifying the message source.
   * @param {*} message Message to print; coerced to a string.
   */
  debug(namespace, message) {
    if (this.conf.debug) {
      console.debug(`${namespace}:`.blue, String(message).yellow);
    }
  }
}

/**
 * Starts a consumer and stops it cleanly on SIGINT.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const consumer = new Consumer();
  await consumer.start();

  process.on('SIGINT', async function() {
    console.log('Got SIGINT.');
    try {
      await consumer.stop();
    } catch (err) {
      console.error(err);
    }
    process.exit();
  });
}

main().catch(err => {
  // A failed startup must not be left as an unhandled rejection.
  console.error(err);
  process.exit(1);
});