Refactor, fixes, featured

This commit is contained in:
Juan Ferrer 2022-11-06 13:00:55 +01:00
parent 4cc2dc067a
commit b9826d3e8e
9 changed files with 314 additions and 330 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
node_modules
zongji
config.local.yml
config/*.local.yml

View File

@ -1,136 +0,0 @@
debug: true
testMode: false
code: mycdc
db:
host: localhost
port: 3306
user: zongji
password: password
database: util
consumerDb:
host: localhost
port: 3306
user: zongji
password: password
database: util
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 10
queues:
orderTotal:
query: CALL hedera.order_recalc(?)
mode: fk
includeSchema:
hedera:
order:
key: id
columns:
- id
- address_id
- company_id
- date_send
- customer_id
events:
- updaterows
orderRow:
key: orderFk
columns:
- id
- orderFk
- itemFk
- warehouseFk
- shipment
- amount
ticketTotal:
query: CALL vn.ticket_recalc(?)
mode: fk
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- clientFk
events:
- updaterows
sale:
key: ticketFk
columns:
- id
- ticketFk
- itemFk
- quantity
- price
comparative:
query: CALL vn.comparative_refresh(?, ?, ?)
mode: changes
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
events:
- updaterows
sale:
key: id
columns:
- id
- ticketFk
- itemFk
- quantity
- price
stock:
query: CALL stock.available_refresh(?, ?, ?)
mode: changes
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
events:
- updaterows
sale:
key: id
columns:
- id
- ticketFk
- itemFk
- quantity
travel:
key: id
columns:
- id
- shipped
- landing
- warehouseInFk
- warehouseOutFk
- isDelivered
- isReceived
events:
- updaterows
entry:
key: id
columns:
- id
- travelFk
events:
- updaterows
buy:
key: id
columns:
- id
- entryFk
- itemFk
- quantity
- life

12
config/consumer.yml Normal file
View File

@ -0,0 +1,12 @@
debug: false
testMode: false
amqp: amqp://user:password@localhost:5672
db:
host: localhost
port: 3306
user: zongji
password: password
database: util
pollQueues:
- queue1
- queue2

13
config/producer.yml Normal file
View File

@ -0,0 +1,13 @@
code: mycdc
debug: false
testMode: false
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 10
db:
host: localhost
port: 3306
user: zongji
password: password
database: util

72
config/queues.yml Normal file
View File

@ -0,0 +1,72 @@
orderTotal:
query: CALL hedera.order_recalc(?)
mode: fk
flushInterval: 5000
includeSchema:
hedera:
order:
key: id
columns:
- id
- address_id
- company_id
- date_send
- customer_id
events:
- updaterows
orderRow:
key: orderFk
columns:
- id
- orderFk
- itemFk
- warehouseFk
- shipment
- amount
ticketTotal:
query: CALL vn.ticket_recalc(?)
mode: fk
flushInterval: 5000
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- clientFk
events:
- updaterows
sale:
key: ticketFk
columns:
- id
- ticketFk
- itemFk
- quantity
- price
comparative:
query: CALL vn.itemComparative_refresh(?, ?, ?)
mode: changes
flushInterval: 5000
includeSchema:
vn:
ticket:
key: id
columns:
- id
- shipped
- warehouseFk
- isDeleted
- clientFk
events:
- updaterows
sale:
key: id
columns:
- id
- ticketFk
- itemFk
- quantity
- priceFixed

View File

@ -4,20 +4,20 @@ const fs = require('fs');
const path = require('path');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
const Queue = require('./queue');
const {cpus} = require('os');
class Consumer {
async start() {
const defaultConfig = require('./config.yml');
const config = this.config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.yml');
const defaultConfig = require('./config/consumer.yml');
const conf = this.conf = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config/consumer.local.yml');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
Object.assign(conf, localConfig);
}
if (config.testMode)
if (conf.testMode)
console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.');
@ -34,33 +34,32 @@ class Consumer {
}
async init() {
const config = this.config;
const conf = this.conf;
this.onErrorListener = err => this.onError(err);
const dbConfig = Object.assign({
connectionLimit: cpus().length
}, config.consumerDb);
}, conf.db);
this.db = await mysql.createPool(dbConfig);
this.db.on('error', this.onErrorListener);
this.consumer = await amqp.connect(config.amqp);
this.channel = await this.consumer.createChannel();
this.channel.prefetch(1);
this.amqpConn = await amqp.connect(conf.amqp);
}
async consumeQueues() {
for (const queueName in this.config.queues) {
await this.channel.assertQueue(queueName, {
durable: true
});
await this.channel.consume(queueName,
msg => this.onConsume(msg, queueName));
const queuesConf = require('./config/queues.yml');
this.queues = {};
for (const queueName in queuesConf) {
const queue = new Queue(this, queueName, queuesConf[queueName]);
this.queues[queueName] = queue;
await queue.consume();
}
}
async end(silent) {
await this.consumer.close();
await this.amqpConn.close();
this.db.off('error', this.onErrorListener);
// FIXME: mysql2/promise bug, db.end() ends process
@ -73,47 +72,6 @@ class Consumer {
}
}
async onConsume(msg, queueName) {
const config = this.config;
const data = JSON.parse(msg.content.toString());
if (config.debug)
console.debug('Message:'.blue, queueName.yellow, data.table);
const queue = config.queues[queueName];
let query = queue.query;
if (!query) return;
// XXX: Testing
//query = 'SELECT 1 sleep';
switch(queue.mode) {
case 'fk':
for (const fk of data.fks) {
const sql = this.db.format(query, fk);
this.debug('SQL', sql);
if (!config.testMode)
await this.db.query(query, fk);
}
break;
case 'changes':
const queueTable = queue.includeSchema[data.schema][data.table];
for (const row of data.rows) {
const sql = this.db.format(query, [
data.table,
row[queueTable.key],
JSON.stringify(row)
]);
this.debug('SQL', sql);
if (!config.testMode)
await this.db.query(query, row);
}
break;
}
await this.channel.ack(msg);
}
async tryRestart() {
try {
await this.init();
@ -141,7 +99,7 @@ class Consumer {
}
debug(namespace, message) {
if (this.config.debug)
if (this.conf.debug)
console.debug(`${namespace}:`.blue, message.yellow);
}
}

218
mycdc.js
View File

@ -22,15 +22,16 @@ module.exports = class MyCDC {
}
async start() {
const defaultConfig = require('./config.yml');
const config = this.config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.yml');
const defaultConfig = require('./config/producer.yml');
const conf = this.conf = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config/producer.local.yml');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
Object.assign(conf, localConfig);
}
this.queuesConf = require('./config/queues.yml');
const queues = config.queues;
const queues = this.queuesConf;
for (const queueName in queues) {
const includeSchema = queues[queueName].includeSchema;
for (const schemaName in includeSchema) {
@ -100,7 +101,7 @@ module.exports = class MyCDC {
includeSchema
};
if (config.testMode)
if (conf.testMode)
console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.');
@ -115,21 +116,28 @@ module.exports = class MyCDC {
}
async init() {
const config = this.config;
const conf = this.conf;
this.debug('MyCDC', 'Initializing.');
this.onErrorListener = err => this.onError(err);
// DB connection
this.db = await mysql.createConnection(config.db);
this.db = await mysql.createConnection(conf.db);
this.db.on('error', this.onErrorListener);
// RabbitMQ
this.publisher = await amqp.connect(config.amqp);
this.publisher = await amqp.connect(conf.amqp);
this.channel = await this.publisher.createChannel();
for (const queueName in config.queues) {
for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) {
await this.channel.assertExchange(tableName, 'headers', {
durable: true
});
}
}
for (const queueName in this.queuesConf) {
await this.channel.assertQueue(queueName, {
durable: true
});
@ -137,7 +145,7 @@ module.exports = class MyCDC {
// Zongji
const zongji = new ZongJi(config.db);
const zongji = new ZongJi(conf.db);
this.zongji = zongji;
this.onBinlogListener = evt => this.onBinlog(evt);
@ -145,7 +153,7 @@ module.exports = class MyCDC {
const [res] = await this.db.query(
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
[config.code]
[conf.code]
);
if (res.length) {
const [row] = res;
@ -180,9 +188,9 @@ module.exports = class MyCDC {
this.zongji.on('error', this.onErrorListener);
this.flushInterval = setInterval(
() => this.flushQueue(), config.flushInterval * 1000);
() => this.flushQueue(), conf.flushInterval * 1000);
this.pingInterval = setInterval(
() => this.connectionPing(), config.pingInterval * 1000);
() => this.connectionPing(), conf.pingInterval * 1000);
// Summary
@ -211,9 +219,9 @@ module.exports = class MyCDC {
console.log('zongji.connection.destroy');
});
await new Promise(resolve => {
zongji.ctrlConnection.query('KILL ' + zongji.connection.threadId,
zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId],
err => {
if (err && !silent)
if (err && err.code !== 'ER_NO_SUCH_THREAD' && !silent)
console.error(err);
resolve();
});
@ -274,145 +282,117 @@ module.exports = class MyCDC {
onBinlog(evt) {
//evt.dump();
const eventName = evt.getEventName();
if (eventName === 'tablemap') return;
let position = evt.nextPosition;
if (eventName === 'rotate') {
switch (eventName) {
case 'rotate':
this.filename = evt.binlogName;
this.position = evt.position;
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`);
return;
position = evt.position;
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`);
break;
case 'writerows':
case 'deleterows':
case 'updaterows':
this.onRowEvent(evt, eventName);
break;
}
this.position = position;
this.flushed = false;
}
onRowEvent(evt, eventName) {
const table = evt.tableMap[evt.tableId];
const tableName = table.tableName;
const tableMap = this.schemaMap.get(table.parentSchema);
if (!tableMap) return;
const tableInfo = tableMap.get(table.tableName);
const tableInfo = tableMap.get(tableName);
if (!tableInfo) return;
const queueNames = tableInfo.events.get(eventName);
if (!queueNames) return;
const queues = tableInfo.events.get(eventName);
if (!queues) return;
const rows = evt.rows;
const queues = this.config.queues;
const tableQueues = tableInfo.queues;
const isUpdate = eventName === 'updaterows';
let rows;
let cols;
const changes = new Map();
for (const queueName of queueNames) {
const change = {
mode: queues[queueName].mode
};
changes.set(queueName, change);
if (isUpdate) {
rows = [];
cols = new Set();
const columns = tableInfo.columns.keys();
switch(change.mode) {
case 'fk':
change.fks = new Set();
break;
case 'changes':
change.rows = [];
break;
}
}
function addChange(queueNames, row, old) {
for (const queueName of queueNames) {
const queueInfo = tableQueues.get(queueName);
const change = changes.get(queueName);
const key = row[queueInfo.key];
const oldKey = old ? old[queueInfo.key] : null;
switch(change.mode) {
case 'fk':
change.fks.add(key);
if (old && !equals(oldKey, key))
change.fks.add(oldKey);
break;
case 'changes':
const queueRow = {};
for (const column of queueInfo.columns)
if (row[column] !== undefined)
queueRow[column] = row[column];
change.rows.push(queueRow);
break;
}
}
}
const columnMap = tableInfo.columns;
const columns = columnMap.keys();
if (eventName === 'updaterows') {
const changedQueues = new Set();
for (const row of rows) {
changedQueues.clear();
for (const row of evt.rows) {
let nColsChanged = 0;
const after = row.after;
for (const col of columns) {
if (after[col] === undefined
|| equals(after[col], row.before[col]))
continue;
for (const queue of columnMap.get(col))
changedQueues.add(queue);
if (changedQueues.size === queueNames.length)
break;
if (after[col] !== undefined
&& !equals(after[col], row.before[col])) {
nColsChanged++;
cols.add(col);
}
if (changedQueues.size)
addChange(changedQueues, after, row.before);
}
} else {
for (const row of rows)
addChange(queueNames, row);
if (nColsChanged)
rows.push(row);
}
} else
rows = evt.rows;
for (const [queueName, change] of changes) {
const jsonData = {
if (!rows || !rows.length) return;
const data = {
eventName,
table: table.tableName,
table: tableName,
schema: table.parentSchema,
mode: change.mode
rows
};
let nChanges;
switch(change.mode) {
case 'fk':
jsonData.fks = Array.from(change.fks);
nChanges = change.fks.size;
break;
case 'changes':
jsonData.rows = change.rows;
nChanges = change.rows.length;
break;
const headers = {};
if (isUpdate) {
for (const col of cols)
headers[col] = true;
data.cols = Array.from(cols);
} else {
headers['z-'+ eventName] = true;
}
if (!nChanges) continue;
const options = {
persistent: true,
headers
};
const data = JSON.stringify(jsonData);
this.channel.sendToQueue(queueName,
Buffer.from(data), {persistent: true});
const jsonData = JSON.stringify(data);
this.channel.publish(tableName, '',
Buffer.from(jsonData), options);
console.debug('Queued:'.blue, `${queueName}:`.yellow, `${table.tableName}(${nChanges}) [${eventName}]`);
if (this.debug) {
//console.debug(data, options);
console.debug('Queued:'.blue,
`${tableName}(${rows.length}) [${eventName}]`);
}
this.position = evt.nextPosition;
this.flushed = false;
}
async flushQueue() {
if (this.flushed) return;
this.debug('Flush', `filename: ${this.filename}, position: ${this.position}`);
const position = this.nextPosition;
if (position) {
const filename = this.nextFilename;
this.debug('Flush', `filename: ${filename}, position: ${position}`);
const replaceQuery =
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.config.testMode)
await this.db.query(replaceQuery, [this.config.code, this.filename, this.position]);
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, filename, position]);
this.flushed = true;
}
this.nextFilename = this.filename;
this.nextPosition = this.position;
}
async connectionPing() {
this.debug('Ping', 'Sending ping to database.');
@ -427,7 +407,7 @@ module.exports = class MyCDC {
}
debug(namespace, message) {
if (this.config.debug)
if (this.conf.debug)
console.debug(`${namespace}:`.blue, message.yellow);
}
}
@ -444,9 +424,3 @@ function equals(a, b) {
}
return false;
}
function formatValue(value) {
if (value instanceof Date)
return value.toJSON();
return value;
}

91
queue.js Normal file
View File

@ -0,0 +1,91 @@
module.exports = class Queue {
constructor(consumer, name, conf) {
Object.assign(this, {consumer, name, conf});
this.reset();
}
async consume() {
if (this.conf.mode !== 'fk') return;
const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch);
await channel.assertQueue(this.name, {
durable: true
});
this.channel = channel;
await channel.consume(this.name,
msg => this.onConsume(msg));
this.flush();
}
reset() {
this.lastMessage = null;
this.nMessages = 0;
this.ids = new Set();
}
flush(flushInterval) {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
}
this.timeout = setTimeout(
() => this.onFlushTimeout(), flushInterval);
}
async onFlushTimeout() {
const consumer = this.consumer;
if (this.ids.size) {
if (consumer.conf.debug)
console.debug('Flush:'.blue, this.name.yellow, this.ids);
const ids = Array.from(this.ids);
const lastMessage = this.lastMessage;
this.reset();
try {
for (const id of ids) {
const sql = consumer.db.format(this.conf.query, id);
consumer.debug('SQL', sql);
if (!consumer.conf.testMode)
await consumer.db.query(sql);
}
await this.channel.ack(lastMessage, true);
} catch(err) {
await this.channel.nack(lastMessage, true);
console.error(err);
}
}
this.flush(this.conf.flushInterval);
}
async onConsume(msg) {
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const ids = this.ids;
const key = this.conf.includeSchema[data.schema][data.table].key;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
ids.add(row.before[key]);
ids.add(row.after[key]);
}
} else
for (const row of data.rows)
ids.add(row[key]);
this.nMessages++;
this.lastMessage = msg;
if (this.nMessages == consumer.conf.amqpPrefetch)
this.flush();
}
}

View File

@ -1,10 +1,10 @@
#!/bin/bash
docker rm -f some-rabbit
docker rm -f rabbit
docker run \
-d --hostname my-rabbit \
--name some-rabbit \
--name rabbit \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=password \
-p 5672:5672 \