From 613f0b5e1cdbbd3aa301df81dc30887d14b5ef13 Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Mon, 8 Apr 2024 15:17:06 +0200 Subject: [PATCH] feat: refs #4409 Code refactor, fixes, auto bind --- .gitignore | 3 +- Dockerfile.consumer | 1 + config/producer.yml | 1 + config/queues.yml | 61 +++++++++++++++----- consumer.js | 20 ++++++- mycdc.js | 81 +++++++++++++++----------- package-lock.json | 138 +++++++++++++++++--------------------------- package.json | 10 ++-- queue.js | 101 ++++---------------------------- queues/queue-fk.js | 86 +++++++++++++++++++++++++++ queues/queue-id.js | 41 +++++++++++++ 11 files changed, 313 insertions(+), 230 deletions(-) create mode 100644 queues/queue-fk.js create mode 100644 queues/queue-id.js diff --git a/.gitignore b/.gitignore index 3e7c91d..b8611a6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ node_modules zongji config/producer.*.yml -config/consumer.*.yml \ No newline at end of file +config/consumer.*.yml +config/queues.*.yml \ No newline at end of file diff --git a/Dockerfile.consumer b/Dockerfile.consumer index d23c0fb..527aa3a 100644 --- a/Dockerfile.consumer +++ b/Dockerfile.consumer @@ -23,6 +23,7 @@ RUN npm install --omit=dev --no-audit --prefer-offline \ && (cd zongji && npm install --omit=dev) COPY config config +COPY queues queues COPY \ LICENSE \ README.md \ diff --git a/config/producer.yml b/config/producer.yml index 733b316..2e2c4b7 100644 --- a/config/producer.yml +++ b/config/producer.yml @@ -1,6 +1,7 @@ code: mycdc debug: false testMode: false +deleteNonEmpty: false amqp: amqp://user:password@localhost:5672 pingInterval: 60 flushInterval: 10 diff --git a/config/queues.yml b/config/queues.yml index 6b60f4a..c335982 100644 --- a/config/queues.yml +++ b/config/queues.yml @@ -1,5 +1,5 @@ orderTotal: - query: CALL hedera.order_recalc(?) + query: CALL hedera.order_recalc(:id) mode: fk flushInterval: 5000 includeSchema: @@ -23,8 +23,8 @@ orderTotal: - warehouseFk - shipment - amount -ticketTotal: - query: CALL vn.ticket_recalc(?) +ticketTotal: + query: CALL vn.ticket_recalc(:id, NULL) mode: fk flushInterval: 5000 includeSchema: @@ -46,27 +46,62 @@ ticketTotal: - itemFk - quantity - price -comparative: - query: CALL vn.itemComparative_refresh(?, ?, ?) - mode: changes - flushInterval: 5000 +stock: + mode: id includeSchema: - vn: - ticket: + vn: + travel: + query: CALL stock.log_refreshBuy(:table, :id) + key: id + entry: + query: CALL stock.log_refreshBuy(:table, :id) + key: id + columns: + - id + - travelFk + - isRaid + events: + - updaterows + buy: + query: CALL stock.log_refreshBuy(:table, :id) + key: id + columns: + - id + - entryFk + - itemFk + - quantity + - created + ticket: + query: CALL stock.log_refreshSale(:table, :id) key: id columns: - id - - shipped - warehouseFk - - isDeleted - - clientFk + - shipped events: - updaterows sale: + query: CALL stock.log_refreshSale(:table, :id) key: id columns: - id - ticketFk - itemFk - quantity - - priceFixed + - created + - isPicked + hedera: + order: + query: CALL stock.log_refreshOrder(:table, :id) + key: id + columns: + - id + - date_send + - address_id + - company_id + - customer_id + events: + - updaterows + orderRow: + query: CALL stock.log_refreshOrder(:table, :id) + key: id diff --git a/consumer.js b/consumer.js index 67ddac3..33476bf 100644 --- a/consumer.js +++ b/consumer.js @@ -4,9 +4,13 @@ const fs = require('fs'); const path = require('path'); const mysql = require('mysql2/promise'); const amqp = require('amqplib'); -const Queue = require('./queue'); const {cpus} = require('os'); +const queues = { + id: require('./queues/queue-id'), + fk: require('./queues/queue-fk') +}; + class Consumer { async start() { const defaultConfig = require('./config/consumer.yml'); @@ -38,7 +42,8 @@ class Consumer { this.onErrorListener = err => this.onError(err); const dbConfig = Object.assign({ - connectionLimit: cpus().length + connectionLimit: cpus().length, + namedPlaceholders: true }, conf.db); this.db = await mysql.createPool(dbConfig); @@ -52,7 +57,16 @@ class Consumer { this.queues = {}; for (const queueName in queuesConf) { - const queue = new Queue(this, queueName, queuesConf[queueName]); + const queueConf = queuesConf[queueName]; + const {mode} = queueConf; + const QueueClass = queues[mode]; + + if (!QueueClass) { + console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`); + continue; + } + + const queue = new QueueClass(this, queueName, queueConf); this.queues[queueName] = queue; await queue.consume(); } diff --git a/mycdc.js b/mycdc.js index ab7e174..106073f 100644 --- a/mycdc.js +++ b/mycdc.js @@ -51,7 +51,7 @@ module.exports = class MyCDC { tableInfo = { queues: new Map(), events: new Map(), - columns: new Map(), + columns: false, fk: 'id' }; tableMap.set(tableName, tableInfo); @@ -69,14 +69,14 @@ module.exports = class MyCDC { } const columns = table.columns; - for (const column of columns) { - let columnInfo = tableInfo.columns.get(column); - if (!columnInfo) { - columnInfo = []; - tableInfo.columns.set(column, columnInfo); - } - columnInfo.push(queueName); - } + if (columns) { + if (tableInfo.columns === false) + tableInfo.columns = new Set(); + if (tableInfo.columns !== true) + for (const column of columns) + tableInfo.columns.add(column); + } else + tableInfo.columns = true; if (table.id) tableInfo.id = table.id; @@ -128,19 +128,39 @@ module.exports = class MyCDC { // RabbitMQ this.publisher = await amqp.connect(conf.amqp); - this.channel = await this.publisher.createChannel(); + const channel = this.channel = await this.publisher.createChannel(); for (const tableMap of this.schemaMap.values()) { for (const tableName of tableMap.keys()) { - await this.channel.assertExchange(tableName, 'headers', { + await channel.assertExchange(tableName, 'headers', { durable: true }); } } for (const queueName in this.queuesConf) { - await this.channel.assertQueue(queueName, { + const options = conf.deleteNonEmpty ? {} : {ifEmpty: true}; + await channel.deleteQueue(queueName, {options}); + await channel.assertQueue(queueName, { durable: true }); + + const includeSchema = this.queuesConf[queueName].includeSchema; + for (const schemaName in includeSchema) { + const schema = includeSchema[schemaName]; + for (const tableName in schema) { + const table = schema[tableName]; + const events = table.events || allEvents; + for (const event of events) { + let args; + if (event === 'updaterows' && table.columns) { + args = {'x-match': 'any'}; + table.columns.map(c => args[c] = true); + } else + args = {'z-event': event}; + await channel.bindQueue(queueName, tableName, '', args); + } + } + } } // Zongji @@ -159,6 +179,7 @@ module.exports = class MyCDC { const [row] = res; this.filename = row.logName; this.position = row.position; + this.isFlushed = true; Object.assign(this.opts, { filename: this.filename, position: this.position @@ -298,7 +319,7 @@ module.exports = class MyCDC { } this.position = position; - this.flushed = false; + this.isFlushed = false; } onRowEvent(evt, eventName) { @@ -320,7 +341,9 @@ module.exports = class MyCDC { if (isUpdate) { rows = []; cols = new Set(); - const columns = tableInfo.columns.keys(); + let columns = tableInfo.columns === true + ? Object.keys(evt.rows[0].after) + : tableInfo.columns.keys(); for (const row of evt.rows) { let nColsChanged = 0; @@ -348,13 +371,12 @@ module.exports = class MyCDC { rows }; - const headers = {}; + let headers = {}; + headers['z-event'] = eventName; if (isUpdate) { for (const col of cols) headers[col] = true; data.cols = Array.from(cols); - } else { - headers['z-'+ eventName] = true; } const options = { @@ -367,30 +389,23 @@ module.exports = class MyCDC { Buffer.from(jsonData), options); if (this.debug) { - //console.debug(data, options); + console.debug(data, options); console.debug('Queued:'.blue, `${tableName}(${rows.length}) [${eventName}]`); } } async flushQueue() { - if (this.flushed) return; - const position = this.nextPosition; + if (this.isFlushed) return; + const {filename, position} = this; + this.debug('Flush', `filename: ${filename}, position: ${position}`); - if (position) { - const filename = this.nextFilename; - this.debug('Flush', `filename: ${filename}, position: ${position}`); - - const replaceQuery = - 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; - if (!this.conf.testMode) - await this.db.query(replaceQuery, [this.conf.code, filename, position]); + const replaceQuery = + 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; + if (!this.conf.testMode) + await this.db.query(replaceQuery, [this.conf.code, filename, position]); - this.flushed = true; - } - - this.nextFilename = this.filename; - this.nextPosition = this.position; + this.isFlushed = true; } async connectionPing() { diff --git a/package-lock.json b/package-lock.json index 6d58e66..2037601 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,15 +1,22 @@ { "name": "mycdc", + "version": "0.0.5", "lockfileVersion": 2, "requires": true, "packages": { "": { + "name": "mycdc", + "version": "0.0.5", + "license": "GPL-3.0", "dependencies": { "amqplib": "^0.10.3", "colors": "^1.4.0", - "mysql2": "^2.3.3", + "mysql2": "^3.9.3", "require-yaml": "^0.0.1", "zongji": "file:../zongji" + }, + "engines": { + "node": ">=20" } }, "../zongji": {}, @@ -133,19 +140,16 @@ } }, "node_modules/long": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", - "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", + "integrity": "sha512-lcHwpNoggQTObv5apGNCTdJrO69eHOZMi4BNC+rTLER8iHAqGrUVeLh/irVIM7zTw2bOXA8T6uNPeujwOLg/2Q==" }, "node_modules/lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dependencies": { - "yallist": "^4.0.0" - }, + "version": "8.0.5", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-8.0.5.tgz", + "integrity": "sha512-MhWWlVnuab1RG5/zMRRcVGXZLCXrZTgfwMikgzCegsPnG62yDQo5JnqKkrK4jO5iKqDAZGItAqN5CtKBCBWRUA==", "engines": { - "node": ">=10" + "node": ">=16.14" } }, "node_modules/ms": { @@ -154,16 +158,16 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/mysql2": { - "version": "2.3.3", - "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-2.3.3.tgz", - "integrity": "sha512-wxJUev6LgMSgACDkb/InIFxDprRa6T95+VEoR+xPvtngtccNH2dGjEB/fVZ8yg1gWv1510c9CvXuJHi5zUm0ZA==", + "version": "3.9.3", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", + "integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", "dependencies": { - "denque": "^2.0.1", + "denque": "^2.1.0", "generate-function": "^2.3.1", "iconv-lite": "^0.6.3", - "long": "^4.0.0", - "lru-cache": "^6.0.0", - "named-placeholders": "^1.1.2", + "long": "^5.2.1", + "lru-cache": "^8.0.0", + "named-placeholders": "^1.1.3", "seq-queue": "^0.0.5", "sqlstring": "^2.3.2" }, @@ -172,35 +176,24 @@ } }, "node_modules/named-placeholders": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.2.tgz", - "integrity": "sha512-wiFWqxoLL3PGVReSZpjLVxyJ1bRqe+KKJVbr4hGs1KWfTZTQyezHFBbuKj9hsizHyGV2ne7EMjHdxEGAybD5SA==", + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.3.tgz", + "integrity": "sha512-eLoBxg6wE/rZkJPhU/xRX1WTpkFEwDJEN96oxFrTsqBdbT5ec295Q+CoHrL9IT0DipqKhmGcaZmwOt8OON5x1w==", "dependencies": { - "lru-cache": "^4.1.3" + "lru-cache": "^7.14.1" }, "engines": { - "node": ">=6.0.0" + "node": ">=12.0.0" } }, "node_modules/named-placeholders/node_modules/lru-cache": { - "version": "4.1.5", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-4.1.5.tgz", - "integrity": "sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==", - "dependencies": { - "pseudomap": "^1.0.2", - "yallist": "^2.1.2" + "version": "7.18.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.18.3.tgz", + "integrity": "sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==", + "engines": { + "node": ">=12" } }, - "node_modules/named-placeholders/node_modules/yallist": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz", - "integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A==" - }, - "node_modules/pseudomap": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", - "integrity": "sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==" - }, "node_modules/querystringify": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", @@ -267,11 +260,6 @@ "requires-port": "^1.0.0" } }, - "node_modules/yallist": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", - "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" - }, "node_modules/zongji": { "resolved": "../zongji", "link": true @@ -372,17 +360,14 @@ } }, "long": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", - "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", + "integrity": "sha512-lcHwpNoggQTObv5apGNCTdJrO69eHOZMi4BNC+rTLER8iHAqGrUVeLh/irVIM7zTw2bOXA8T6uNPeujwOLg/2Q==" }, "lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "requires": { - "yallist": "^4.0.0" - } + "version": "8.0.5", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-8.0.5.tgz", + "integrity": "sha512-MhWWlVnuab1RG5/zMRRcVGXZLCXrZTgfwMikgzCegsPnG62yDQo5JnqKkrK4jO5iKqDAZGItAqN5CtKBCBWRUA==" }, "ms": { "version": "2.1.2", @@ -390,49 +375,35 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "mysql2": { - "version": "2.3.3", - "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-2.3.3.tgz", - "integrity": "sha512-wxJUev6LgMSgACDkb/InIFxDprRa6T95+VEoR+xPvtngtccNH2dGjEB/fVZ8yg1gWv1510c9CvXuJHi5zUm0ZA==", + "version": "3.9.3", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", + "integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", "requires": { - "denque": "^2.0.1", + "denque": "^2.1.0", "generate-function": "^2.3.1", "iconv-lite": "^0.6.3", - "long": "^4.0.0", - "lru-cache": "^6.0.0", - "named-placeholders": "^1.1.2", + "long": "^5.2.1", + "lru-cache": "^8.0.0", + "named-placeholders": "^1.1.3", "seq-queue": "^0.0.5", "sqlstring": "^2.3.2" } }, "named-placeholders": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.2.tgz", - "integrity": "sha512-wiFWqxoLL3PGVReSZpjLVxyJ1bRqe+KKJVbr4hGs1KWfTZTQyezHFBbuKj9hsizHyGV2ne7EMjHdxEGAybD5SA==", + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.3.tgz", + "integrity": "sha512-eLoBxg6wE/rZkJPhU/xRX1WTpkFEwDJEN96oxFrTsqBdbT5ec295Q+CoHrL9IT0DipqKhmGcaZmwOt8OON5x1w==", "requires": { - "lru-cache": "^4.1.3" + "lru-cache": "^7.14.1" }, "dependencies": { "lru-cache": { - "version": "4.1.5", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-4.1.5.tgz", - "integrity": "sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==", - "requires": { - "pseudomap": "^1.0.2", - "yallist": "^2.1.2" - } - }, - "yallist": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz", - "integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A==" + "version": "7.18.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.18.3.tgz", + "integrity": "sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==" } } }, - "pseudomap": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", - "integrity": "sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==" - }, "querystringify": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", @@ -496,11 +467,6 @@ "requires-port": "^1.0.0" } }, - "yallist": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", - "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" - }, "zongji": { "version": "file:../zongji" } diff --git a/package.json b/package.json index 046b4f3..a56fdc4 100644 --- a/package.json +++ b/package.json @@ -1,20 +1,20 @@ { "name": "mycdc", - "version": "0.0.5", + "version": "0.0.6", "author": "Verdnatura Levante SL", "description": "Asynchronous DB calculations reading the binary log", "license": "GPL-3.0", "repository": { - "type": "git", - "url": "https://gitea.verdnatura.es/verdnatura/mycdc" + "type": "git", + "url": "https://gitea.verdnatura.es/verdnatura/mycdc" }, "engines": { - "node": ">=20" + "node": ">=20" }, "dependencies": { "amqplib": "^0.10.3", "colors": "^1.4.0", - "mysql2": "^2.3.3", + "mysql2": "^3.9.3", "require-yaml": "^0.0.1", "zongji": "file:../zongji" } diff --git a/queue.js b/queue.js index a05c39d..29d45e6 100644 --- a/queue.js +++ b/queue.js @@ -1,94 +1,17 @@ module.exports = class Queue { - constructor(consumer, name, conf) { - Object.assign(this, {consumer, name, conf}); - this.reset(); - } - - async consume() { - if (this.conf.mode !== 'fk') { - console.warn(`Ignoring queue '${this.name} with unknown mode '${this.conf.mode}'`); - return; + constructor(consumer, name, conf) { + Object.assign(this, {consumer, name, conf}); } - const channel = await this.consumer.amqpConn.createChannel(); - channel.prefetch(this.conf.amqpPrefetch); - await channel.assertQueue(this.name, { - durable: true - }); - this.channel = channel; - - await channel.consume(this.name, - msg => this.onConsume(msg)); - this.flush(); - } - - reset() { - this.lastMessage = null; - this.nMessages = 0; - this.ids = new Set(); - } - - flush(flushInterval) { - if (this.timeout) { - clearTimeout(this.timeout); - this.timeout = null; + async consume() { + const channel = await this.consumer.amqpConn.createChannel(); + channel.prefetch(this.conf.amqpPrefetch); + await channel.assertQueue(this.name, { + durable: true + }); + this.channel = channel; + + await channel.consume(this.name, + msg => this.onConsume(msg)); } - this.timeout = setTimeout( - () => this.onFlushTimeout(), flushInterval); - } - - async onFlushTimeout() { - const consumer = this.consumer; - - if (this.ids.size) { - if (consumer.conf.debug) - console.debug('Flush:'.blue, this.name.yellow, this.ids); - - const ids = Array.from(this.ids); - const lastMessage = this.lastMessage; - this.reset(); - - try { - for (const id of ids) { - const sql = consumer.db.format(this.conf.query, id); - consumer.debug('SQL', sql); - if (!consumer.conf.testMode) - await consumer.db.query(sql); - } - - await this.channel.ack(lastMessage, true); - } catch(err) { - await this.channel.nack(lastMessage, true); - console.error(err); - } - } - - this.flush(this.conf.flushInterval); - } - - async onConsume(msg) { - const consumer = this.consumer; - const data = JSON.parse(msg.content.toString()); - - if (consumer.conf.debug) - console.debug('Message:'.blue, this.name.yellow, data.table); - - const ids = this.ids; - const key = this.conf.includeSchema[data.schema][data.table].key; - - if (data.eventName === 'updaterows') { - for (const row of data.rows) { - ids.add(row.before[key]); - ids.add(row.after[key]); - } - } else - for (const row of data.rows) - ids.add(row[key]); - - this.nMessages++; - this.lastMessage = msg; - - if (this.nMessages == consumer.conf.amqpPrefetch) - this.flush(); - } } diff --git a/queues/queue-fk.js b/queues/queue-fk.js new file mode 100644 index 0000000..b47f785 --- /dev/null +++ b/queues/queue-fk.js @@ -0,0 +1,86 @@ +const Queue = require('../queue'); + +module.exports = class QueueFk extends Queue { + constructor(consumer, name, conf) { + super(consumer, name, conf); + this.reset(); + } + + async consume() { + await super.consume(); + this.flush(); + } + + reset() { + this.lastMessage = null; + this.nMessages = 0; + this.ids = new Set(); + } + + flush(flushInterval) { + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = null; + } + this.timeout = setTimeout( + () => this.onFlushTimeout(), flushInterval); + } + + async onFlushTimeout() { + const consumer = this.consumer; + + if (this.ids.size) { + if (consumer.conf.debug) + console.debug('Flush:'.blue, this.name.yellow, this.ids); + + const ids = Array.from(this.ids); + const lastMessage = this.lastMessage; + this.reset(); + + try { + for (const id of ids) { + const params = {id}; + //const sql = consumer.db.format(this.conf.query, params); + const sql = this.conf.query; + if (consumer.conf.debug) + console.debug('SQL:'.blue, sql, params); + if (!consumer.conf.testMode) + await consumer.db.query(sql, params); + } + + await this.channel.ack(lastMessage, true); + } catch(err) { + await this.channel.nack(lastMessage, true); + console.error(err); + } + } + + this.flush(this.conf.flushInterval); + } + + async onConsume(msg) { + const consumer = this.consumer; + const data = JSON.parse(msg.content.toString()); + + if (consumer.conf.debug) + console.debug('Message:'.blue, this.name.yellow, data.table); + + const ids = this.ids; + const key = this.conf.includeSchema[data.schema][data.table].key; + + if (data.eventName === 'updaterows') { + for (const row of data.rows) { + ids.add(row.before[key]); + ids.add(row.after[key]); + } + } else + for (const row of data.rows) + ids.add(row[key]); + + this.nMessages++; + this.lastMessage = msg; + + if (this.nMessages == consumer.conf.amqpPrefetch) + this.flush(); + } +} diff --git a/queues/queue-id.js b/queues/queue-id.js new file mode 100644 index 0000000..526506e --- /dev/null +++ b/queues/queue-id.js @@ -0,0 +1,41 @@ +const Queue = require('../queue'); + +module.exports = class QueueId extends Queue { + async onConsume(msg) { + const {consumer, conf} = this; + const data = JSON.parse(msg.content.toString()); + + if (consumer.conf.debug) + console.debug('Message:'.blue, this.name.yellow, data.table); + + const table = conf.includeSchema[data.schema][data.table] + const {query, key} = table; + + const params = { + schema: data.schema, + table: data.table, + }; + + async function dbQuery(id) { + const myParams = Object.assign({id}, params); + if (consumer.conf.debug) + console.debug('SQL:'.blue, query, myParams); + if (!consumer.conf.testMode) + await consumer.db.query(query, myParams); + } + + const keyChanged = data.cols.indexOf(key) !== -1; + + if (data.eventName === 'updaterows') { + for (const row of data.rows) { + if (keyChanged) + await dbQuery(row.before[key]); + await dbQuery(row.after[key]); + } + } else + for (const row of data.rows) + await dbQuery(row[key]); + + await this.channel.ack(msg); + } +}