This commit is contained in:
Juan Ferrer 2022-10-25 13:20:22 +02:00
parent 77270ed10d
commit 4cc2dc067a
5 changed files with 138 additions and 47 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
node_modules
zongji
config.local.json
config.local.yml

View File

@ -1,6 +1,7 @@
debug: true
testMode: true
testMode: false
code: mycdc
db:
host: localhost
port: 3306
@ -15,7 +16,7 @@ consumerDb:
database: util
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 5000
flushInterval: 10
queues:
orderTotal:
query: CALL hedera.order_recalc(?)
@ -23,18 +24,17 @@ queues:
includeSchema:
hedera:
order:
fk: id
events:
- updaterows
key: id
columns:
- id
- address_id
- company_id
- date_send
- customer_id
events:
- updaterows
orderRow:
fk: orderFk
table: order
key: orderFk
columns:
- id
- orderFk
@ -42,25 +42,95 @@ queues:
- warehouseFk
- shipment
- amount
comparative:
query: CALL vn.comparative_refresh(?table, ?id, ?data)
mode: changes
ticketTotal:
query: CALL vn.ticket_recalc(?)
mode: fk
includeSchema:
vn:
ticket:
id: id
events:
- updaterows
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
- clientFk
events:
- updaterows
sale:
id: id
key: ticketFk
columns:
- id
- ticketFk
- itemFk
- quantity
- price
comparative:
query: CALL vn.comparative_refresh(?, ?, ?)
mode: changes
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
events:
- updaterows
sale:
key: id
columns:
- id
- ticketFk
- itemFk
- quantity
- price
stock:
query: CALL stock.available_refresh(?, ?, ?)
mode: changes
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
events:
- updaterows
sale:
key: id
columns:
- id
- ticketFk
- itemFk
- quantity
travel:
key: id
columns:
- id
- shipped
- landing
- warehouseInFk
- warehouseOutFk
- isDelivered
- isReceived
events:
- updaterows
entry:
key: id
columns:
- id
- travelFk
events:
- updaterows
buy:
key: id
columns:
- id
- entryFk
- itemFk
- quantity
- life

View File

@ -4,6 +4,8 @@ const fs = require('fs');
const path = require('path');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
const {cpus} = require('os');
class Consumer {
async start() {
@ -21,6 +23,8 @@ class Consumer {
console.log('Starting process.');
await this.init();
console.log('Process started.');
await this.consumeQueues();
}
async stop() {
@ -33,16 +37,20 @@ class Consumer {
const config = this.config;
this.onErrorListener = err => this.onError(err);
this.db = await mysql.createConnection(config.consumerDb);
this.db.on('error', this.onErrorListener);
const dbConfig = Object.assign({
connectionLimit: cpus().length
}, config.consumerDb);
this.pingInterval = setInterval(
() => this.connectionPing(), config.pingInterval * 1000);
this.db = await mysql.createPool(dbConfig);
this.db.on('error', this.onErrorListener);
this.consumer = await amqp.connect(config.amqp);
this.channel = await this.consumer.createChannel();
this.channel.prefetch(1);
}
for (const queueName in config.queues) {
async consumeQueues() {
for (const queueName in this.config.queues) {
await this.channel.assertQueue(queueName, {
durable: true
});
@ -52,8 +60,6 @@ class Consumer {
}
async end(silent) {
clearInterval(this.pingInterval);
await this.consumer.close();
this.db.off('error', this.onErrorListener);
@ -67,29 +73,41 @@ class Consumer {
}
}
async connectionPing() {
this.debug('Ping', 'Sending ping to database.');
await this.db.ping();
}
async onConsume(msg, queueName) {
const config = this.config;
const data = JSON.parse(msg.content.toString());
if (config.debug)
console.debug('Message:'.blue, queueName.yellow, fks);
console.debug('Message:'.blue, queueName.yellow, data.table);
const queue = config.queues[queueName];
const query = queue.query;
let query = queue.query;
if (!query) return;
if (!config.testMode)
// XXX: Testing
//query = 'SELECT 1 sleep';
switch(queue.mode) {
case 'fk':
for (const fk of data.fks)
for (const fk of data.fks) {
const sql = this.db.format(query, fk);
this.debug('SQL', sql);
if (!config.testMode)
await this.db.query(query, fk);
}
break;
case 'changes':
const queueTable = queue.includeSchema[data.schema][data.table];
for (const row of data.rows) {
const sql = this.db.format(query, [
data.table,
row[queueTable.key],
JSON.stringify(row)
]);
this.debug('SQL', sql);
if (!config.testMode)
await this.db.query(query, row);
}
break;
}

View File

@ -18,7 +18,6 @@ module.exports = class MyCDC {
this.filename = null;
this.position = null;
this.schemaMap = new Map();
this.fks = new Set();
this.queues = {};
}
@ -146,7 +145,7 @@ module.exports = class MyCDC {
const [res] = await this.db.query(
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
[config.queue]
[config.code]
);
if (res.length) {
const [row] = res;
@ -181,7 +180,7 @@ module.exports = class MyCDC {
this.zongji.on('error', this.onErrorListener);
this.flushInterval = setInterval(
() => this.flushQueue(), config.flushInterval);
() => this.flushQueue(), config.flushInterval * 1000);
this.pingInterval = setInterval(
() => this.connectionPing(), config.pingInterval * 1000);
@ -310,26 +309,31 @@ module.exports = class MyCDC {
change.fks = new Set();
break;
case 'changes':
change.rows = {};
change.rows = [];
break;
}
}
function addChange(row, queueNames) {
function addChange(queueNames, row, old) {
for (const queueName of queueNames) {
const queueInfo = tableQueues.get(queueName);
const change = changes.get(queueName);
const key = row[queueInfo.key];
const oldKey = old ? old[queueInfo.key] : null;
switch(change.mode) {
case 'fk':
change.fks.add(row[queueInfo.fk]);
change.fks.add(key);
if (old && !equals(oldKey, key))
change.fks.add(oldKey);
break;
case 'changes':
const queueRow = {};
for (const column of queueInfo.columns)
if (row[column] !== undefined)
queueRow[column] = row[column];
change.rows[row[queueInfo.id]] = queueRow;
change.rows.push(queueRow);
break;
}
}
@ -357,12 +361,11 @@ module.exports = class MyCDC {
}
if (changedQueues.size)
addChange(after, changedQueues);
addChange(changedQueues, after, row.before);
}
if (!changes) return;
} else {
for (const row of rows)
addChange(row, queueNames);
addChange(queueNames, row);
}
for (const [queueName, change] of changes) {
@ -391,7 +394,7 @@ module.exports = class MyCDC {
this.channel.sendToQueue(queueName,
Buffer.from(data), {persistent: true});
console.debug('Queued'.blue, queueName.yellow, `[${eventName}] ${table.tableName}`);
console.debug('Queued:'.blue, `${queueName}:`.yellow, `${table.tableName}(${nChanges}) [${eventName}]`);
}
this.position = evt.nextPosition;
@ -403,9 +406,9 @@ module.exports = class MyCDC {
this.debug('Flush', `filename: ${this.filename}, position: ${this.position}`);
const replaceQuery =
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.config.testMode)
await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]);
await this.db.query(replaceQuery, [this.config.code, this.filename, this.position]);
this.flushed = true;
}

View File

@ -9,4 +9,4 @@ docker run \
-e RABBITMQ_DEFAULT_PASS=password \
-p 5672:5672 \
-p 8080:15672 \
rabbitmq:3-management
rabbitmq:3.11.2-management