feat: refs #4409 Code refactor, fixes, auto bind
gitea/mycdc/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2024-04-08 15:17:06 +02:00
parent 68966f5353
commit 613f0b5e1c
11 changed files with 313 additions and 230 deletions

3
.gitignore vendored
View File

@ -1,4 +1,5 @@
node_modules
zongji
config/producer.*.yml
config/consumer.*.yml
config/consumer.*.yml
config/queues.*.yml

View File

@ -23,6 +23,7 @@ RUN npm install --omit=dev --no-audit --prefer-offline \
&& (cd zongji && npm install --omit=dev)
COPY config config
COPY queues queues
COPY \
LICENSE \
README.md \

View File

@ -1,6 +1,7 @@
code: mycdc
debug: false
testMode: false
deleteNonEmpty: false
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 10

View File

@ -1,5 +1,5 @@
orderTotal:
query: CALL hedera.order_recalc(?)
query: CALL hedera.order_recalc(:id)
mode: fk
flushInterval: 5000
includeSchema:
@ -23,8 +23,8 @@ orderTotal:
- warehouseFk
- shipment
- amount
ticketTotal:
query: CALL vn.ticket_recalc(?)
ticketTotal:
query: CALL vn.ticket_recalc(:id, NULL)
mode: fk
flushInterval: 5000
includeSchema:
@ -46,27 +46,62 @@ ticketTotal:
- itemFk
- quantity
- price
comparative:
query: CALL vn.itemComparative_refresh(?, ?, ?)
mode: changes
flushInterval: 5000
stock:
mode: id
includeSchema:
vn:
ticket:
vn:
travel:
query: CALL stock.log_refreshBuy(:table, :id)
key: id
entry:
query: CALL stock.log_refreshBuy(:table, :id)
key: id
columns:
- id
- travelFk
- isRaid
events:
- updaterows
buy:
query: CALL stock.log_refreshBuy(:table, :id)
key: id
columns:
- id
- entryFk
- itemFk
- quantity
- created
ticket:
query: CALL stock.log_refreshSale(:table, :id)
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
- clientFk
- shipped
events:
- updaterows
sale:
query: CALL stock.log_refreshSale(:table, :id)
key: id
columns:
- id
- ticketFk
- itemFk
- quantity
- priceFixed
- created
- isPicked
hedera:
order:
query: CALL stock.log_refreshOrder(:table, :id)
key: id
columns:
- id
- date_send
- address_id
- company_id
- customer_id
events:
- updaterows
orderRow:
query: CALL stock.log_refreshOrder(:table, :id)
key: id

View File

