From 22289d8adcc7c231d19df803c871a952daa97bdd Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Wed, 17 Apr 2024 11:40:44 +0200 Subject: [PATCH] feat: refs #4409 Global refactor, queue polling improved --- README.md | 10 +- .../Dockerfile.consumer | 1 + .../Dockerfile.producer | 1 + run-rabbit.sh => assets/run-rabbit.sh | 0 assets/zongji.sql | 13 +++ zongji.undo.sql => assets/zongji.undo.sql | 0 config/consumer.yml | 3 + config/queues.yml | 107 ------------------ consumer.js | 14 ++- docker-compose.yml | 4 +- {queues => lib}/queue-fk.js | 39 ++++--- {queues => lib}/queue-id.js | 2 +- queue.js => lib/queue.js | 0 mycdc.js | 13 ++- package-lock.json | 83 ++++++++++++-- package.json | 6 + queues/orderTotal.yml | 24 ++++ queues/ticketTotal.yml | 42 +++++++ queues/travelEntries.yml | 9 ++ zongji.sql | 12 -- 20 files changed, 227 insertions(+), 156 deletions(-) rename Dockerfile.consumer => assets/Dockerfile.consumer (98%) rename Dockerfile.producer => assets/Dockerfile.producer (97%) rename run-rabbit.sh => assets/run-rabbit.sh (100%) create mode 100644 assets/zongji.sql rename zongji.undo.sql => assets/zongji.undo.sql (100%) delete mode 100644 config/queues.yml rename {queues => lib}/queue-fk.js (63%) rename {queues => lib}/queue-id.js (96%) rename queue.js => lib/queue.js (100%) create mode 100644 queues/orderTotal.yml create mode 100644 queues/ticketTotal.yml create mode 100644 queues/travelEntries.yml delete mode 100644 zongji.sql diff --git a/README.md b/README.md index daed738..045e7eb 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,15 @@ npm install ## Run application -Launch app. +Start rabbit. ```text -node index.js +npm run rabbit +``` + +Start producer and consumer. +```text +npm start +npm run consumer ``` ## Built With diff --git a/Dockerfile.consumer b/assets/Dockerfile.consumer similarity index 98% rename from Dockerfile.consumer rename to assets/Dockerfile.consumer index 527aa3a..e672333 100644 --- a/Dockerfile.consumer +++ b/assets/Dockerfile.consumer @@ -24,6 +24,7 @@ RUN npm install --omit=dev --no-audit --prefer-offline \ COPY config config COPY queues queues +COPY lib lib COPY \ LICENSE \ README.md \ diff --git a/Dockerfile.producer b/assets/Dockerfile.producer similarity index 97% rename from Dockerfile.producer rename to assets/Dockerfile.producer index c3d2ea8..d677b08 100644 --- a/Dockerfile.producer +++ b/assets/Dockerfile.producer @@ -23,6 +23,7 @@ RUN npm install --omit=dev --no-audit --prefer-offline \ && (cd zongji && npm install --omit=dev) COPY config config +COPY queues queues COPY \ LICENSE \ README.md \ diff --git a/run-rabbit.sh b/assets/run-rabbit.sh similarity index 100% rename from run-rabbit.sh rename to assets/run-rabbit.sh diff --git a/assets/zongji.sql b/assets/zongji.sql new file mode 100644 index 0000000..b9bc14a --- /dev/null +++ b/assets/zongji.sql @@ -0,0 +1,13 @@ + +CREATE TABLE `util`.`binlogQueue`( + `code` VARCHAR(255) NOT NULL, + `logName` VARCHAR(255) NOT NULL, + `position` BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (`code`) +) ENGINE = InnoDB; + +CREATE USER 'mycdc-producer'@'%' IDENTIFIED BY 'P4$$w0rd'; +GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'mycdc-producer'@'%'; +GRANT INSERT, DELETE ON `util`.* TO 'mycdc-producer'@'%'; + +CREATE USER 'mycdc-producer'@'%' IDENTIFIED BY 'P4$$w0rd'; diff --git a/zongji.undo.sql b/assets/zongji.undo.sql similarity index 100% rename from zongji.undo.sql rename to assets/zongji.undo.sql diff --git a/config/consumer.yml b/config/consumer.yml index 824c957..9524fd7 100644 --- a/config/consumer.yml +++ b/config/consumer.yml @@ -1,5 +1,8 @@ debug: false testMode: false +defaults: + mode: fk + flushInterval: 5000 amqp: amqp://user:password@localhost:5672 db: host: localhost diff --git a/config/queues.yml b/config/queues.yml deleted file mode 100644 index c335982..0000000 --- a/config/queues.yml +++ /dev/null @@ -1,107 +0,0 @@ -orderTotal: - query: CALL hedera.order_recalc(:id) - mode: fk - flushInterval: 5000 - includeSchema: - hedera: - order: - key: id - columns: - - id - - address_id - - company_id - - date_send - - customer_id - events: - - updaterows - orderRow: - key: orderFk - columns: - - id - - orderFk - - itemFk - - warehouseFk - - shipment - - amount -ticketTotal: - query: CALL vn.ticket_recalc(:id, NULL) - mode: fk - flushInterval: 5000 - includeSchema: - vn: - ticket: - key: id - columns: - - id - - shipped - - warehouseFk - - clientFk - events: - - updaterows - sale: - key: ticketFk - columns: - - id - - ticketFk - - itemFk - - quantity - - price -stock: - mode: id - includeSchema: - vn: - travel: - query: CALL stock.log_refreshBuy(:table, :id) - key: id - entry: - query: CALL stock.log_refreshBuy(:table, :id) - key: id - columns: - - id - - travelFk - - isRaid - events: - - updaterows - buy: - query: CALL stock.log_refreshBuy(:table, :id) - key: id - columns: - - id - - entryFk - - itemFk - - quantity - - created - ticket: - query: CALL stock.log_refreshSale(:table, :id) - key: id - columns: - - id - - warehouseFk - - shipped - events: - - updaterows - sale: - query: CALL stock.log_refreshSale(:table, :id) - key: id - columns: - - id - - ticketFk - - itemFk - - quantity - - created - - isPicked - hedera: - order: - query: CALL stock.log_refreshOrder(:table, :id) - key: id - columns: - - id - - date_send - - address_id - - company_id - - customer_id - events: - - updaterows - orderRow: - query: CALL stock.log_refreshOrder(:table, :id) - key: id diff --git a/consumer.js b/consumer.js index 33476bf..ccd7650 100644 --- a/consumer.js +++ b/consumer.js @@ -7,8 +7,8 @@ const amqp = require('amqplib'); const {cpus} = require('os'); const queues = { - id: require('./queues/queue-id'), - fk: require('./queues/queue-fk') + id: require('./lib/queue-id'), + fk: require('./lib/queue-fk') }; class Consumer { @@ -53,14 +53,16 @@ class Consumer { } async consumeQueues() { - const queuesConf = require('./config/queues.yml'); + const {conf} = this; + const {defaults} = this.conf; this.queues = {}; - for (const queueName in queuesConf) { - const queueConf = queuesConf[queueName]; + for (const queueName of conf.pollQueues) { + const confFile = path.join(__dirname, 'queues', `${queueName}.yml`); + const queueConf = Object.assign({}, defaults, require(confFile)); + const {mode} = queueConf; const QueueClass = queues[mode]; - if (!QueueClass) { console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`); continue; diff --git a/docker-compose.yml b/docker-compose.yml index 6ed67f4..bc42127 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,9 +4,9 @@ services: image: registry.verdnatura.es/mycdc-producer:${VERSION:?} build: context: . - dockerfile: Dockerfile.producer + dockerfile: assets/Dockerfile.producer consumer: image: registry.verdnatura.es/mycdc-consumer:${VERSION:?} build: context: . - dockerfile: Dockerfile.consumer + dockerfile: assets/Dockerfile.consumer diff --git a/queues/queue-fk.js b/lib/queue-fk.js similarity index 63% rename from queues/queue-fk.js rename to lib/queue-fk.js index b47f785..5f609ed 100644 --- a/queues/queue-fk.js +++ b/lib/queue-fk.js @@ -1,4 +1,4 @@ -const Queue = require('../queue'); +const Queue = require('./queue'); module.exports = class QueueFk extends Queue { constructor(consumer, name, conf) { @@ -14,7 +14,7 @@ module.exports = class QueueFk extends Queue { reset() { this.lastMessage = null; this.nMessages = 0; - this.ids = new Set(); + this.scopes = new Map(); } flush(flushInterval) { @@ -27,25 +27,27 @@ module.exports = class QueueFk extends Queue { } async onFlushTimeout() { - const consumer = this.consumer; - - if (this.ids.size) { + if (this.nMessages) { + const {consumer} = this; + if (consumer.conf.debug) console.debug('Flush:'.blue, this.name.yellow, this.ids); - const ids = Array.from(this.ids); + const scopes = this.scopes; const lastMessage = this.lastMessage; this.reset(); try { - for (const id of ids) { - const params = {id}; - //const sql = consumer.db.format(this.conf.query, params); - const sql = this.conf.query; - if (consumer.conf.debug) - console.debug('SQL:'.blue, sql, params); - if (!consumer.conf.testMode) - await consumer.db.query(sql, params); + for (const [scope, ids] of scopes) { + let query = this.conf.query[scope]; + for (const id of ids) { + const params = {id}; + //query = consumer.db.format(query, params); + if (consumer.conf.debug) + console.debug('SQL:'.blue, query, params); + if (!consumer.conf.testMode) + await consumer.db.query(query, params); + } } await this.channel.ack(lastMessage, true); @@ -59,15 +61,20 @@ module.exports = class QueueFk extends Queue { } async onConsume(msg) { + const {conf} = this; const consumer = this.consumer; const data = JSON.parse(msg.content.toString()); if (consumer.conf.debug) console.debug('Message:'.blue, this.name.yellow, data.table); - const ids = this.ids; - const key = this.conf.includeSchema[data.schema][data.table].key; + const tableConf = conf.includeSchema[data.schema][data.table]; + const scope = tableConf.scope ?? data.table; + let ids = this.scopes.get(scope); + if (!ids) this.scopes.set(scope, ids = new Set()); + + const key = tableConf.key; if (data.eventName === 'updaterows') { for (const row of data.rows) { ids.add(row.before[key]); diff --git a/queues/queue-id.js b/lib/queue-id.js similarity index 96% rename from queues/queue-id.js rename to lib/queue-id.js index eea9db7..225c642 100644 --- a/queues/queue-id.js +++ b/lib/queue-id.js @@ -1,4 +1,4 @@ -const Queue = require('../queue'); +const Queue = require('./queue'); module.exports = class QueueId extends Queue { async onConsume(msg) { diff --git a/queue.js b/lib/queue.js similarity index 100% rename from queue.js rename to lib/queue.js diff --git a/mycdc.js b/mycdc.js index 746567b..ed408eb 100644 --- a/mycdc.js +++ b/mycdc.js @@ -1,6 +1,6 @@ require('require-yaml'); require('colors'); -const fs = require('fs'); +const fs = require('fs-extra'); const path = require('path'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); @@ -29,7 +29,16 @@ module.exports = class MyCDC { const localConfig = require(localPath); Object.assign(conf, localConfig); } - this.queuesConf = require('./config/queues.yml'); + + this.queuesConf = {}; + const queueDir = path.join(__dirname, 'queues'); + const queueFiles = await fs.readdir(queueDir); + for (const queueFile of queueFiles) { + const match = queueFile.match(/^([a-zA-Z0-9-_]+)\.ya?ml$/); + if (!match) + throw new Error(`Invalid queue file name '${queueFile}'`); + this.queuesConf[match[1]] = require(path.join(queueDir, queueFile)); + } const queues = this.queuesConf; for (const queueName in queues) { diff --git a/package-lock.json b/package-lock.json index 2037601..dfe5628 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,16 +1,17 @@ { "name": "mycdc", - "version": "0.0.5", + "version": "0.0.8", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "mycdc", - "version": "0.0.5", + "version": "0.0.8", "license": "GPL-3.0", "dependencies": { "amqplib": "^0.10.3", "colors": "^1.4.0", + "fs-extra": "^11.2.0", "mysql2": "^3.9.3", "require-yaml": "^0.0.1", "zongji": "file:../zongji" @@ -94,6 +95,19 @@ "node": ">=0.10" } }, + "node_modules/fs-extra": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz", + "integrity": "sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw==", + "dependencies": { + "graceful-fs": "^4.2.0", + "jsonfile": "^6.0.1", + "universalify": "^2.0.0" + }, + "engines": { + "node": ">=14.14" + } + }, "node_modules/generate-function": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", @@ -102,6 +116,11 @@ "is-property": "^1.0.2" } }, + "node_modules/graceful-fs": { + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==" + }, "node_modules/iconv-lite": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", @@ -139,6 +158,17 @@ "js-yaml": "bin/js-yaml.js" } }, + "node_modules/jsonfile": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz", + "integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==", + "dependencies": { + "universalify": "^2.0.0" + }, + "optionalDependencies": { + "graceful-fs": "^4.1.6" + } + }, "node_modules/long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", @@ -158,9 +188,9 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/mysql2": { - "version": "3.9.3", - "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", - "integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", + "version": "3.9.4", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.4.tgz", + "integrity": "sha512-OEESQuwxMza803knC1YSt7NMuc1BrK9j7gZhCSs2WAyxr1vfiI7QLaLOKTh5c9SWGz98qVyQUbK8/WckevNQhg==", "dependencies": { "denque": "^2.1.0", "generate-function": "^2.3.1", @@ -251,6 +281,14 @@ "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" }, + "node_modules/universalify": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz", + "integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==", + "engines": { + "node": ">= 10.0.0" + } + }, "node_modules/url-parse": { "version": "1.5.10", "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", @@ -320,6 +358,16 @@ "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==" }, + "fs-extra": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz", + "integrity": "sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw==", + "requires": { + "graceful-fs": "^4.2.0", + "jsonfile": "^6.0.1", + "universalify": "^2.0.0" + } + }, "generate-function": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", @@ -328,6 +376,11 @@ "is-property": "^1.0.2" } }, + "graceful-fs": { + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==" + }, "iconv-lite": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", @@ -359,6 +412,15 @@ "argparse": "^2.0.1" } }, + "jsonfile": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz", + "integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==", + "requires": { + "graceful-fs": "^4.1.6", + "universalify": "^2.0.0" + } + }, "long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", @@ -375,9 +437,9 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "mysql2": { - "version": "3.9.3", - "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", - "integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", + "version": "3.9.4", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.4.tgz", + "integrity": "sha512-OEESQuwxMza803knC1YSt7NMuc1BrK9j7gZhCSs2WAyxr1vfiI7QLaLOKTh5c9SWGz98qVyQUbK8/WckevNQhg==", "requires": { "denque": "^2.1.0", "generate-function": "^2.3.1", @@ -458,6 +520,11 @@ "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" }, + "universalify": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz", + "integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==" + }, "url-parse": { "version": "1.5.10", "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", diff --git a/package.json b/package.json index 562ca5e..d094b4c 100644 --- a/package.json +++ b/package.json @@ -14,8 +14,14 @@ "dependencies": { "amqplib": "^0.10.3", "colors": "^1.4.0", + "fs-extra": "^11.2.0", "mysql2": "^3.9.3", "require-yaml": "^0.0.1", "zongji": "file:../zongji" + }, + "scripts": { + "start": "node index.js", + "consumer": "node consumer.js", + "rabbit": "assets/run-rabbit.sh" } } diff --git a/queues/orderTotal.yml b/queues/orderTotal.yml new file mode 100644 index 0000000..0814d5c --- /dev/null +++ b/queues/orderTotal.yml @@ -0,0 +1,24 @@ +query: + order: CALL hedera.order_recalc(:id) +includeSchema: + hedera: + order: + key: id + columns: + - id + - address_id + - company_id + - date_send + - customer_id + events: + - updaterows + orderRow: + key: orderFk + scope: order + columns: + - id + - orderFk + - itemFk + - warehouseFk + - shipment + - amount diff --git a/queues/ticketTotal.yml b/queues/ticketTotal.yml new file mode 100644 index 0000000..3517035 --- /dev/null +++ b/queues/ticketTotal.yml @@ -0,0 +1,42 @@ +query: + ticket: CALL vn.ticket_recalc(:id, NULL) + client: CALL vn.ticket_recalcByScope(:table, :id) + address: CALL vn.ticket_recalcByScope(:table, :id) +includeSchema: + vn: + ticket: + key: id + columns: + - id + - clientFk + - addressFk + - companyFk + - shipped + events: + - updaterows + ticketService: + key: ticketFk + scope: ticket + columns: + - ticketFk + - price + - quantity + sale: + key: ticketFk + scope: ticket + columns: + - id + - ticketFk + - itemFk + - quantity + - price + - discount + client: + key: id + columns: + - provinceFk + - isVies + address: + key: id + columns: + - isEqualizated diff --git a/queues/travelEntries.yml b/queues/travelEntries.yml new file mode 100644 index 0000000..f56ad2d --- /dev/null +++ b/queues/travelEntries.yml @@ -0,0 +1,9 @@ +query: + travel: CALL vn.travel_recalc(:id) +includeSchema: + vn: + entry: + key: travelFk + scope: travel + columns: + - travelfk diff --git a/zongji.sql b/zongji.sql deleted file mode 100644 index bd3fc83..0000000 --- a/zongji.sql +++ /dev/null @@ -1,12 +0,0 @@ - -CREATE TABLE `util`.`binlogQueue`( - `code` VARCHAR(255) NOT NULL, - `logName` VARCHAR(255) NOT NULL, - `position` BIGINT UNSIGNED NOT NULL, - PRIMARY KEY (`code`) -) ENGINE = InnoDB; - -CREATE USER 'zongji'@'%' IDENTIFIED BY 'password'; -GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%'; - -GRANT INSERT, DELETE ON `util`.* TO 'zongji'@'%';