diff --git a/README.md b/README.md index daed738..2b69786 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,16 @@ -# Asynchronous DB calculations reading the binary log +# MyCDC - MySQL change data capture -## Enviroment setup +Application to run asynchronous operations based on DB changes. It is divided +into three services: + +- RabbitMQ: Service that allows the exchange and queuing of messages between +applications. +- Producer: Reads and filters changes from the DB binary log and adds the +relevant messages to each RabbitMQ queue. +- Consumer: Obtains the elements from the queues and performs the necessary +operations asynchronously. + +## Environment setup Because a bug with MariaDB wich it's fix is pending to be merged into main project branch, a *zongji* fork must be cloned into project root directory. @@ -13,8 +23,9 @@ git checkout fix-143 Apply *zongji.sql* script into DB. -Copy *config.json* to *config.local.json* and place your local configuration -there. +Copy *producer.json* and *consumer.json* from *config* directory to +*producer.local.json* and *consumer.local.json* and place your local +configuration there. Install dependencies. ```text @@ -23,12 +34,19 @@ npm install ## Run application -Launch app. +Start rabbit. ```text -node index.js +npm run rabbit +``` + +Start producer and consumer. +```text +npm start +npm run consumer ``` ## Built With * [Zongji](https://github.com/nevill/zongji) * [MySQL2](https://github.com/sidorares/node-mysql2#readme) +* [RabbitMQ] (https://www.rabbitmq.com/) diff --git a/Dockerfile.consumer b/assets/Dockerfile.consumer similarity index 95% rename from Dockerfile.consumer rename to assets/Dockerfile.consumer index 527aa3a..9b5dd4f 100644 --- a/Dockerfile.consumer +++ b/assets/Dockerfile.consumer @@ -22,13 +22,12 @@ RUN npm install --omit=dev --no-audit --prefer-offline \ && git clone --depth 1 --branch fix-143 https://github.com/juan-ferrer-toribio/zongji.git \ && (cd zongji && npm install --omit=dev) +COPY lib lib COPY config config COPY queues queues -COPY \ - LICENSE \ +COPY LICENSE \ README.md \ consumer.js \ - queue.js \ ./ CMD ["node", "consumer.js"] diff --git a/Dockerfile.producer b/assets/Dockerfile.producer similarity index 95% rename from Dockerfile.producer rename to assets/Dockerfile.producer index c3d2ea8..6abc7d0 100644 --- a/Dockerfile.producer +++ b/assets/Dockerfile.producer @@ -23,8 +23,8 @@ RUN npm install --omit=dev --no-audit --prefer-offline \ && (cd zongji && npm install --omit=dev) COPY config config -COPY \ - LICENSE \ +COPY queues queues +COPY LICENSE \ README.md \ mycdc.js \ index.js \ diff --git a/run-rabbit.sh b/assets/run-rabbit.sh similarity index 100% rename from run-rabbit.sh rename to assets/run-rabbit.sh diff --git a/assets/zongji.sql b/assets/zongji.sql new file mode 100644 index 0000000..b9bc14a --- /dev/null +++ b/assets/zongji.sql @@ -0,0 +1,13 @@ + +CREATE TABLE `util`.`binlogQueue`( + `code` VARCHAR(255) NOT NULL, + `logName` VARCHAR(255) NOT NULL, + `position` BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (`code`) +) ENGINE = InnoDB; + +CREATE USER 'mycdc-producer'@'%' IDENTIFIED BY 'P4$$w0rd'; +GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'mycdc-producer'@'%'; +GRANT INSERT, DELETE ON `util`.* TO 'mycdc-producer'@'%'; + +CREATE USER 'mycdc-producer'@'%' IDENTIFIED BY 'P4$$w0rd'; diff --git a/assets/zongji.undo.sql b/assets/zongji.undo.sql new file mode 100644 index 0000000..1bb1425 --- /dev/null +++ b/assets/zongji.undo.sql @@ -0,0 +1,4 @@ + +DROP TABLE IF EXISTS `util`.`binlogQueue`; +DROP USER IF EXISTS 'mycdc-producer'@'%'; +DROP USER IF EXISTS 'mycdc-consumer'@'%'; diff --git a/config/consumer.yml b/config/consumer.yml index 824c957..97d7e8e 100644 --- a/config/consumer.yml +++ b/config/consumer.yml @@ -1,5 +1,9 @@ debug: false testMode: false +defaults: + mode: fk + flushInterval: 5000 + amqpPrefetch: 100 amqp: amqp://user:password@localhost:5672 db: host: localhost diff --git a/config/producer.yml b/config/producer.yml index 2e2c4b7..848e8a1 100644 --- a/config/producer.yml +++ b/config/producer.yml @@ -5,6 +5,7 @@ deleteNonEmpty: false amqp: amqp://user:password@localhost:5672 pingInterval: 60 flushInterval: 10 +serverId: 1 db: host: localhost port: 3306 diff --git a/config/queues.yml b/config/queues.yml index bf4fd4f..9a4763a 100644 --- a/config/queues.yml +++ b/config/queues.yml @@ -265,4 +265,4 @@ ticketRisk: key: clientFk columns: - clientFk - - risk \ No newline at end of file + - risk diff --git a/consumer.js b/consumer.js index 33476bf..ccd7650 100644 --- a/consumer.js +++ b/consumer.js @@ -7,8 +7,8 @@ const amqp = require('amqplib'); const {cpus} = require('os'); const queues = { - id: require('./queues/queue-id'), - fk: require('./queues/queue-fk') + id: require('./lib/queue-id'), + fk: require('./lib/queue-fk') }; class Consumer { @@ -53,14 +53,16 @@ class Consumer { } async consumeQueues() { - const queuesConf = require('./config/queues.yml'); + const {conf} = this; + const {defaults} = this.conf; this.queues = {}; - for (const queueName in queuesConf) { - const queueConf = queuesConf[queueName]; + for (const queueName of conf.pollQueues) { + const confFile = path.join(__dirname, 'queues', `${queueName}.yml`); + const queueConf = Object.assign({}, defaults, require(confFile)); + const {mode} = queueConf; const QueueClass = queues[mode]; - if (!QueueClass) { console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`); continue; diff --git a/docker-compose.yml b/docker-compose.yml index 6ed67f4..bc42127 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,9 +4,9 @@ services: image: registry.verdnatura.es/mycdc-producer:${VERSION:?} build: context: . - dockerfile: Dockerfile.producer + dockerfile: assets/Dockerfile.producer consumer: image: registry.verdnatura.es/mycdc-consumer:${VERSION:?} build: context: . - dockerfile: Dockerfile.consumer + dockerfile: assets/Dockerfile.consumer diff --git a/lib/queue-fk.js b/lib/queue-fk.js new file mode 100644 index 0000000..d24b92c --- /dev/null +++ b/lib/queue-fk.js @@ -0,0 +1,94 @@ +const Queue = require('./queue'); + +module.exports = class QueueFk extends Queue { + constructor(consumer, name, conf) { + super(consumer, name, conf); + this.reset(); + } + + async consume() { + await super.consume(); + this.flush(); + } + + reset() { + this.messages = []; + this.nMessages = 0; + this.scopes = new Map(); + } + + flush(flushInterval) { + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = null; + } + this.timeout = setTimeout( + () => this.onFlushTimeout(), flushInterval); + } + + async onFlushTimeout() { + const {messages} = this; + + if (messages.length) { + const {consumer, scopes, channel} = this; + this.reset(); + + if (consumer.conf.debug) + console.debug('Flush:'.blue, this.name.yellow, scopes); + + try { + for (const [scope, ids] of scopes) { + let query = this.conf.query[scope]; + for (const id of ids) { + const params = {id}; + //query = consumer.db.format(query, params); + if (consumer.conf.debug) + console.debug('SQL:'.blue, query, params); + if (!consumer.conf.testMode) + await consumer.db.query(query, params); + } + } + + const promises = []; + for (const msg of messages) + promises.push(channel.ack(msg)); + await Promise.all(promises); + } catch(err) { + for (const msg of messages) + await channel.nack(msg); + throw err; + } + } + + this.flush(this.conf.flushInterval); + } + + onConsume(msg) { + const {conf} = this; + const consumer = this.consumer; + const data = JSON.parse(msg.content.toString()); + + if (consumer.conf.debug) + console.debug('Message:'.blue, this.name.yellow, data.table); + + const tableConf = conf.includeSchema[data.schema][data.table]; + + const scope = tableConf.scope ?? data.table; + let ids = this.scopes.get(scope); + if (!ids) this.scopes.set(scope, ids = new Set()); + + const key = tableConf.key; + if (data.eventName === 'updaterows') { + for (const row of data.rows) { + ids.add(row.before[key]); + ids.add(row.after[key]); + } + } else + for (const row of data.rows) + ids.add(row[key]); + + this.messages.push(msg); + if (this.messages.length == conf.amqpPrefetch) + this.flush(); + } +} diff --git a/queues/queue-id.js b/lib/queue-id.js similarity index 96% rename from queues/queue-id.js rename to lib/queue-id.js index eea9db7..225c642 100644 --- a/queues/queue-id.js +++ b/lib/queue-id.js @@ -1,4 +1,4 @@ -const Queue = require('../queue'); +const Queue = require('./queue'); module.exports = class QueueId extends Queue { async onConsume(msg) { diff --git a/queue.js b/lib/queue.js similarity index 100% rename from queue.js rename to lib/queue.js diff --git a/mycdc.js b/mycdc.js index 746567b..64e93ed 100644 --- a/mycdc.js +++ b/mycdc.js @@ -1,6 +1,6 @@ require('require-yaml'); require('colors'); -const fs = require('fs'); +const fs = require('fs-extra'); const path = require('path'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); @@ -29,7 +29,16 @@ module.exports = class MyCDC { const localConfig = require(localPath); Object.assign(conf, localConfig); } - this.queuesConf = require('./config/queues.yml'); + + this.queuesConf = {}; + const queueDir = path.join(__dirname, 'queues'); + const queueFiles = await fs.readdir(queueDir); + for (const queueFile of queueFiles) { + const match = queueFile.match(/^([a-zA-Z0-9-_]+)\.ya?ml$/); + if (!match) + throw new Error(`Invalid queue file name '${queueFile}'`); + this.queuesConf[match[1]] = require(path.join(queueDir, queueFile)); + } const queues = this.queuesConf; for (const queueName in queues) { @@ -51,7 +60,7 @@ module.exports = class MyCDC { tableInfo = { queues: new Map(), events: new Map(), - columns: false, + columnSet: false, fk: 'id' }; tableMap.set(tableName, tableInfo); @@ -70,13 +79,13 @@ module.exports = class MyCDC { const columns = table.columns; if (columns) { - if (tableInfo.columns === false) - tableInfo.columns = new Set(); - if (tableInfo.columns !== true) + if (tableInfo.columnSet === false) + tableInfo.columnSet = new Set(); + if (tableInfo.columnSet !== true) for (const column of columns) - tableInfo.columns.add(column); + tableInfo.columnSet.add(column); } else - tableInfo.columns = true; + tableInfo.columnSet = true; if (table.id) tableInfo.id = table.id; @@ -87,8 +96,15 @@ module.exports = class MyCDC { } const includeSchema = {}; - for (const [schemaName, tableMap] of this.schemaMap) + + for (const [schemaName, tableMap] of this.schemaMap) { includeSchema[schemaName] = Array.from(tableMap.keys()); + + for (const [tableName, tableInfo] of tableMap) { + if (tableInfo.columnSet !== true) + tableInfo.columns = Array.from(tableInfo.columnSet.keys()); + } + } this.opts = { includeEvents: [ @@ -98,7 +114,8 @@ module.exports = class MyCDC { 'updaterows', 'deleterows' ], - includeSchema + includeSchema, + serverId: conf.serverId }; if (conf.testMode) @@ -309,7 +326,10 @@ module.exports = class MyCDC { case 'rotate': this.filename = evt.binlogName; position = evt.position; - console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`); + console.log( + `[${eventName}] filename: ${this.filename}`, + `position: ${this.position}, nextPosition: ${evt.nextPosition}` + ); break; case 'writerows': case 'deleterows': @@ -341,9 +361,9 @@ module.exports = class MyCDC { if (isUpdate) { rows = []; cols = new Set(); - let columns = tableInfo.columns === true + const columns = tableInfo.columnSet === true ? Object.keys(evt.rows[0].after) - : tableInfo.columns.keys(); + : tableInfo.columns; for (const row of evt.rows) { let nColsChanged = 0; diff --git a/package-lock.json b/package-lock.json index 2037601..dfe5628 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,16 +1,17 @@ { "name": "mycdc", - "version": "0.0.5", + "version": "0.0.8", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "mycdc", - "version": "0.0.5", + "version": "0.0.8", "license": "GPL-3.0", "dependencies": { "amqplib": "^0.10.3", "colors": "^1.4.0", + "fs-extra": "^11.2.0", "mysql2": "^3.9.3", "require-yaml": "^0.0.1", "zongji": "file:../zongji" @@ -94,6 +95,19 @@ "node": ">=0.10" } }, + "node_modules/fs-extra": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz", + "integrity": "sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw==", + "dependencies": { + "graceful-fs": "^4.2.0", + "jsonfile": "^6.0.1", + "universalify": "^2.0.0" + }, + "engines": { + "node": ">=14.14" + } + }, "node_modules/generate-function": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", @@ -102,6 +116,11 @@ "is-property": "^1.0.2" } }, + "node_modules/graceful-fs": { + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==" + }, "node_modules/iconv-lite": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", @@ -139,6 +158,17 @@ "js-yaml": "bin/js-yaml.js" } }, + "node_modules/jsonfile": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz", + "integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==", + "dependencies": { + "universalify": "^2.0.0" + }, + "optionalDependencies": { + "graceful-fs": "^4.1.6" + } + }, "node_modules/long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", @@ -158,9 +188,9 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/mysql2": { - "version": "3.9.3", - "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", - "integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", + "version": "3.9.4", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.4.tgz", + "integrity": "sha512-OEESQuwxMza803knC1YSt7NMuc1BrK9j7gZhCSs2WAyxr1vfiI7QLaLOKTh5c9SWGz98qVyQUbK8/WckevNQhg==", "dependencies": { "denque": "^2.1.0", "generate-function": "^2.3.1", @@ -251,6 +281,14 @@ "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" }, + "node_modules/universalify": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz", + "integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==", + "engines": { + "node": ">= 10.0.0" + } + }, "node_modules/url-parse": { "version": "1.5.10", "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", @@ -320,6 +358,16 @@ "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==" }, + "fs-extra": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz", + "integrity": "sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw==", + "requires": { + "graceful-fs": "^4.2.0", + "jsonfile": "^6.0.1", + "universalify": "^2.0.0" + } + }, "generate-function": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", @@ -328,6 +376,11 @@ "is-property": "^1.0.2" } }, + "graceful-fs": { + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==" + }, "iconv-lite": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", @@ -359,6 +412,15 @@ "argparse": "^2.0.1" } }, + "jsonfile": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz", + "integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==", + "requires": { + "graceful-fs": "^4.1.6", + "universalify": "^2.0.0" + } + }, "long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", @@ -375,9 +437,9 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "mysql2": { - "version": "3.9.3", - "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", - "integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", + "version": "3.9.4", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.4.tgz", + "integrity": "sha512-OEESQuwxMza803knC1YSt7NMuc1BrK9j7gZhCSs2WAyxr1vfiI7QLaLOKTh5c9SWGz98qVyQUbK8/WckevNQhg==", "requires": { "denque": "^2.1.0", "generate-function": "^2.3.1", @@ -458,6 +520,11 @@ "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" }, + "universalify": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz", + "integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==" + }, "url-parse": { "version": "1.5.10", "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", diff --git a/package.json b/package.json index 562ca5e..42eda9c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mycdc", - "version": "0.0.8", + "version": "0.0.14", "author": "Verdnatura Levante SL", "description": "Asynchronous DB calculations reading the binary log", "license": "GPL-3.0", @@ -14,8 +14,14 @@ "dependencies": { "amqplib": "^0.10.3", "colors": "^1.4.0", + "fs-extra": "^11.2.0", "mysql2": "^3.9.3", "require-yaml": "^0.0.1", "zongji": "file:../zongji" + }, + "scripts": { + "start": "node index.js", + "consumer": "node consumer.js", + "rabbit": "assets/run-rabbit.sh" } } diff --git a/queues/orderTotal.yml b/queues/orderTotal.yml new file mode 100644 index 0000000..0814d5c --- /dev/null +++ b/queues/orderTotal.yml @@ -0,0 +1,24 @@ +query: + order: CALL hedera.order_recalc(:id) +includeSchema: + hedera: + order: + key: id + columns: + - id + - address_id + - company_id + - date_send + - customer_id + events: + - updaterows + orderRow: + key: orderFk + scope: order + columns: + - id + - orderFk + - itemFk + - warehouseFk + - shipment + - amount diff --git a/queues/queue-fk.js b/queues/queue-fk.js deleted file mode 100644 index b47f785..0000000 --- a/queues/queue-fk.js +++ /dev/null @@ -1,86 +0,0 @@ -const Queue = require('../queue'); - -module.exports = class QueueFk extends Queue { - constructor(consumer, name, conf) { - super(consumer, name, conf); - this.reset(); - } - - async consume() { - await super.consume(); - this.flush(); - } - - reset() { - this.lastMessage = null; - this.nMessages = 0; - this.ids = new Set(); - } - - flush(flushInterval) { - if (this.timeout) { - clearTimeout(this.timeout); - this.timeout = null; - } - this.timeout = setTimeout( - () => this.onFlushTimeout(), flushInterval); - } - - async onFlushTimeout() { - const consumer = this.consumer; - - if (this.ids.size) { - if (consumer.conf.debug) - console.debug('Flush:'.blue, this.name.yellow, this.ids); - - const ids = Array.from(this.ids); - const lastMessage = this.lastMessage; - this.reset(); - - try { - for (const id of ids) { - const params = {id}; - //const sql = consumer.db.format(this.conf.query, params); - const sql = this.conf.query; - if (consumer.conf.debug) - console.debug('SQL:'.blue, sql, params); - if (!consumer.conf.testMode) - await consumer.db.query(sql, params); - } - - await this.channel.ack(lastMessage, true); - } catch(err) { - await this.channel.nack(lastMessage, true); - console.error(err); - } - } - - this.flush(this.conf.flushInterval); - } - - async onConsume(msg) { - const consumer = this.consumer; - const data = JSON.parse(msg.content.toString()); - - if (consumer.conf.debug) - console.debug('Message:'.blue, this.name.yellow, data.table); - - const ids = this.ids; - const key = this.conf.includeSchema[data.schema][data.table].key; - - if (data.eventName === 'updaterows') { - for (const row of data.rows) { - ids.add(row.before[key]); - ids.add(row.after[key]); - } - } else - for (const row of data.rows) - ids.add(row[key]); - - this.nMessages++; - this.lastMessage = msg; - - if (this.nMessages == consumer.conf.amqpPrefetch) - this.flush(); - } -} diff --git a/queues/ticketTotal.yml b/queues/ticketTotal.yml new file mode 100644 index 0000000..2128d8c --- /dev/null +++ b/queues/ticketTotal.yml @@ -0,0 +1,42 @@ +query: + ticket: CALL vn.ticket_recalc(:id, NULL) + client: CALL vn.ticket_recalcByScope('client', :id) + address: CALL vn.ticket_recalcByScope('address', :id) +includeSchema: + vn: + ticket: + key: id + columns: + - id + - clientFk + - addressFk + - companyFk + - shipped + events: + - updaterows + ticketService: + key: ticketFk + scope: ticket + columns: + - ticketFk + - price + - quantity + sale: + key: ticketFk + scope: ticket + columns: + - id + - ticketFk + - itemFk + - quantity + - price + - discount + client: + key: id + columns: + - provinceFk + - isVies + address: + key: id + columns: + - isEqualizated diff --git a/queues/travelEntries.yml b/queues/travelEntries.yml new file mode 100644 index 0000000..3ff4380 --- /dev/null +++ b/queues/travelEntries.yml @@ -0,0 +1,9 @@ +query: + travel: CALL vn.travel_recalc(:id) +includeSchema: + vn: + entry: + key: travelFk + scope: travel + columns: + - travelFk diff --git a/zongji.sql b/zongji.sql deleted file mode 100644 index bd3fc83..0000000 --- a/zongji.sql +++ /dev/null @@ -1,12 +0,0 @@ - -CREATE TABLE `util`.`binlogQueue`( - `code` VARCHAR(255) NOT NULL, - `logName` VARCHAR(255) NOT NULL, - `position` BIGINT UNSIGNED NOT NULL, - PRIMARY KEY (`code`) -) ENGINE = InnoDB; - -CREATE USER 'zongji'@'%' IDENTIFIED BY 'password'; -GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%'; - -GRANT INSERT, DELETE ON `util`.* TO 'zongji'@'%'; diff --git a/zongji.undo.sql b/zongji.undo.sql deleted file mode 100644 index 62e9607..0000000 --- a/zongji.undo.sql +++ /dev/null @@ -1,3 +0,0 @@ - -DROP TABLE IF EXISTS `util`.`binlogQueue`; -DROP USER 'zongji'@'%';