@ -4,9 +4,13 @@ const fs = require('fs');
const path = require('path');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
const Queue = require('./queue');
const {cpus} = require('os');
const queues = {
id: require('./queues/queue-id'),
fk: require('./queues/queue-fk')
};
class Consumer {
async start() {
const defaultConfig = require('./config/consumer.yml');
@ -38,7 +42,8 @@ class Consumer {
this.onErrorListener = err => this.onError(err);
const dbConfig = Object.assign({
connectionLimit: cpus().length
connectionLimit: cpus().length,
namedPlaceholders: true
}, conf.db);
this.db = await mysql.createPool(dbConfig);
@ -52,7 +57,16 @@ class Consumer {
this.queues = {};
for (const queueName in queuesConf) {
const queue = new Queue(this, queueName, queuesConf[queueName]);
const queueConf = queuesConf[queueName];
const {mode} = queueConf;
const QueueClass = queues[mode];
if (!QueueClass) {
console.warn(`Ignoring queue '${queueName}' with unknown mode '${mode}'`);
continue;
}
const queue = new QueueClass(this, queueName, queueConf);
this.queues[queueName] = queue;
await queue.consume();
}

View File

@ -51,7 +51,7 @@ module.exports = class MyCDC {
tableInfo = {
queues: new Map(),
events: new Map(),
columns: new Map(),
columns: false,
fk: 'id'
};
tableMap.set(tableName, tableInfo);
@ -69,14 +69,14 @@ module.exports = class MyCDC {
}
const columns = table.columns;
for (const column of columns) {
let columnInfo = tableInfo.columns.get(column);
if (!columnInfo) {
columnInfo = [];
tableInfo.columns.set(column, columnInfo);
}
columnInfo.push(queueName);
}
if (columns) {
if (tableInfo.columns === false)
tableInfo.columns = new Set();
if (tableInfo.columns !== true)
for (const column of columns)
tableInfo.columns.add(column);
} else
tableInfo.columns = true;
if (table.id)
tableInfo.id = table.id;
@ -128,19 +128,39 @@ module.exports = class MyCDC {
// RabbitMQ
this.publisher = await amqp.connect(conf.amqp);
this.channel = await this.publisher.createChannel();
const channel = this.channel = await this.publisher.createChannel();
for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) {
await this.channel.assertExchange(tableName, 'headers', {
await channel.assertExchange(tableName, 'headers', {
durable: true
});
}
}
for (const queueName in this.queuesConf) {
await this.channel.assertQueue(queueName, {
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
await channel.deleteQueue(queueName, {options});
await channel.assertQueue(queueName, {
durable: true
});
const includeSchema = this.queuesConf[queueName].includeSchema;
for (const schemaName in includeSchema) {
const schema = includeSchema[schemaName];
for (const tableName in schema) {
const table = schema[tableName];
const events = table.events || allEvents;
for (const event of events) {
let args;
if (event === 'updaterows' && table.columns) {
args = {'x-match': 'any'};
table.columns.map(c => args[c] = true);
} else
args = {'z-event': event};
await channel.bindQueue(queueName, tableName, '', args);
}
}
}
}
// Zongji
@ -159,6 +179,7 @@ module.exports = class MyCDC {
const [row] = res;
this.filename = row.logName;
this.position = row.position;
this.isFlushed = true;
Object.assign(this.opts, {
filename: this.filename,
position: this.position
@ -298,7 +319,7 @@ module.exports = class MyCDC {
}
this.position = position;
this.flushed = false;
this.isFlushed = false;
}
onRowEvent(evt, eventName) {
@ -320,7 +341,9 @@ module.exports = class MyCDC {
if (isUpdate) {
rows = [];
cols = new Set();
const columns = tableInfo.columns.keys();
let columns = tableInfo.columns === true
? Object.keys(evt.rows[0].after)
: tableInfo.columns.keys();
for (const row of evt.rows) {
let nColsChanged = 0;
@ -348,13 +371,12 @@ module.exports = class MyCDC {
rows
};
const headers = {};
let headers = {};
headers['z-event'] = eventName;
if (isUpdate) {
for (const col of cols)
headers[col] = true;
data.cols = Array.from(cols);
} else {
headers['z-'+ eventName] = true;
}
const options = {
@ -367,30 +389,23 @@ module.exports = class MyCDC {
Buffer.from(jsonData), options);
if (this.debug) {
//console.debug(data, options);
console.debug(data, options);
console.debug('Queued:'.blue,
`${tableName}(${rows.length}) [${eventName}]`);
}
}
async flushQueue() {
if (this.flushed) return;
const position = this.nextPosition;
if (this.isFlushed) return;
const {filename, position} = this;
this.debug('Flush', `filename: ${filename}, position: ${position}`);
if (position) {
const filename = this.nextFilename;
this.debug('Flush', `filename: ${filename}, position: ${position}`);
const replaceQuery =
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, filename, position]);
const replaceQuery =
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, filename, position]);
this.flushed = true;
}
this.nextFilename = this.filename;
this.nextPosition = this.position;
this.isFlushed = true;
}
async connectionPing() {

138
package-lock.json generated
View File

@ -1,15 +1,22 @@
{
"name": "mycdc",
"version": "0.0.5",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "mycdc",
"version": "0.0.5",
"license": "GPL-3.0",
"dependencies": {
"amqplib": "^0.10.3",
"colors": "^1.4.0",
"mysql2": "^2.3.3",
"mysql2": "^3.9.3",
"require-yaml": "^0.0.1",
"zongji": "file:../zongji"
},
"engines": {
"node": ">=20"
}
},
"../zongji": {},
@ -133,19 +140,16 @@
}
},
"node_modules/long": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz",
"integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA=="
"version": "5.2.3",
"resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz",
"integrity": "sha512-lcHwpNoggQTObv5apGNCTdJrO69eHOZMi4BNC+rTLER8iHAqGrUVeLh/irVIM7zTw2bOXA8T6uNPeujwOLg/2Q=="
},
"node_modules/lru-cache": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz",
"integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==",
"dependencies": {
"yallist": "^4.0.0"
},
"version": "8.0.5",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-8.0.5.tgz",
"integrity": "sha512-MhWWlVnuab1RG5/zMRRcVGXZLCXrZTgfwMikgzCegsPnG62yDQo5JnqKkrK4jO5iKqDAZGItAqN5CtKBCBWRUA==",
"engines": {
"node": ">=10"
"node": ">=16.14"
}
},
"node_modules/ms": {
@ -154,16 +158,16 @@
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"node_modules/mysql2": {
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/mysql2/-/mysql2-2.3.3.tgz",
"integrity": "sha512-wxJUev6LgMSgACDkb/InIFxDprRa6T95+VEoR+xPvtngtccNH2dGjEB/fVZ8yg1gWv1510c9CvXuJHi5zUm0ZA==",
"version": "3.9.3",
"resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz",
"integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==",
"dependencies": {
"denque": "^2.0.1",
"denque": "^2.1.0",
"generate-function": "^2.3.1",
"iconv-lite": "^0.6.3",
"long": "^4.0.0",
"lru-cache": "^6.0.0",
"named-placeholders": "^1.1.2",
"long": "^5.2.1",
"lru-cache": "^8.0.0",
"named-placeholders": "^1.1.3",
"seq-queue": "^0.0.5",
"sqlstring": "^2.3.2"
},
@ -172,35 +176,24 @@
}
},
"node_modules/named-placeholders": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.2.tgz",
"integrity": "sha512-wiFWqxoLL3PGVReSZpjLVxyJ1bRqe+KKJVbr4hGs1KWfTZTQyezHFBbuKj9hsizHyGV2ne7EMjHdxEGAybD5SA==",
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.3.tgz",
"integrity": "sha512-eLoBxg6wE/rZkJPhU/xRX1WTpkFEwDJEN96oxFrTsqBdbT5ec295Q+CoHrL9IT0DipqKhmGcaZmwOt8OON5x1w==",
"dependencies": {
"lru-cache": "^4.1.3"
"lru-cache": "^7.14.1"
},
"engines": {
"node": ">=6.0.0"
"node": ">=12.0.0"
}
},
"node_modules/named-placeholders/node_modules/lru-cache": {
"version": "4.1.5",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-4.1.5.tgz",
"integrity": "sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==",
"dependencies": {
"pseudomap": "^1.0.2",
"yallist": "^2.1.2"
"version": "7.18.3",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.18.3.tgz",
"integrity": "sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==",
"engines": {
"node": ">=12"
}
},
"node_modules/named-placeholders/node_modules/yallist": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz",
"integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A=="
},
"node_modules/pseudomap": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz",
"integrity": "sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ=="
},
"node_modules/querystringify": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz",
@ -267,11 +260,6 @@
"requires-port": "^1.0.0"
}
},
"node_modules/yallist": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="
},
"node_modules/zongji": {
"resolved": "../zongji",
"link": true
@ -372,17 +360,14 @@
}
},
"long": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz",
"integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA=="
"version": "5.2.3",
"resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz",
"integrity": "sha512-lcHwpNoggQTObv5apGNCTdJrO69eHOZMi4BNC+rTLER8iHAqGrUVeLh/irVIM7zTw2bOXA8T6uNPeujwOLg/2Q=="
},
"lru-cache": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz",
"integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==",
"requires": {
"yallist": "^4.0.0"
}
"version": "8.0.5",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-8.0.5.tgz",
"integrity": "sha512-MhWWlVnuab1RG5/zMRRcVGXZLCXrZTgfwMikgzCegsPnG62yDQo5JnqKkrK4jO5iKqDAZGItAqN5CtKBCBWRUA=="
},
"ms": {
"version": "2.1.2",
@ -390,49 +375,35 @@
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"mysql2": {
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/mysql2/-/mysql2-2.3.3.tgz",
"integrity": "sha512-wxJUev6LgMSgACDkb/InIFxDprRa6T95+VEoR+xPvtngtccNH2dGjEB/fVZ8yg1gWv1510c9CvXuJHi5zUm0ZA==",
"version": "3.9.3",
"resolved": "https://registry.npmjs.org/mysql2/-/mysql2-3.9.3.tgz",
"integrity": "sha512-+ZaoF0llESUy7BffccHG+urErHcWPZ/WuzYAA9TEeLaDYyke3/3D+VQDzK9xzRnXpd0eMtRf0WNOeo4Q1Baung==",
"requires": {
"denque": "^2.0.1",
"denque": "^2.1.0",
"generate-function": "^2.3.1",
"iconv-lite": "^0.6.3",
"long": "^4.0.0",
"lru-cache": "^6.0.0",
"named-placeholders": "^1.1.2",
"long": "^5.2.1",
"lru-cache": "^8.0.0",
"named-placeholders": "^1.1.3",
"seq-queue": "^0.0.5",
"sqlstring": "^2.3.2"
}
},
"named-placeholders": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.2.tgz",
"integrity": "sha512-wiFWqxoLL3PGVReSZpjLVxyJ1bRqe+KKJVbr4hGs1KWfTZTQyezHFBbuKj9hsizHyGV2ne7EMjHdxEGAybD5SA==",
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.3.tgz",
"integrity": "sha512-eLoBxg6wE/rZkJPhU/xRX1WTpkFEwDJEN96oxFrTsqBdbT5ec295Q+CoHrL9IT0DipqKhmGcaZmwOt8OON5x1w==",
"requires": {
"lru-cache": "^4.1.3"
"lru-cache": "^7.14.1"
},
"dependencies": {
"lru-cache": {
"version": "4.1.5",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-4.1.5.tgz",
"integrity": "sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==",
"requires": {
"pseudomap": "^1.0.2",
"yallist": "^2.1.2"
}
},
"yallist": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz",
"integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A=="
"version": "7.18.3",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.18.3.tgz",
"integrity": "sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA=="
}
}
},
"pseudomap": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz",
"integrity": "sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ=="
},
"querystringify": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz",
@ -496,11 +467,6 @@
"requires-port": "^1.0.0"
}
},
"yallist": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="
},
"zongji": {
"version": "file:../zongji"
}

