Multi-queue alpha, yml config, code clean, refactor

This commit is contained in:
Juan Ferrer 2022-10-24 18:11:25 +02:00
parent 916e92c940
commit 77270ed10d
9 changed files with 353 additions and 204 deletions

View File

@ -1,57 +0,0 @@
{
"debug": true,
"testMode": false,
"db": {
"host": "localhost",
"port": 3306,
"user": "zongji",
"password": "password",
"database": "util"
},
"consumerDb": {
"host": "localhost",
"port": 3306,
"user": "zongji",
"password": "password",
"database": "util"
},
"amqp": "amqp://user:password@localhost:5672",
"includeEvents": [
"rotate",
"tablemap",
"writerows",
"updaterows",
"deleterows"
],
"pingInterval": 60,
"flushInterval": 5000,
"queue": "orderRecalc",
"addQuery": "INSERT INTO `hedera`.`orderRecalc` (`orderFk`) VALUES ?",
"recalcQuery": "CALL `hedera`.`order_recalc`(?)",
"includeSchema": {
"hedera": {
"order": {
"fk": "id",
"events": ["updaterows"],
"columns": [
"id",
"address_id",
"company_id",
"date_send",
"customer_id"
]
},
"orderRow": {
"fk": "orderFk",
"columns": [
"id",
"orderFk",
"itemFk",
"warehouseFk",
"shipment",
"amount"
]
}
}
}
}

66
config.yml Normal file
View File

@ -0,0 +1,66 @@
debug: true
testMode: true
db:
host: localhost
port: 3306
user: zongji
password: password
database: util
consumerDb:
host: localhost
port: 3306
user: zongji
password: password
database: util
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 5000
queues:
orderTotal:
query: CALL hedera.order_recalc(?)
mode: fk
includeSchema:
hedera:
order:
fk: id
events:
- updaterows
columns:
- id
- address_id
- company_id
- date_send
- customer_id
orderRow:
fk: orderFk
table: order
columns:
- id
- orderFk
- itemFk
- warehouseFk
- shipment
- amount
comparative:
query: CALL vn.comparative_refresh(?table, ?id, ?data)
mode: changes
includeSchema:
vn:
ticket:
id: id
events:
- updaterows
columns:
- id
- shipped
- warehouseFk
- isDeleted
sale:
id: id
columns:
- id
- ticketFk
- itemFk
- quantity
- price

View File

