From 4cc2dc067a77ba5228626eb91e639b0c4e155253 Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Tue, 25 Oct 2022 13:20:22 +0200 Subject: [PATCH] Fixes --- .gitignore | 2 +- config.yml | 100 ++++++++++++++++++++++++++++++++++++++++++-------- consumer.js | 52 +++++++++++++++++--------- mycdc.js | 29 ++++++++------- run-rabbit.sh | 2 +- 5 files changed, 138 insertions(+), 47 deletions(-) diff --git a/.gitignore b/.gitignore index 3f2a19d..2cd4783 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ node_modules zongji -config.local.json \ No newline at end of file +config.local.yml \ No newline at end of file diff --git a/config.yml b/config.yml index 155f454..fedcb9b 100644 --- a/config.yml +++ b/config.yml @@ -1,6 +1,7 @@ debug: true -testMode: true +testMode: false +code: mycdc db: host: localhost port: 3306 @@ -15,7 +16,7 @@ consumerDb: database: util amqp: amqp://user:password@localhost:5672 pingInterval: 60 -flushInterval: 5000 +flushInterval: 10 queues: orderTotal: query: CALL hedera.order_recalc(?) @@ -23,18 +24,17 @@ queues: includeSchema: hedera: order: - fk: id - events: - - updaterows + key: id columns: - id - address_id - company_id - date_send - customer_id + events: + - updaterows orderRow: - fk: orderFk - table: order + key: orderFk columns: - id - orderFk @@ -42,25 +42,95 @@ queues: - warehouseFk - shipment - amount - comparative: - query: CALL vn.comparative_refresh(?table, ?id, ?data) - mode: changes + ticketTotal: + query: CALL vn.ticket_recalc(?) + mode: fk includeSchema: vn: ticket: - id: id - events: - - updaterows + key: id columns: - id - shipped - warehouseFk - - isDeleted + - clientFk + events: + - updaterows sale: - id: id + key: ticketFk columns: - id - ticketFk - itemFk - quantity - price + comparative: + query: CALL vn.comparative_refresh(?, ?, ?) + mode: changes + includeSchema: + vn: + ticket: + key: id + columns: + - id + - shipped + - warehouseFk + - isDeleted + events: + - updaterows + sale: + key: id + columns: + - id + - ticketFk + - itemFk + - quantity + - price + stock: + query: CALL stock.available_refresh(?, ?, ?) + mode: changes + includeSchema: + vn: + ticket: + key: id + columns: + - id + - shipped + - warehouseFk + - isDeleted + events: + - updaterows + sale: + key: id + columns: + - id + - ticketFk + - itemFk + - quantity + travel: + key: id + columns: + - id + - shipped + - landing + - warehouseInFk + - warehouseOutFk + - isDelivered + - isReceived + events: + - updaterows + entry: + key: id + columns: + - id + - travelFk + events: + - updaterows + buy: + key: id + columns: + - id + - entryFk + - itemFk + - quantity + - life diff --git a/consumer.js b/consumer.js index 966d2bf..5228023 100644 --- a/consumer.js +++ b/consumer.js @@ -4,6 +4,8 @@ const fs = require('fs'); const path = require('path'); const mysql = require('mysql2/promise'); const amqp = require('amqplib'); +const {cpus} = require('os'); + class Consumer { async start() { @@ -21,6 +23,8 @@ class Consumer { console.log('Starting process.'); await this.init(); console.log('Process started.'); + + await this.consumeQueues(); } async stop() { @@ -33,16 +37,20 @@ class Consumer { const config = this.config; this.onErrorListener = err => this.onError(err); - this.db = await mysql.createConnection(config.consumerDb); - this.db.on('error', this.onErrorListener); + const dbConfig = Object.assign({ + connectionLimit: cpus().length + }, config.consumerDb); - this.pingInterval = setInterval( - () => this.connectionPing(), config.pingInterval * 1000); + this.db = await mysql.createPool(dbConfig); + this.db.on('error', this.onErrorListener); this.consumer = await amqp.connect(config.amqp); this.channel = await this.consumer.createChannel(); + this.channel.prefetch(1); + } - for (const queueName in config.queues) { + async consumeQueues() { + for (const queueName in this.config.queues) { await this.channel.assertQueue(queueName, { durable: true }); @@ -52,8 +60,6 @@ class Consumer { } async end(silent) { - clearInterval(this.pingInterval); - await this.consumer.close(); this.db.off('error', this.onErrorListener); @@ -67,29 +73,41 @@ class Consumer { } } - async connectionPing() { - this.debug('Ping', 'Sending ping to database.'); - await this.db.ping(); - } - async onConsume(msg, queueName) { const config = this.config; - const data = JSON.parse(msg.content.toString()); + if (config.debug) - console.debug('Message:'.blue, queueName.yellow, fks); + console.debug('Message:'.blue, queueName.yellow, data.table); const queue = config.queues[queueName]; - const query = queue.query; + let query = queue.query; if (!query) return; - if (!config.testMode) + // XXX: Testing + //query = 'SELECT 1 sleep'; + switch(queue.mode) { case 'fk': - for (const fk of data.fks) + for (const fk of data.fks) { + const sql = this.db.format(query, fk); + this.debug('SQL', sql); + if (!config.testMode) await this.db.query(query, fk); + } break; case 'changes': + const queueTable = queue.includeSchema[data.schema][data.table]; + for (const row of data.rows) { + const sql = this.db.format(query, [ + data.table, + row[queueTable.key], + JSON.stringify(row) + ]); + this.debug('SQL', sql); + if (!config.testMode) + await this.db.query(query, row); + } break; } diff --git a/mycdc.js b/mycdc.js index eaff877..92c448f 100644 --- a/mycdc.js +++ b/mycdc.js @@ -18,7 +18,6 @@ module.exports = class MyCDC { this.filename = null; this.position = null; this.schemaMap = new Map(); - this.fks = new Set(); this.queues = {}; } @@ -146,7 +145,7 @@ module.exports = class MyCDC { const [res] = await this.db.query( 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', - [config.queue] + [config.code] ); if (res.length) { const [row] = res; @@ -181,7 +180,7 @@ module.exports = class MyCDC { this.zongji.on('error', this.onErrorListener); this.flushInterval = setInterval( - () => this.flushQueue(), config.flushInterval); + () => this.flushQueue(), config.flushInterval * 1000); this.pingInterval = setInterval( () => this.connectionPing(), config.pingInterval * 1000); @@ -310,26 +309,31 @@ module.exports = class MyCDC { change.fks = new Set(); break; case 'changes': - change.rows = {}; + change.rows = []; break; } } - function addChange(row, queueNames) { + function addChange(queueNames, row, old) { for (const queueName of queueNames) { const queueInfo = tableQueues.get(queueName); const change = changes.get(queueName); + const key = row[queueInfo.key]; + const oldKey = old ? old[queueInfo.key] : null; + switch(change.mode) { case 'fk': - change.fks.add(row[queueInfo.fk]); + change.fks.add(key); + if (old && !equals(oldKey, key)) + change.fks.add(oldKey); break; case 'changes': const queueRow = {}; for (const column of queueInfo.columns) if (row[column] !== undefined) queueRow[column] = row[column]; - change.rows[row[queueInfo.id]] = queueRow; + change.rows.push(queueRow); break; } } @@ -357,12 +361,11 @@ module.exports = class MyCDC { } if (changedQueues.size) - addChange(after, changedQueues); + addChange(changedQueues, after, row.before); } - if (!changes) return; } else { for (const row of rows) - addChange(row, queueNames); + addChange(queueNames, row); } for (const [queueName, change] of changes) { @@ -391,7 +394,7 @@ module.exports = class MyCDC { this.channel.sendToQueue(queueName, Buffer.from(data), {persistent: true}); - console.debug('Queued'.blue, queueName.yellow, `[${eventName}] ${table.tableName}`); + console.debug('Queued:'.blue, `${queueName}:`.yellow, `${table.tableName}(${nChanges}) [${eventName}]`); } this.position = evt.nextPosition; @@ -403,9 +406,9 @@ module.exports = class MyCDC { this.debug('Flush', `filename: ${this.filename}, position: ${this.position}`); const replaceQuery = - 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; + 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; if (!this.config.testMode) - await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]); + await this.db.query(replaceQuery, [this.config.code, this.filename, this.position]); this.flushed = true; } diff --git a/run-rabbit.sh b/run-rabbit.sh index e3cb283..f211336 100755 --- a/run-rabbit.sh +++ b/run-rabbit.sh @@ -9,4 +9,4 @@ docker run \ -e RABBITMQ_DEFAULT_PASS=password \ -p 5672:5672 \ -p 8080:15672 \ - rabbitmq:3-management + rabbitmq:3.11.2-management