View File

@ -1,20 +1,20 @@
{
"name": "mycdc",
"version": "0.0.5",
"version": "0.0.6",
"author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0",
"repository": {
"type": "git",
"url": "https://gitea.verdnatura.es/verdnatura/mycdc"
"type": "git",
"url": "https://gitea.verdnatura.es/verdnatura/mycdc"
},
"engines": {
"node": ">=20"
"node": ">=20"
},
"dependencies": {
"amqplib": "^0.10.3",
"colors": "^1.4.0",
"mysql2": "^2.3.3",
"mysql2": "^3.9.3",
"require-yaml": "^0.0.1",
"zongji": "file:../zongji"
}

101
queue.js
View File

@ -1,94 +1,17 @@
module.exports = class Queue {
constructor(consumer, name, conf) {
Object.assign(this, {consumer, name, conf});
this.reset();
}
async consume() {
if (this.conf.mode !== 'fk') {
console.warn(`Ignoring queue '${this.name} with unknown mode '${this.conf.mode}'`);
return;
constructor(consumer, name, conf) {
Object.assign(this, {consumer, name, conf});
}
const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch);
await channel.assertQueue(this.name, {
durable: true
});
this.channel = channel;
await channel.consume(this.name,
msg => this.onConsume(msg));
this.flush();
}
reset() {
this.lastMessage = null;
this.nMessages = 0;
this.ids = new Set();
}
flush(flushInterval) {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
async consume() {
const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch);
await channel.assertQueue(this.name, {
durable: true
});
this.channel = channel;
await channel.consume(this.name,
msg => this.onConsume(msg));
}
this.timeout = setTimeout(
() => this.onFlushTimeout(), flushInterval);
}
async onFlushTimeout() {
const consumer = this.consumer;
if (this.ids.size) {
if (consumer.conf.debug)
console.debug('Flush:'.blue, this.name.yellow, this.ids);
const ids = Array.from(this.ids);
const lastMessage = this.lastMessage;
this.reset();
try {
for (const id of ids) {
const sql = consumer.db.format(this.conf.query, id);
consumer.debug('SQL', sql);
if (!consumer.conf.testMode)
await consumer.db.query(sql);
}
await this.channel.ack(lastMessage, true);
} catch(err) {
await this.channel.nack(lastMessage, true);
console.error(err);
}
}
this.flush(this.conf.flushInterval);
}
async onConsume(msg) {
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const ids = this.ids;
const key = this.conf.includeSchema[data.schema][data.table].key;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
ids.add(row.before[key]);
ids.add(row.after[key]);
}
} else
for (const row of data.rows)
ids.add(row[key]);
this.nMessages++;
this.lastMessage = msg;
if (this.nMessages == consumer.conf.amqpPrefetch)
this.flush();
}
}