@ -1,26 +1,21 @@
require('require-yaml');
require('colors');
const fs = require('fs');
const path = require('path');
const defaultConfig = require('./config.json');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
require('colors');
const config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.json');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
}
class Consumer {
constructor(config) {
this.config = config;
}
async start() {
if (this.config.testMode)
const defaultConfig = require('./config.yml');
const config = this.config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.yml');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
}
if (config.testMode)
console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.');
@ -35,20 +30,25 @@ class Consumer {
}
async init() {
const config = this.config;
this.onErrorListener = err => this.onError(err);
this.db = await mysql.createConnection(this.config.consumerDb);
this.db = await mysql.createConnection(config.consumerDb);
this.db.on('error', this.onErrorListener);
this.pingInterval = setInterval(
() => this.connectionPing(), this.config.pingInterval * 1000);
() => this.connectionPing(), config.pingInterval * 1000);
this.consumer = await amqp.connect(this.config.amqp);
this.consumer = await amqp.connect(config.amqp);
this.channel = await this.consumer.createChannel();
this.channel.assertQueue(this.config.queue, {
durable: true
});
this.channel.consume(this.config.queue, msg => this.onConsume(msg));
for (const queueName in config.queues) {
await this.channel.assertQueue(queueName, {
durable: true
});
await this.channel.consume(queueName,
msg => this.onConsume(msg, queueName));
}
}
async end(silent) {
@ -72,18 +72,28 @@ class Consumer {
await this.db.ping();
}
async onConsume(msg) {
const fks = JSON.parse(msg.content.toString());
if (this.config.debug)
console.debug('RabbitMQ message'.blue, fks);
async onConsume(msg, queueName) {
const config = this.config;
for (const fk of fks) {
if (this.config.debug)
console.debug('Query'.blue, this.config.recalcQuery.yellow);
await this.db.query(this.config.recalcQuery, fk);
const data = JSON.parse(msg.content.toString());
if (config.debug)
console.debug('Message:'.blue, queueName.yellow, fks);
const queue = config.queues[queueName];
const query = queue.query;
if (!query) return;
if (!config.testMode)
switch(queue.mode) {
case 'fk':
for (const fk of data.fks)
await this.db.query(query, fk);
break;
case 'changes':
break;
}
this.channel.ack(msg);
await this.channel.ack(msg);
}
async tryRestart() {
@ -118,10 +128,8 @@ class Consumer {
}
}
let consumer;
async function main() {
consumer = new Consumer(config)
const consumer = new Consumer()
await consumer.start();
process.on('SIGINT', async function() {

View File

@ -1,20 +1,7 @@
const fs = require('fs');
const path = require('path');
const defaultConfig = require('./config.json');
const MyCDC = require('./mycdc');
const config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.json');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
}
let mycdc;
async function main() {
mycdc = new MyCDC(config)
const mycdc = new MyCDC()
await mycdc.start();
process.on('SIGINT', async function() {

290
mycdc.js
View File

@ -1,62 +1,107 @@
require('require-yaml');
require('colors');
const fs = require('fs');
const path = require('path');
const ZongJi = require('./zongji');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
require('colors');
const allEvents = new Set([
'writerows',
'updaterows',
'deleterows'
]);
const allEvents = [
'writerows',
'updaterows',
'deleterows'
];
module.exports = class MyCDC {
constructor(config) {
this.config = config;
constructor() {
this.running = false;
this.filename = null;
this.position = null;
this.schemaMap = new Map();
this.fks = new Set();
const includeSchema = {};
for (const schemaName in this.config.includeSchema) {
const schema = this.config.includeSchema[schemaName];
const tables = [];
const tableMap = new Map();
for (const tableName in schema) {
const table = schema[tableName];
tables.push(tableName);
const tableInfo = {
events: allEvents,
columns: true,
fk: 'id'
};
tableMap.set(tableName, tableInfo);
if (typeof table === 'object') {
if (Array.isArray(table.events))
tableInfo.events = new Set(table.events);
if (Array.isArray(table.columns))
tableInfo.columns = new Set(table.columns);
if (table.fk)
tableInfo.fk = table.fk;
}
}
includeSchema[schemaName] = tables;
this.schemaMap.set(schemaName, tableMap);
}
this.opts = {
includeEvents: this.config.includeEvents,
includeSchema
};
this.queues = {};
}
async start() {
if (this.config.testMode)
const defaultConfig = require('./config.yml');
const config = this.config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.yml');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
}
const queues = config.queues;
for (const queueName in queues) {
const includeSchema = queues[queueName].includeSchema;
for (const schemaName in includeSchema) {
let tableMap = this.schemaMap.get(schemaName);
if (!tableMap) {
tableMap = new Map();
this.schemaMap.set(schemaName, tableMap);
}
const schema = includeSchema[schemaName];
for (const tableName in schema) {
const table = schema[tableName];
//if (typeof table !== 'object') continue;
let tableInfo = tableMap.get(tableName);
if (!tableInfo) {
tableInfo = {
queues: new Map(),
events: new Map(),
columns: new Map(),
fk: 'id'
};
tableMap.set(tableName, tableInfo);
}
tableInfo.queues.set(queueName, table);
const events = table.events || allEvents;
for (const event of events) {
let eventInfo = tableInfo.events.get(event);
if (!eventInfo) {
eventInfo = [];
tableInfo.events.set(event, eventInfo);
}
eventInfo.push(queueName);
}
const columns = table.columns;
for (const column of columns) {
let columnInfo = tableInfo.columns.get(column);
if (!columnInfo) {
columnInfo = [];
tableInfo.columns.set(column, columnInfo);
}
columnInfo.push(queueName);
}
if (table.id)
tableInfo.id = table.id;
}
this.schemaMap.set(schemaName, tableMap);
}
}
const includeSchema = {};
for (const [schemaName, tableMap] of this.schemaMap)
includeSchema[schemaName] = Array.from(tableMap.keys());
this.opts = {
includeEvents: [
'rotate',
'tablemap',
'writerows',
'updaterows',
'deleterows'
],
includeSchema
};
if (config.testMode)
console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.');
@ -71,25 +116,29 @@ module.exports = class MyCDC {
}
async init() {
this.debug('DbAsync', 'Initializing.');
const config = this.config;
this.debug('MyCDC', 'Initializing.');
this.onErrorListener = err => this.onError(err);
// DB connection
this.db = await mysql.createConnection(this.config.db);
this.db = await mysql.createConnection(config.db);
this.db.on('error', this.onErrorListener);
// RabbitMQ
this.publisher = await amqp.connect(this.config.amqp);
this.publisher = await amqp.connect(config.amqp);
this.channel = await this.publisher.createChannel();
this.channel.assertQueue(this.config.queue, {
durable: true
});
for (const queueName in config.queues) {
await this.channel.assertQueue(queueName, {
durable: true
});
}
// Zongji
const zongji = new ZongJi(this.config.db);
const zongji = new ZongJi(config.db);
this.zongji = zongji;
this.onBinlogListener = evt => this.onBinlog(evt);
@ -97,7 +146,7 @@ module.exports = class MyCDC {
const [res] = await this.db.query(
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
[this.config.queue]
[config.queue]
);
if (res.length) {
const [row] = res;
@ -132,21 +181,21 @@ module.exports = class MyCDC {
this.zongji.on('error', this.onErrorListener);
this.flushInterval = setInterval(
() => this.flushQueue(), this.config.flushInterval);
() => this.flushQueue(), config.flushInterval);
this.pingInterval = setInterval(
() => this.connectionPing(), this.config.pingInterval * 1000);
() => this.connectionPing(), config.pingInterval * 1000);
// Summary
this.running = true;
this.debug('DbAsync', 'Initialized.');
this.debug('MyCDC', 'Initialized.');
}
async end(silent) {
const zongji = this.zongji;
if (!zongji) return;
this.debug('DbAsync', 'Ending.');
this.debug('MyCDC', 'Ending.');
// Zongji
@ -194,7 +243,7 @@ module.exports = class MyCDC {
// Summary
this.debug('DbAsync', 'Ended.');
this.debug('MyCDC', 'Ended.');
}
async tryRestart() {
@ -242,56 +291,107 @@ module.exports = class MyCDC {
const tableInfo = tableMap.get(table.tableName);
if (!tableInfo) return;
if (!tableInfo.events.has(eventName)) return;
const queueNames = tableInfo.events.get(eventName);
if (!queueNames) return;
let column;
const rows = evt.rows;
const queues = this.config.queues;
const tableQueues = tableInfo.queues;
const fks = new Set();
const changes = new Map();
for (const queueName of queueNames) {
const change = {
mode: queues[queueName].mode
};
changes.set(queueName, change);
switch(change.mode) {
case 'fk':
change.fks = new Set();
break;
case 'changes':
change.rows = {};
break;
}
}
function addChange(row, queueNames) {
for (const queueName of queueNames) {
const queueInfo = tableQueues.get(queueName);
const change = changes.get(queueName);
switch(change.mode) {
case 'fk':
change.fks.add(row[queueInfo.fk]);
break;
case 'changes':
const queueRow = {};
for (const column of queueInfo.columns)
if (row[column] !== undefined)
queueRow[column] = row[column];
change.rows[row[queueInfo.id]] = queueRow;
break;
}
}
}
const columnMap = tableInfo.columns;
const columns = columnMap.keys();
if (eventName === 'updaterows') {
if (tableInfo.columns !== true) {
let changes = false;
for (const row of rows) {
const after = row.after;
for (const col in after) {
if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) {
fks.add(after[tableInfo.fk]);
changes = true;
if (!column) column = col;
break;
}
}
const changedQueues = new Set();
for (const row of rows) {
changedQueues.clear();
const after = row.after;
for (const col of columns) {
if (after[col] === undefined
|| equals(after[col], row.before[col]))
continue;
for (const queue of columnMap.get(col))
changedQueues.add(queue);
if (changedQueues.size === queueNames.length)
break;
}
if (!changes) return;
} else {
for (const row of rows)
fks.add(row.after[tableInfo.fk]);
if (changedQueues.size)
addChange(after, changedQueues);
}
if (!changes) return;
} else {
for (const row of rows)
fks.add(row[tableInfo.fk]);
addChange(row, queueNames);
}
if (fks.size) {
const data = JSON.stringify(Array.from(fks));
this.channel.sendToQueue(this.config.queue,
Buffer.from(data));
this.debug('Queued', data);
}
for (const [queueName, change] of changes) {
const jsonData = {
eventName,
table: table.tableName,
schema: table.parentSchema,
mode: change.mode
};
const row = eventName === 'updaterows'
? rows[0].after
: rows[0];
if (this.config.debug) {
console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`);
console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`);
if (column) {
let before = formatValue(rows[0].before[column]);
let after = formatValue(rows[0].after[column]);
console.debug(` ${column}: ${before} <- ${after}`);
let nChanges;
switch(change.mode) {
case 'fk':
jsonData.fks = Array.from(change.fks);
nChanges = change.fks.size;
break;
case 'changes':
jsonData.rows = change.rows;
nChanges = change.rows.length;
break;
}
if (!nChanges) continue;
const data = JSON.stringify(jsonData);
this.channel.sendToQueue(queueName,
Buffer.from(data), {persistent: true});
console.debug('Queued'.blue, queueName.yellow, `[${eventName}] ${table.tableName}`);
}
this.position = evt.nextPosition;

48
package-lock.json generated
View File

@ -1,5 +1,5 @@
{
"name": "db-async",
"name": "mycdc",
"lockfileVersion": 2,
"requires": true,
"packages": {
@ -8,6 +8,7 @@
"amqplib": "^0.10.3",
"colors": "^1.4.0",
"mysql2": "^2.3.3",
"require-yaml": "^0.0.1",
"zongji": "file:../zongji"
}
},
@ -39,6 +40,11 @@
"node": ">=10"
}
},
"node_modules/argparse": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz",
"integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q=="
},
"node_modules/buffer-more-ints": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz",
@ -115,6 +121,17 @@
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
"integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ=="
},
"node_modules/js-yaml": {
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz",
"integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==",
"dependencies": {
"argparse": "^2.0.1"
},
"bin": {
"js-yaml": "bin/js-yaml.js"
}
},
"node_modules/long": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz",
@ -200,6 +217,14 @@
"string_decoder": "~0.10.x"
}
},
"node_modules/require-yaml": {
"version": "0.0.1",
"resolved": "https://registry.npmjs.org/require-yaml/-/require-yaml-0.0.1.tgz",
"integrity": "sha512-M6eVEgLPRbeOhgSCnOTtdrOOEQzbXRchg24Xa13c39dMuraFKdI9emUo97Rih0YEFzSICmSKg8w4RQp+rd9pOQ==",
"dependencies": {
"js-yaml": ""
}
},
"node_modules/requires-port": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz",
@ -274,6 +299,11 @@
"url-parse": "~1.5.10"
}
},
"argparse": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz",
"integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q=="
},
"buffer-more-ints": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz",
@ -333,6 +363,14 @@
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
"integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ=="
},
"js-yaml": {
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz",
"integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==",
"requires": {
"argparse": "^2.0.1"
}
},
"long": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz",
@ -411,6 +449,14 @@
"string_decoder": "~0.10.x"
}
},
"require-yaml": {
"version": "0.0.1",
"resolved": "https://registry.npmjs.org/require-yaml/-/require-yaml-0.0.1.tgz",
"integrity": "sha512-M6eVEgLPRbeOhgSCnOTtdrOOEQzbXRchg24Xa13c39dMuraFKdI9emUo97Rih0YEFzSICmSKg8w4RQp+rd9pOQ==",
"requires": {
"js-yaml": ""
}
},
"requires-port": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz",

View File

@ -3,6 +3,7 @@
"amqplib": "^0.10.3",
"colors": "^1.4.0",
"mysql2": "^2.3.3",
"require-yaml": "^0.0.1",
"zongji": "file:../zongji"
}
}

View File

@ -10,5 +10,3 @@ CREATE USER 'zongji'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%';
GRANT INSERT, DELETE ON `util`.* TO 'zongji'@'%';
GRANT INSERT ON `hedera`.`orderRecalc` TO 'zongji'@'%';
GRANT INSERT ON `vn`.`ticketRecalc` TO 'zongji'@'%';