From 77270ed10d4168e1996e7957a766503abc32e558 Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Mon, 24 Oct 2022 18:11:25 +0200 Subject: [PATCH] Multi-queue alpha, yml config, code clean, refactor --- config.json | 57 ------- config.yml | 66 ++++++++ consumer.js | 78 +++++---- index.js | 15 +- mycdc.js | 290 +++++++++++++++++++++++----------- package-lock.json | 48 +++++- package.json | 1 + run-queue.sh => run-rabbit.sh | 0 zongji.sql | 2 - 9 files changed, 353 insertions(+), 204 deletions(-) delete mode 100644 config.json create mode 100644 config.yml rename run-queue.sh => run-rabbit.sh (100%) diff --git a/config.json b/config.json deleted file mode 100644 index 731db2c..0000000 --- a/config.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "debug": true, - "testMode": false, - "db": { - "host": "localhost", - "port": 3306, - "user": "zongji", - "password": "password", - "database": "util" - }, - "consumerDb": { - "host": "localhost", - "port": 3306, - "user": "zongji", - "password": "password", - "database": "util" - }, - "amqp": "amqp://user:password@localhost:5672", - "includeEvents": [ - "rotate", - "tablemap", - "writerows", - "updaterows", - "deleterows" - ], - "pingInterval": 60, - "flushInterval": 5000, - "queue": "orderRecalc", - "addQuery": "INSERT INTO `hedera`.`orderRecalc` (`orderFk`) VALUES ?", - "recalcQuery": "CALL `hedera`.`order_recalc`(?)", - "includeSchema": { - "hedera": { - "order": { - "fk": "id", - "events": ["updaterows"], - "columns": [ - "id", - "address_id", - "company_id", - "date_send", - "customer_id" - ] - }, - "orderRow": { - "fk": "orderFk", - "columns": [ - "id", - "orderFk", - "itemFk", - "warehouseFk", - "shipment", - "amount" - ] - } - } - } -} diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..155f454 --- /dev/null +++ b/config.yml @@ -0,0 +1,66 @@ + +debug: true +testMode: true +db: + host: localhost + port: 3306 + user: zongji + password: password + database: util +consumerDb: + host: localhost + port: 3306 + user: zongji + password: password + database: util +amqp: amqp://user:password@localhost:5672 +pingInterval: 60 +flushInterval: 5000 +queues: + orderTotal: + query: CALL hedera.order_recalc(?) + mode: fk + includeSchema: + hedera: + order: + fk: id + events: + - updaterows + columns: + - id + - address_id + - company_id + - date_send + - customer_id + orderRow: + fk: orderFk + table: order + columns: + - id + - orderFk + - itemFk + - warehouseFk + - shipment + - amount + comparative: + query: CALL vn.comparative_refresh(?table, ?id, ?data) + mode: changes + includeSchema: + vn: + ticket: + id: id + events: + - updaterows + columns: + - id + - shipped + - warehouseFk + - isDeleted + sale: + id: id + columns: + - id + - ticketFk + - itemFk + - quantity + - price diff --git a/consumer.js b/consumer.js index d78f8df..966d2bf 100644 --- a/consumer.js +++ b/consumer.js @@ -1,26 +1,21 @@ - +require('require-yaml'); +require('colors'); const fs = require('fs'); const path = require('path'); -const defaultConfig = require('./config.json'); - const mysql = require('mysql2/promise'); const amqp = require('amqplib'); -require('colors'); - -const config = Object.assign({}, defaultConfig); -const localPath = path.join(__dirname, 'config.local.json'); -if (fs.existsSync(localPath)) { - const localConfig = require(localPath); - Object.assign(config, localConfig); -} class Consumer { - constructor(config) { - this.config = config; - } - async start() { - if (this.config.testMode) + const defaultConfig = require('./config.yml'); + const config = this.config = Object.assign({}, defaultConfig); + const localPath = path.join(__dirname, 'config.local.yml'); + if (fs.existsSync(localPath)) { + const localConfig = require(localPath); + Object.assign(config, localConfig); + } + + if (config.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); @@ -35,20 +30,25 @@ class Consumer { } async init() { + const config = this.config; this.onErrorListener = err => this.onError(err); - this.db = await mysql.createConnection(this.config.consumerDb); + this.db = await mysql.createConnection(config.consumerDb); this.db.on('error', this.onErrorListener); this.pingInterval = setInterval( - () => this.connectionPing(), this.config.pingInterval * 1000); + () => this.connectionPing(), config.pingInterval * 1000); - this.consumer = await amqp.connect(this.config.amqp); + this.consumer = await amqp.connect(config.amqp); this.channel = await this.consumer.createChannel(); - this.channel.assertQueue(this.config.queue, { - durable: true - }); - this.channel.consume(this.config.queue, msg => this.onConsume(msg)); + + for (const queueName in config.queues) { + await this.channel.assertQueue(queueName, { + durable: true + }); + await this.channel.consume(queueName, + msg => this.onConsume(msg, queueName)); + } } async end(silent) { @@ -72,18 +72,28 @@ class Consumer { await this.db.ping(); } - async onConsume(msg) { - const fks = JSON.parse(msg.content.toString()); - if (this.config.debug) - console.debug('RabbitMQ message'.blue, fks); + async onConsume(msg, queueName) { + const config = this.config; - for (const fk of fks) { - if (this.config.debug) - console.debug('Query'.blue, this.config.recalcQuery.yellow); - await this.db.query(this.config.recalcQuery, fk); + const data = JSON.parse(msg.content.toString()); + if (config.debug) + console.debug('Message:'.blue, queueName.yellow, fks); + + const queue = config.queues[queueName]; + const query = queue.query; + if (!query) return; + + if (!config.testMode) + switch(queue.mode) { + case 'fk': + for (const fk of data.fks) + await this.db.query(query, fk); + break; + case 'changes': + break; } - this.channel.ack(msg); + await this.channel.ack(msg); } async tryRestart() { @@ -118,10 +128,8 @@ class Consumer { } } -let consumer; - async function main() { - consumer = new Consumer(config) + const consumer = new Consumer() await consumer.start(); process.on('SIGINT', async function() { diff --git a/index.js b/index.js index 51845f5..c491456 100644 --- a/index.js +++ b/index.js @@ -1,20 +1,7 @@ - -const fs = require('fs'); -const path = require('path'); -const defaultConfig = require('./config.json'); const MyCDC = require('./mycdc'); -const config = Object.assign({}, defaultConfig); -const localPath = path.join(__dirname, 'config.local.json'); -if (fs.existsSync(localPath)) { - const localConfig = require(localPath); - Object.assign(config, localConfig); -} - -let mycdc; - async function main() { - mycdc = new MyCDC(config) + const mycdc = new MyCDC() await mycdc.start(); process.on('SIGINT', async function() { diff --git a/mycdc.js b/mycdc.js index b1ff721..eaff877 100644 --- a/mycdc.js +++ b/mycdc.js @@ -1,62 +1,107 @@ +require('require-yaml'); +require('colors'); +const fs = require('fs'); +const path = require('path'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); const amqp = require('amqplib'); -require('colors'); -const allEvents = new Set([ - 'writerows', - 'updaterows', - 'deleterows' - ]); +const allEvents = [ + 'writerows', + 'updaterows', + 'deleterows' +]; module.exports = class MyCDC { - constructor(config) { - this.config = config; + constructor() { this.running = false; this.filename = null; this.position = null; this.schemaMap = new Map(); this.fks = new Set(); - - const includeSchema = {}; - for (const schemaName in this.config.includeSchema) { - const schema = this.config.includeSchema[schemaName]; - const tables = []; - const tableMap = new Map(); - - for (const tableName in schema) { - const table = schema[tableName]; - tables.push(tableName); - - const tableInfo = { - events: allEvents, - columns: true, - fk: 'id' - }; - tableMap.set(tableName, tableInfo); - - if (typeof table === 'object') { - if (Array.isArray(table.events)) - tableInfo.events = new Set(table.events); - if (Array.isArray(table.columns)) - tableInfo.columns = new Set(table.columns); - if (table.fk) - tableInfo.fk = table.fk; - } - } - - includeSchema[schemaName] = tables; - this.schemaMap.set(schemaName, tableMap); - } - - this.opts = { - includeEvents: this.config.includeEvents, - includeSchema - }; + this.queues = {}; } async start() { - if (this.config.testMode) + const defaultConfig = require('./config.yml'); + const config = this.config = Object.assign({}, defaultConfig); + const localPath = path.join(__dirname, 'config.local.yml'); + if (fs.existsSync(localPath)) { + const localConfig = require(localPath); + Object.assign(config, localConfig); + } + + const queues = config.queues; + for (const queueName in queues) { + const includeSchema = queues[queueName].includeSchema; + for (const schemaName in includeSchema) { + let tableMap = this.schemaMap.get(schemaName); + if (!tableMap) { + tableMap = new Map(); + this.schemaMap.set(schemaName, tableMap); + } + + const schema = includeSchema[schemaName]; + for (const tableName in schema) { + const table = schema[tableName]; + //if (typeof table !== 'object') continue; + + let tableInfo = tableMap.get(tableName); + if (!tableInfo) { + tableInfo = { + queues: new Map(), + events: new Map(), + columns: new Map(), + fk: 'id' + }; + tableMap.set(tableName, tableInfo); + } + tableInfo.queues.set(queueName, table); + + const events = table.events || allEvents; + for (const event of events) { + let eventInfo = tableInfo.events.get(event); + if (!eventInfo) { + eventInfo = []; + tableInfo.events.set(event, eventInfo); + } + eventInfo.push(queueName); + } + + const columns = table.columns; + for (const column of columns) { + let columnInfo = tableInfo.columns.get(column); + if (!columnInfo) { + columnInfo = []; + tableInfo.columns.set(column, columnInfo); + } + columnInfo.push(queueName); + } + + if (table.id) + tableInfo.id = table.id; + } + + this.schemaMap.set(schemaName, tableMap); + } + } + + const includeSchema = {}; + for (const [schemaName, tableMap] of this.schemaMap) + includeSchema[schemaName] = Array.from(tableMap.keys()); + + this.opts = { + includeEvents: [ + 'rotate', + 'tablemap', + 'writerows', + 'updaterows', + 'deleterows' + ], + includeSchema + }; + + if (config.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); @@ -71,25 +116,29 @@ module.exports = class MyCDC { } async init() { - this.debug('DbAsync', 'Initializing.'); + const config = this.config; + this.debug('MyCDC', 'Initializing.'); this.onErrorListener = err => this.onError(err); // DB connection - this.db = await mysql.createConnection(this.config.db); + this.db = await mysql.createConnection(config.db); this.db.on('error', this.onErrorListener); // RabbitMQ - this.publisher = await amqp.connect(this.config.amqp); + this.publisher = await amqp.connect(config.amqp); this.channel = await this.publisher.createChannel(); - this.channel.assertQueue(this.config.queue, { - durable: true - }); + + for (const queueName in config.queues) { + await this.channel.assertQueue(queueName, { + durable: true + }); + } // Zongji - const zongji = new ZongJi(this.config.db); + const zongji = new ZongJi(config.db); this.zongji = zongji; this.onBinlogListener = evt => this.onBinlog(evt); @@ -97,7 +146,7 @@ module.exports = class MyCDC { const [res] = await this.db.query( 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', - [this.config.queue] + [config.queue] ); if (res.length) { const [row] = res; @@ -132,21 +181,21 @@ module.exports = class MyCDC { this.zongji.on('error', this.onErrorListener); this.flushInterval = setInterval( - () => this.flushQueue(), this.config.flushInterval); + () => this.flushQueue(), config.flushInterval); this.pingInterval = setInterval( - () => this.connectionPing(), this.config.pingInterval * 1000); + () => this.connectionPing(), config.pingInterval * 1000); // Summary this.running = true; - this.debug('DbAsync', 'Initialized.'); + this.debug('MyCDC', 'Initialized.'); } async end(silent) { const zongji = this.zongji; if (!zongji) return; - this.debug('DbAsync', 'Ending.'); + this.debug('MyCDC', 'Ending.'); // Zongji @@ -194,7 +243,7 @@ module.exports = class MyCDC { // Summary - this.debug('DbAsync', 'Ended.'); + this.debug('MyCDC', 'Ended.'); } async tryRestart() { @@ -242,56 +291,107 @@ module.exports = class MyCDC { const tableInfo = tableMap.get(table.tableName); if (!tableInfo) return; - if (!tableInfo.events.has(eventName)) return; + const queueNames = tableInfo.events.get(eventName); + if (!queueNames) return; - let column; const rows = evt.rows; + const queues = this.config.queues; + const tableQueues = tableInfo.queues; - const fks = new Set(); + const changes = new Map(); + for (const queueName of queueNames) { + const change = { + mode: queues[queueName].mode + }; + changes.set(queueName, change); + + switch(change.mode) { + case 'fk': + change.fks = new Set(); + break; + case 'changes': + change.rows = {}; + break; + } + } + + function addChange(row, queueNames) { + for (const queueName of queueNames) { + const queueInfo = tableQueues.get(queueName); + const change = changes.get(queueName); + + switch(change.mode) { + case 'fk': + change.fks.add(row[queueInfo.fk]); + break; + case 'changes': + const queueRow = {}; + for (const column of queueInfo.columns) + if (row[column] !== undefined) + queueRow[column] = row[column]; + change.rows[row[queueInfo.id]] = queueRow; + break; + } + } + } + + const columnMap = tableInfo.columns; + const columns = columnMap.keys(); if (eventName === 'updaterows') { - if (tableInfo.columns !== true) { - let changes = false; - for (const row of rows) { - const after = row.after; - for (const col in after) { - if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) { - fks.add(after[tableInfo.fk]); - changes = true; - if (!column) column = col; - break; - } - } + const changedQueues = new Set(); + for (const row of rows) { + changedQueues.clear(); + const after = row.after; + + for (const col of columns) { + if (after[col] === undefined + || equals(after[col], row.before[col])) + continue; + + for (const queue of columnMap.get(col)) + changedQueues.add(queue); + + if (changedQueues.size === queueNames.length) + break; } - if (!changes) return; - } else { - for (const row of rows) - fks.add(row.after[tableInfo.fk]); + + if (changedQueues.size) + addChange(after, changedQueues); } + if (!changes) return; } else { for (const row of rows) - fks.add(row[tableInfo.fk]); + addChange(row, queueNames); } - if (fks.size) { - const data = JSON.stringify(Array.from(fks)); - this.channel.sendToQueue(this.config.queue, - Buffer.from(data)); - this.debug('Queued', data); - } + for (const [queueName, change] of changes) { + const jsonData = { + eventName, + table: table.tableName, + schema: table.parentSchema, + mode: change.mode + }; - const row = eventName === 'updaterows' - ? rows[0].after - : rows[0]; - - if (this.config.debug) { - console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`); - console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`); - if (column) { - let before = formatValue(rows[0].before[column]); - let after = formatValue(rows[0].after[column]); - console.debug(` ${column}: ${before} <- ${after}`); + let nChanges; + switch(change.mode) { + case 'fk': + jsonData.fks = Array.from(change.fks); + nChanges = change.fks.size; + break; + case 'changes': + jsonData.rows = change.rows; + nChanges = change.rows.length; + break; } + + if (!nChanges) continue; + + const data = JSON.stringify(jsonData); + this.channel.sendToQueue(queueName, + Buffer.from(data), {persistent: true}); + + console.debug('Queued'.blue, queueName.yellow, `[${eventName}] ${table.tableName}`); } this.position = evt.nextPosition; diff --git a/package-lock.json b/package-lock.json index a8bddee..6d58e66 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { - "name": "db-async", + "name": "mycdc", "lockfileVersion": 2, "requires": true, "packages": { @@ -8,6 +8,7 @@ "amqplib": "^0.10.3", "colors": "^1.4.0", "mysql2": "^2.3.3", + "require-yaml": "^0.0.1", "zongji": "file:../zongji" } }, @@ -39,6 +40,11 @@ "node": ">=10" } }, + "node_modules/argparse": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", + "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==" + }, "node_modules/buffer-more-ints": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", @@ -115,6 +121,17 @@ "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" }, + "node_modules/js-yaml": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", + "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", + "dependencies": { + "argparse": "^2.0.1" + }, + "bin": { + "js-yaml": "bin/js-yaml.js" + } + }, "node_modules/long": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", @@ -200,6 +217,14 @@ "string_decoder": "~0.10.x" } }, + "node_modules/require-yaml": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/require-yaml/-/require-yaml-0.0.1.tgz", + "integrity": "sha512-M6eVEgLPRbeOhgSCnOTtdrOOEQzbXRchg24Xa13c39dMuraFKdI9emUo97Rih0YEFzSICmSKg8w4RQp+rd9pOQ==", + "dependencies": { + "js-yaml": "" + } + }, "node_modules/requires-port": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", @@ -274,6 +299,11 @@ "url-parse": "~1.5.10" } }, + "argparse": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", + "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==" + }, "buffer-more-ints": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", @@ -333,6 +363,14 @@ "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" }, + "js-yaml": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", + "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", + "requires": { + "argparse": "^2.0.1" + } + }, "long": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", @@ -411,6 +449,14 @@ "string_decoder": "~0.10.x" } }, + "require-yaml": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/require-yaml/-/require-yaml-0.0.1.tgz", + "integrity": "sha512-M6eVEgLPRbeOhgSCnOTtdrOOEQzbXRchg24Xa13c39dMuraFKdI9emUo97Rih0YEFzSICmSKg8w4RQp+rd9pOQ==", + "requires": { + "js-yaml": "" + } + }, "requires-port": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", diff --git a/package.json b/package.json index a4e4343..4cca3de 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "amqplib": "^0.10.3", "colors": "^1.4.0", "mysql2": "^2.3.3", + "require-yaml": "^0.0.1", "zongji": "file:../zongji" } } diff --git a/run-queue.sh b/run-rabbit.sh similarity index 100% rename from run-queue.sh rename to run-rabbit.sh diff --git a/zongji.sql b/zongji.sql index bb6e63c..bd3fc83 100644 --- a/zongji.sql +++ b/zongji.sql @@ -10,5 +10,3 @@ CREATE USER 'zongji'@'%' IDENTIFIED BY 'password'; GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%'; GRANT INSERT, DELETE ON `util`.* TO 'zongji'@'%'; -GRANT INSERT ON `hedera`.`orderRecalc` TO 'zongji'@'%'; -GRANT INSERT ON `vn`.`ticketRecalc` TO 'zongji'@'%';