86
queues/queue-fk.js Normal file
View File

@ -0,0 +1,86 @@
const Queue = require('../queue');
module.exports = class QueueFk extends Queue {
constructor(consumer, name, conf) {
super(consumer, name, conf);
this.reset();
}
async consume() {
await super.consume();
this.flush();
}
reset() {
this.lastMessage = null;
this.nMessages = 0;
this.ids = new Set();
}
flush(flushInterval) {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
}
this.timeout = setTimeout(
() => this.onFlushTimeout(), flushInterval);
}
async onFlushTimeout() {
const consumer = this.consumer;
if (this.ids.size) {
if (consumer.conf.debug)
console.debug('Flush:'.blue, this.name.yellow, this.ids);
const ids = Array.from(this.ids);
const lastMessage = this.lastMessage;
this.reset();
try {
for (const id of ids) {
const params = {id};
//const sql = consumer.db.format(this.conf.query, params);
const sql = this.conf.query;
if (consumer.conf.debug)
console.debug('SQL:'.blue, sql, params);
if (!consumer.conf.testMode)
await consumer.db.query(sql, params);
}
await this.channel.ack(lastMessage, true);
} catch(err) {
await this.channel.nack(lastMessage, true);
console.error(err);
}
}
this.flush(this.conf.flushInterval);
}
async onConsume(msg) {
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const ids = this.ids;
const key = this.conf.includeSchema[data.schema][data.table].key;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
ids.add(row.before[key]);
ids.add(row.after[key]);
}
} else
for (const row of data.rows)
ids.add(row[key]);
this.nMessages++;
this.lastMessage = msg;
if (this.nMessages == consumer.conf.amqpPrefetch)
this.flush();
}
}

41
queues/queue-id.js Normal file
View File

@ -0,0 +1,41 @@
const Queue = require('../queue');
module.exports = class QueueId extends Queue {
async onConsume(msg) {
const {consumer, conf} = this;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const table = conf.includeSchema[data.schema][data.table]
const {query, key} = table;
const params = {
schema: data.schema,
table: data.table,
};
async function dbQuery(id) {
const myParams = Object.assign({id}, params);
if (consumer.conf.debug)
console.debug('SQL:'.blue, query, myParams);
if (!consumer.conf.testMode)
await consumer.db.query(query, myParams);
}
const keyChanged = data.cols.indexOf(key) !== -1;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
if (keyChanged)
await dbQuery(row.before[key]);
await dbQuery(row.after[key]);
}
} else
for (const row of data.rows)
await dbQuery(row[key]);
await this.channel.ack(msg);
}
}