// mycdc/consumer.js

require('require-yaml');
require('colors');
2022-10-23 19:46:07 +00:00
const fs = require('fs');
const path = require('path');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
2022-11-06 12:00:55 +00:00
const Queue = require('./queue');
2022-10-25 11:20:22 +00:00
const {cpus} = require('os');
2022-10-23 19:46:07 +00:00
/**
 * Queue consumer process: owns one MySQL connection pool and one AMQP
 * connection, and starts a Queue for every entry of config/queues.yml.
 *
 * State kept on the instance:
 *  - conf:            merged configuration (defaults + optional local file)
 *  - db:              MySQL pool created by init()
 *  - amqpConn:        AMQP connection created by init()
 *  - queues:          queue name -> Queue instance, filled by consumeQueues()
 *  - onErrorListener: bound error handler attached to the pool
 */
class Consumer {
  /**
   * Loads the configuration, opens the connections and starts consuming
   * every configured queue.
   *
   * config/consumer.local.yml, when present, overrides the defaults from
   * config/consumer.yml (shallow merge: top-level keys are replaced whole).
   */
  async start() {
    const defaultConfig = require('./config/consumer.yml');
    const conf = this.conf = Object.assign({}, defaultConfig);

    const localPath = path.join(__dirname, 'config/consumer.local.yml');
    if (fs.existsSync(localPath)) {
      const localConfig = require(localPath);
      Object.assign(conf, localConfig);
    }

    if (conf.testMode) {
      console.log('Test mode enabled, just logging queries to console.');
    }

    console.log('Starting process.');
    await this.init();
    console.log('Process started.');

    await this.consumeQueues();
  }

  /**
   * Closes the connections, logging the progress to the console.
   */
  async stop() {
    console.log('Stopping process.');
    await this.end();
    console.log('Process stopped.');
  }

  /**
   * Creates the MySQL pool and the AMQP connection described by this.conf.
   *
   * The pool size defaults to the number of CPUs; conf.db may override it.
   * If the AMQP connection cannot be established, the pool created by this
   * call is closed again before the error is rethrown, so that repeated
   * restart attempts do not pile up open pools.
   *
   * @throws whatever mysql.createPool() or amqp.connect() reject with
   */
  async init() {
    const conf = this.conf;
    this.onErrorListener = err => this.onError(err);

    const dbConfig = Object.assign({
      connectionLimit: cpus().length
    }, conf.db);

    this.db = await mysql.createPool(dbConfig);
    this.db.on('error', this.onErrorListener);

    try {
      this.amqpConn = await amqp.connect(conf.amqp);
    } catch (err) {
      await this.closeDb(true);
      throw err;
    }
  }

  /**
   * Instantiates a Queue for every entry of config/queues.yml and starts
   * consuming them one after another.
   */
  async consumeQueues() {
    const queuesConf = require('./config/queues.yml');
    this.queues = {};
    for (const [queueName, queueConf] of Object.entries(queuesConf)) {
      const queue = new Queue(this, queueName, queueConf);
      this.queues[queueName] = queue;
      await queue.consume();
    }
  }

  /**
   * Closes the AMQP connection and the MySQL pool.
   *
   * Both resources are released independently: a failure while closing the
   * AMQP connection (for instance because it is already closed) no longer
   * prevents the pool from being shut down. Resources that were never
   * created are skipped.
   *
   * @param {boolean} [silent] When truthy, errors raised while closing are
   * not logged
   */
  async end(silent) {
    if (this.amqpConn) {
      try {
        await this.amqpConn.close();
      } catch (err) {
        if (!silent) {
          console.error(err);
        }
      }
    }
    await this.closeDb(silent);
  }

  /**
   * Detaches the error listener from the current pool and ends it.
   * Does nothing when no pool has been created yet.
   *
   * @param {boolean} [silent] When truthy, an error raised by db.end() is
   * not logged
   */
  async closeDb(silent) {
    const db = this.db;
    if (!db) {
      return;
    }

    db.off('error', this.onErrorListener);
    // FIXME: mysql2/promise bug, db.end() ends process
    db.on('error', () => {});
    try {
      await db.end();
    } catch (err) {
      if (!silent) {
        console.error(err);
      }
    }
  }

  /**
   * Re-creates the connections, retrying after 30 ms for as long as init()
   * keeps failing.
   *
   * NOTE(review): queues are not consumed again after a successful restart;
   * whether the existing Queue instances recover on their own depends on
   * the Queue class - confirm.
   */
  async tryRestart() {
    try {
      await this.init();
      console.log('Process restarted.');
    } catch (err) {
      setTimeout(() => this.tryRestart(), 30);
    }
  }

  /**
   * Handles an error emitted by the MySQL pool: releases the connections
   * and then either restarts (lost/reset connection) or terminates the
   * process with a failure status.
   *
   * @param {Error} err Emitted error; err.code selects the reaction
   */
  async onError(err) {
    console.log(`Error: ${err.code}: ${err.message}`);

    try {
      await this.end(true);
    } catch (e) {
      // Best effort: the connections may already be unusable at this point.
    }

    switch (err.code) {
      case 'PROTOCOL_CONNECTION_LOST':
      case 'ECONNRESET':
        console.log('Trying to restart process.');
        await this.tryRestart();
        break;
      default:
        // Unrecoverable error: report failure through the exit status.
        process.exit(1);
    }
  }

  /**
   * Prints a debug line when conf.debug is enabled.
   *
   * @param {string} namespace Prefix identifying the emitter
   * @param {*} message Text to print; converted with String() so that
   * non-string values (including null/undefined) do not throw
   */
  debug(namespace, message) {
    if (this.conf.debug) {
      console.debug(`${namespace}:`.blue, String(message).yellow);
    }
  }
}
/**
 * Entry point: starts a Consumer and, once it is running, installs a SIGINT
 * handler that stops it before exiting the process.
 */
async function main() {
  const consumer = new Consumer();
  await consumer.start();

  process.on('SIGINT', async function() {
    console.log('Got SIGINT.');
    try {
      await consumer.stop();
    } catch (err) {
      console.error(err);
    }
    process.exit();
  });
}

// A failed startup is reported explicitly instead of surfacing as an
// unhandled promise rejection.
main().catch(err => {
  console.error(err);
  process.exit(1);
});