Merge branch 'master' of https://gitea.verdnatura.es/juan/mycdc into 7213-Pasar-problemas-a-columnas

This commit is contained in:
Carlos Andrés 2024-05-13 12:39:17 +02:00
commit 87dcd8372e
23 changed files with 346 additions and 144 deletions

View File

@ -1,6 +1,16 @@
# Asynchronous DB calculations reading the binary log # MyCDC - MySQL change data capture
## Enviroment setup Application to run asynchronous operations based on DB changes. It is divided
into three services:
- RabbitMQ: Service that allows the exchange and queuing of messages between
applications.
- Producer: Reads and filters changes from the DB binary log and adds the
relevant messages to each RabbitMQ queue.
- Consumer: Obtains the elements from the queues and performs the necessary
operations asynchronously.
## Environment setup
Because a bug with MariaDB wich it's fix is pending to be merged into main Because a bug with MariaDB wich it's fix is pending to be merged into main
project branch, a *zongji* fork must be cloned into project root directory. project branch, a *zongji* fork must be cloned into project root directory.
@ -13,8 +23,9 @@ git checkout fix-143
Apply *zongji.sql* script into DB. Apply *zongji.sql* script into DB.
Copy *config.json* to *config.local.json* and place your local configuration Copy *producer.json* and *consumer.json* from *config* directory to
there. *producer.local.json* and *consumer.local.json* and place your local
configuration there.
Install dependencies. Install dependencies.
```text ```text
@ -23,12 +34,19 @@ npm install
## Run application ## Run application
Launch app. Start rabbit.
```text ```text
node index.js npm run rabbit
```
Start producer and consumer.
```text
npm start
npm run consumer
``` ```
## Built With ## Built With
* [Zongji](https://github.com/nevill/zongji) * [Zongji](https://github.com/nevill/zongji)
* [MySQL2](https://github.com/sidorares/node-mysql2#readme) * [MySQL2](https://github.com/sidorares/node-mysql2#readme)
* [RabbitMQ] (https://www.rabbitmq.com/)

View File

@ -22,13 +22,12 @@ RUN npm install --omit=dev --no-audit --prefer-offline \
&& git clone --depth 1 --branch fix-143 https://github.com/juan-ferrer-toribio/zongji.git \ && git clone --depth 1 --branch fix-143 https://github.com/juan-ferrer-toribio/zongji.git \
&& (cd zongji && npm install --omit=dev) && (cd zongji && npm install --omit=dev)
COPY lib lib
COPY config config COPY config config
COPY queues queues COPY queues queues
COPY \ COPY LICENSE \
LICENSE \
README.md \ README.md \
consumer.js \ consumer.js \
queue.js \
./ ./
CMD ["node", "consumer.js"] CMD ["node", "consumer.js"]

View File

@ -23,8 +23,8 @@ RUN npm install --omit=dev --no-audit --prefer-offline \
&& (cd zongji && npm install --omit=dev) && (cd zongji && npm install --omit=dev)
COPY config config COPY config config
COPY \ COPY queues queues
LICENSE \ COPY LICENSE \
README.md \ README.md \
mycdc.js \ mycdc.js \
index.js \ index.js \

13
assets/zongji.sql Normal file
View File

@ -0,0 +1,13 @@
CREATE TABLE `util`.`binlogQueue`(
`code` VARCHAR(255) NOT NULL,
`logName` VARCHAR(255) NOT NULL,
`position` BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`code`)
) ENGINE = InnoDB;
CREATE USER 'mycdc-producer'@'%' IDENTIFIED BY 'P4$$w0rd';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'mycdc-producer'@'%';
GRANT INSERT, DELETE ON `util`.* TO 'mycdc-producer'@'%';
CREATE USER 'mycdc-producer'@'%' IDENTIFIED BY 'P4$$w0rd';

4
assets/zongji.undo.sql Normal file
View File

@ -0,0 +1,4 @@
DROP TABLE IF EXISTS `util`.`binlogQueue`;
DROP USER IF EXISTS 'mycdc-producer'@'%';
DROP USER IF EXISTS 'mycdc-consumer'@'%';

View File

@ -1,5 +1,9 @@
debug: false debug: false
testMode: false testMode: false
defaults:
mode: fk
flushInterval: 5000
amqpPrefetch: 100
amqp: amqp://user:password@localhost:5672 amqp: amqp://user:password@localhost:5672
db: db:
host: localhost host: localhost

View File

@ -5,6 +5,7 @@ deleteNonEmpty: false
amqp: amqp://user:password@localhost:5672 amqp: amqp://user:password@localhost:5672
pingInterval: 60 pingInterval: 60
flushInterval: 10 flushInterval: 10
serverId: 1
db: db:
host: localhost host: localhost
port: 3306 port: 3306

View File

@ -265,4 +265,4 @@ ticketRisk:
key: clientFk key: clientFk
columns: columns:
- clientFk - clientFk
- risk - risk

View File

@ -7,8 +7,8 @@ const amqp = require('amqplib');
const {cpus} = require('os'); const {cpus} = require('os');
const queues = { const queues = {
id: require('./queues/queue-id'), id: require('./lib/queue-id'),
fk: require('./queues/queue-fk') fk: require('./lib/queue-fk')
}; };
class Consumer { class Consumer {
@ -53,14 +53,16 @@ class Consumer {
} }
async consumeQueues() { async consumeQueues() {
const queuesConf = require('./config/queues.yml'); const {conf} = this;
const {defaults} = this.conf;
this.queues = {}; this.queues = {};
for (const queueName in queuesConf) { for (const queueName of conf.pollQueues) {
const queueConf = queuesConf[queueName]; const confFile = path.join(__dirname, 'queues', `${queueName}.yml`);
const queueConf = Object.assign({}, defaults, require(confFile));
const {mode} = queueConf; const {mode} = queueConf;
const QueueClass = queues[mode]; const QueueClass = queues[mode];
if (!QueueClass) { if (!QueueClass) {
console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`); console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`);
continue; continue;

View File

@ -4,9 +4,9 @@ services:
image: registry.verdnatura.es/mycdc-producer:${VERSION:?} image: registry.verdnatura.es/mycdc-producer:${VERSION:?}
build: build:
context: . context: .
dockerfile: Dockerfile.producer dockerfile: assets/Dockerfile.producer
consumer: consumer:
image: registry.verdnatura.es/mycdc-consumer:${VERSION:?} image: registry.verdnatura.es/mycdc-consumer:${VERSION:?}
build: build:
context: . context: .
dockerfile: Dockerfile.consumer dockerfile: assets/Dockerfile.consumer

94
lib/queue-fk.js Normal file
View File

@ -0,0 +1,94 @@
const Queue = require('./queue');
module.exports = class QueueFk extends Queue {
constructor(consumer, name, conf) {
super(consumer, name, conf);
this.reset();
}
async consume() {
await super.consume();
this.flush();
}
reset() {
this.messages = [];
this.nMessages = 0;
this.scopes = new Map();
}
flush(flushInterval) {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
}
this.timeout = setTimeout(
() => this.onFlushTimeout(), flushInterval);
}
async onFlushTimeout() {
const {messages} = this;
if (messages.length) {
const {consumer, scopes, channel} = this;
this.reset();
if (consumer.conf.debug)
console.debug('Flush:'.blue, this.name.yellow, scopes);
try {
for (const [scope, ids] of scopes) {
let query = this.conf.query[scope];
for (const id of ids) {
const params = {id};
//query = consumer.db.format(query, params);
if (consumer.conf.debug)
console.debug('SQL:'.blue, query, params);
if (!consumer.conf.testMode)
await consumer.db.query(query, params);
}
}
const promises = [];
for (const msg of messages)
promises.push(channel.ack(msg));
await Promise.all(promises);
} catch(err) {
for (const msg of messages)
await channel.nack(msg);
throw err;
}
}
this.flush(this.conf.flushInterval);
}
onConsume(msg) {
const {conf} = this;
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const tableConf = conf.includeSchema[data.schema][data.table];
const scope = tableConf.scope ?? data.table;
let ids = this.scopes.get(scope);
if (!ids) this.scopes.set(scope, ids = new Set());
const key = tableConf.key;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
ids.add(row.before[key]);
ids.add(row.after[key]);
}
} else
for (const row of data.rows)
ids.add(row[key]);
this.messages.push(msg);
if (this.messages.length == conf.amqpPrefetch)
this.flush();
}
}

View File

@ -1,4 +1,4 @@
const Queue = require('../queue'); const Queue = require('./queue');
module.exports = class QueueId extends Queue { module.exports = class QueueId extends Queue {
async onConsume(msg) { async onConsume(msg) {

View File

@ -1,6 +1,6 @@
require('require-yaml'); require('require-yaml');
require('colors'); require('colors');
const fs = require('fs'); const fs = require('fs-extra');
const path = require('path'); const path = require('path');
const ZongJi = require('./zongji'); const ZongJi = require('./zongji');
const mysql = require('mysql2/promise'); const mysql = require('mysql2/promise');
@ -29,7 +29,16 @@ module.exports = class MyCDC {
const localConfig = require(localPath); const localConfig = require(localPath);
Object.assign(conf, localConfig); Object.assign(conf, localConfig);
} }
this.queuesConf = require('./config/queues.yml');
this.queuesConf = {};
const queueDir = path.join(__dirname, 'queues');
const queueFiles = await fs.readdir(queueDir);
for (const queueFile of queueFiles) {
const match = queueFile.match(/^([a-zA-Z0-9-_]+)\.ya?ml$/);
if (!match)
throw new Error(`Invalid queue file name '${queueFile}'`);
this.queuesConf[match[1]] = require(path.join(queueDir, queueFile));
}
const queues = this.queuesConf; const queues = this.queuesConf;
for (const queueName in queues) { for (const queueName in queues) {
@ -51,7 +60,7 @@ module.exports = class MyCDC {
tableInfo = { tableInfo = {
queues: new Map(), queues: new Map(),
events: new Map(), events: new Map(),
columns: false, columnSet: false,
fk: 'id' fk: 'id'
}; };
tableMap.set(tableName, tableInfo); tableMap.set(tableName, tableInfo);
@ -70,13 +79,13 @@ module.exports = class MyCDC {
const columns = table.columns; const columns = table.columns;
if (columns) { if (columns) {
if (tableInfo.columns === false) if (tableInfo.columnSet === false)
tableInfo.columns = new Set(); tableInfo.columnSet = new Set();
if (tableInfo.columns !== true) if (tableInfo.columnSet !== true)
for (const column of columns) for (const column of columns)
tableInfo.columns.add(column); tableInfo.columnSet.add(column);
} else } else
tableInfo.columns = true; tableInfo.columnSet = true;
if (table.id) if (table.id)
tableInfo.id = table.id; tableInfo.id = table.id;
@ -87,8 +96,15 @@ module.exports = class MyCDC {
} }
const includeSchema = {}; const includeSchema = {};
for (const [schemaName, tableMap] of this.schemaMap)
for (const [schemaName, tableMap] of this.schemaMap) {
includeSchema[schemaName] = Array.from(tableMap.keys()); includeSchema[schemaName] = Array.from(tableMap.keys());
for (const [tableName, tableInfo] of tableMap) {
if (tableInfo.columnSet !== true)
tableInfo.columns = Array.from(tableInfo.columnSet.keys());
}
}
this.opts = { this.opts = {
includeEvents: [ includeEvents: [
@ -98,7 +114,8 @@ module.exports = class MyCDC {
'updaterows', 'updaterows',
'deleterows' 'deleterows'
], ],
includeSchema includeSchema,
serverId: conf.serverId
}; };
if (conf.testMode) if (conf.testMode)
@ -309,7 +326,10 @@ module.exports = class MyCDC {
case 'rotate': case 'rotate':
this.filename = evt.binlogName; this.filename = evt.binlogName;
position = evt.position; position = evt.position;
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`); console.log(
`[${eventName}] filename: ${this.filename}`,
`position: ${this.position}, nextPosition: ${evt.nextPosition}`
);
break; break;
case 'writerows': case 'writerows':
case 'deleterows': case 'deleterows':
@ -341,9 +361,9 @@ module.exports = class MyCDC {
if (isUpdate) { if (isUpdate) {
rows = []; rows = [];
cols = new Set(); cols = new Set();
let columns = tableInfo.columns === true const columns = tableInfo.columnSet === true
? Object.keys(evt.rows[0].after) ? Object.keys(evt.rows[0].after)
: tableInfo.columns.keys(); : tableInfo.columns;
for (const row of evt.rows) { for (const row of evt.rows) {
let nColsChanged = 0; let nColsChanged = 0;

83
package-lock.json generated
View File

@ -1,16 +1,17 @@
{ {
"name": "mycdc", "name": "mycdc",
"version": "0.0.5", "version": "0.0.8",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "mycdc", "name": "mycdc",
"version": "0.0.5", "version": "0.0.8",
"license": "GPL-3.0", "license": "GPL-3.0",
"dependencies": { "dependencies": {
"amqplib": "^0.10.3", "amqplib": "^0.10.3",
"colors": "^1.4.0", "colors": "^1.4.0",
"fs-extra": "^11.2.0",
"mysql2": "^3.9.3", "mysql2": "^3.9.3",
"require-yaml": "^0.0.1", "require-yaml": "^0.0.1",
"zongji": "file:../zongji" "zongji": "file:../zongji"
@ -94,6 +95,19 @@
"node": ">=0.10" "node": ">=0.10"
} }
}, },
"node_modules/fs-extra": {
"version": "11.2.0",
"resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz",
"integrity": "sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw==",
"dependencies": {
"graceful-fs": "^4.2.0",
"jsonfile": "^6.0.1",
"universalify": "^2.0.0"
},
"engines": {
"node": ">=14.14"
}
},
"node_modules/generate-function": { "node_modules/generate-function": {
"version": "2.3.1", "version": "2.3.1",
"resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz",
@ -102,6 +116,11 @@
"is-property": "^1.0.2" "is-property": "^1.0.2"
} }
}, },
"node_modules/graceful-fs": {
"version": "4.2.11",
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz",
"integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="
},
"node_modules/iconv-lite": { "node_modules/iconv-lite": {
"version": "0.6.3", "version": "0.6.3",
"resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz",
@ -139,6 +158,17 @@
"js-yaml": "bin/js-yaml.js" "js-yaml": "bin/js-yaml.js"
} }
}, },
"node_modules/jsonfile": {
"version": "6.1.0",
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz",
"integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==",
"dependencies": {
"universalify": "^2.0.0"
},
"optionalDependencies": {
"graceful-fs": "^4.1.6"
}
},
"node_modules/long": { "node_modules/long": {
"version": "5.2.3", "version": "5.2.3",
"resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz",
@ -158,9 +188,9 @@
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
}, },
"node_modules/mysql2": { "node_modules/mysql2": {
"version": "3.9.3", "version": "3.9.4",
"resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.4.tgz",
"integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", "integrity": "sha512-OEESQuwxMza803knC1YSt7NMuc1BrK9j7gZhCSs2WAyxr1vfiI7QLaLOKTh5c9SWGz98qVyQUbK8/WckevNQhg==",
"dependencies": { "dependencies": {
"denque": "^2.1.0", "denque": "^2.1.0",
"generate-function": "^2.3.1", "generate-function": "^2.3.1",
@ -251,6 +281,14 @@
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
"integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ=="
}, },
"node_modules/universalify": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz",
"integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==",
"engines": {
"node": ">= 10.0.0"
}
},
"node_modules/url-parse": { "node_modules/url-parse": {
"version": "1.5.10", "version": "1.5.10",
"resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz",
@ -320,6 +358,16 @@
"resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz",
"integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==" "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw=="
}, },
"fs-extra": {
"version": "11.2.0",
"resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz",
"integrity": "sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw==",
"requires": {
"graceful-fs": "^4.2.0",
"jsonfile": "^6.0.1",
"universalify": "^2.0.0"
}
},
"generate-function": { "generate-function": {
"version": "2.3.1", "version": "2.3.1",
"resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz",
@ -328,6 +376,11 @@
"is-property": "^1.0.2" "is-property": "^1.0.2"
} }
}, },
"graceful-fs": {
"version": "4.2.11",
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz",
"integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="
},
"iconv-lite": { "iconv-lite": {
"version": "0.6.3", "version": "0.6.3",
"resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz",
@ -359,6 +412,15 @@
"argparse": "^2.0.1" "argparse": "^2.0.1"
} }
}, },
"jsonfile": {
"version": "6.1.0",
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz",
"integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==",
"requires": {
"graceful-fs": "^4.1.6",
"universalify": "^2.0.0"
}
},
"long": { "long": {
"version": "5.2.3", "version": "5.2.3",
"resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz",
@ -375,9 +437,9 @@
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
}, },
"mysql2": { "mysql2": {
"version": "3.9.3", "version": "3.9.4",
"resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz", "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.4.tgz",
"integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==", "integrity": "sha512-OEESQuwxMza803knC1YSt7NMuc1BrK9j7gZhCSs2WAyxr1vfiI7QLaLOKTh5c9SWGz98qVyQUbK8/WckevNQhg==",
"requires": { "requires": {
"denque": "^2.1.0", "denque": "^2.1.0",
"generate-function": "^2.3.1", "generate-function": "^2.3.1",
@ -458,6 +520,11 @@
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
"integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ=="
}, },
"universalify": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz",
"integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw=="
},
"url-parse": { "url-parse": {
"version": "1.5.10", "version": "1.5.10",
"resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz",

View File

@ -1,6 +1,6 @@
{ {
"name": "mycdc", "name": "mycdc",
"version": "0.0.8", "version": "0.0.14",
"author": "Verdnatura Levante SL", "author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log", "description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0", "license": "GPL-3.0",
@ -14,8 +14,14 @@
"dependencies": { "dependencies": {
"amqplib": "^0.10.3", "amqplib": "^0.10.3",
"colors": "^1.4.0", "colors": "^1.4.0",
"fs-extra": "^11.2.0",
"mysql2": "^3.9.3", "mysql2": "^3.9.3",
"require-yaml": "^0.0.1", "require-yaml": "^0.0.1",
"zongji": "file:../zongji" "zongji": "file:../zongji"
},
"scripts": {
"start": "node index.js",
"consumer": "node consumer.js",
"rabbit": "assets/run-rabbit.sh"
} }
} }

24
queues/orderTotal.yml Normal file
View File

@ -0,0 +1,24 @@
query:
order: CALL hedera.order_recalc(:id)
includeSchema:
hedera:
order:
key: id
columns:
- id
- address_id
- company_id
- date_send
- customer_id
events:
- updaterows
orderRow:
key: orderFk
scope: order
columns:
- id
- orderFk
- itemFk
- warehouseFk
- shipment
- amount

View File

@ -1,86 +0,0 @@
const Queue = require('../queue');
module.exports = class QueueFk extends Queue {
constructor(consumer, name, conf) {
super(consumer, name, conf);
this.reset();
}
async consume() {
await super.consume();
this.flush();
}
reset() {
this.lastMessage = null;
this.nMessages = 0;
this.ids = new Set();
}
flush(flushInterval) {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
}
this.timeout = setTimeout(
() => this.onFlushTimeout(), flushInterval);
}
async onFlushTimeout() {
const consumer = this.consumer;
if (this.ids.size) {
if (consumer.conf.debug)
console.debug('Flush:'.blue, this.name.yellow, this.ids);
const ids = Array.from(this.ids);
const lastMessage = this.lastMessage;
this.reset();
try {
for (const id of ids) {
const params = {id};
//const sql = consumer.db.format(this.conf.query, params);
const sql = this.conf.query;
if (consumer.conf.debug)
console.debug('SQL:'.blue, sql, params);
if (!consumer.conf.testMode)
await consumer.db.query(sql, params);
}
await this.channel.ack(lastMessage, true);
} catch(err) {
await this.channel.nack(lastMessage, true);
console.error(err);
}
}
this.flush(this.conf.flushInterval);
}
async onConsume(msg) {
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const ids = this.ids;
const key = this.conf.includeSchema[data.schema][data.table].key;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
ids.add(row.before[key]);
ids.add(row.after[key]);
}
} else
for (const row of data.rows)
ids.add(row[key]);
this.nMessages++;
this.lastMessage = msg;
if (this.nMessages == consumer.conf.amqpPrefetch)
this.flush();
}
}

42
queues/ticketTotal.yml Normal file
View File

@ -0,0 +1,42 @@
query:
ticket: CALL vn.ticket_recalc(:id, NULL)
client: CALL vn.ticket_recalcByScope('client', :id)
address: CALL vn.ticket_recalcByScope('address', :id)
includeSchema:
vn:
ticket:
key: id
columns:
- id
- clientFk
- addressFk
- companyFk
- shipped
events:
- updaterows
ticketService:
key: ticketFk
scope: ticket
columns:
- ticketFk
- price
- quantity
sale:
key: ticketFk
scope: ticket
columns:
- id
- ticketFk
- itemFk
- quantity
- price
- discount
client:
key: id
columns:
- provinceFk
- isVies
address:
key: id
columns:
- isEqualizated

9
queues/travelEntries.yml Normal file
View File

@ -0,0 +1,9 @@
query:
travel: CALL vn.travel_recalc(:id)
includeSchema:
vn:
entry:
key: travelFk
scope: travel
columns:
- travelFk

View File

@ -1,12 +0,0 @@
CREATE TABLE `util`.`binlogQueue`(
`code` VARCHAR(255) NOT NULL,
`logName` VARCHAR(255) NOT NULL,
`position` BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`code`)
) ENGINE = InnoDB;
CREATE USER 'zongji'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%';
GRANT INSERT, DELETE ON `util`.* TO 'zongji'@'%';

View File

@ -1,3 +0,0 @@
DROP TABLE IF EXISTS `util`.`binlogQueue`;
DROP USER 'zongji'@'%';