Fixes and improvements

This commit is contained in:
Juan Ferrer 2022-08-16 11:45:25 +02:00
parent c43023b897
commit 5d0d68d85b
5 changed files with 63 additions and 24 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
node_modules node_modules
zongji zongji
config.local.json

View File

@ -3,7 +3,7 @@
"host": "localhost", "host": "localhost",
"port": 3306, "port": 3306,
"user": "zongji", "user": "zongji",
"password": "4!e*16!qZ98F5LZ]" "password": "password"
}, },
"includeEvents": [ "includeEvents": [
"tablemap", "tablemap",
@ -12,11 +12,8 @@
"deleterows" "deleterows"
], ],
"interval": 5000, "interval": 5000,
"queries": { "queue": "orderRecalc",
"queue": "INSERT INTO `hedera`.`orderRecalc` (`orderFk`) VALUES ?", "addQuery": "INSERT INTO `hedera`.`orderRecalc` (`orderFk`) VALUES ?",
"getPos": "SELECT `logName`, `position` FROM `hedera`.`orderRecalcPos`",
"setPos": "REPLACE INTO `hedera`.`orderRecalcPos` (`id`, `logName`, `position`) VALUES (1, ?, ?)"
},
"includeSchema": { "includeSchema": {
"hedera": { "hedera": {
"order": { "order": {

View File

@ -1,6 +1,15 @@
const ZongJi = require('./zongji'); const ZongJi = require('./zongji');
const mysql = require('mysql2/promise'); const mysql = require('mysql2/promise');
const config = require('./config.json'); const fs = require('fs');
const path = require('path');
const defaultConfig = require('./config.json');
const config = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config.local.json');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(config, localConfig);
}
const zongji = new ZongJi(config.db); const zongji = new ZongJi(config.db);
const schemaMap = new Map(); const schemaMap = new Map();
@ -55,7 +64,10 @@ async function main() {
includeSchema includeSchema
}; };
const [res] = await db.query(config.queries.getPos); const [res] = await db.query(
'SELECT `logName`, `position` FROM `util`.`asyncQueue` WHERE code = ?',
[config.queue]
);
if (res.length) { if (res.length) {
const [row] = res; const [row] = res;
filename = row.logName; filename = row.logName;
@ -84,11 +96,25 @@ async function flushQueue() {
const ids = []; const ids = [];
for (const fk of fks) ids.push([fk]); for (const fk of fks) ids.push([fk]);
await db.query(config.queries.queue, [ids]); await db.query(config.addQuery, [ids]);
await db.query(config.queries.setPos, [filename, nextPosition]); await db.query(
'REPLACE INTO `util`.`asyncQueue` (`code`, `logName`, `position`) VALUES (?, ?, ?)',
[config.queue, filename, nextPosition]
);
fks.clear(); fks.clear();
} }
function equals(a, b) {
if (a === b)
return true;
const type = typeof a;
if (type !== typeof b)
return false;
if (type == 'object' && a instanceof Date)
return a.getTime() === b.getTime();
return false;
}
zongji.on('binlog', function(evt) { zongji.on('binlog', function(evt) {
//evt.dump(); //evt.dump();
const eventName = evt.getEventName(); const eventName = evt.getEventName();
@ -103,27 +129,42 @@ zongji.on('binlog', function(evt) {
if (!tableInfo.events.has(eventName)) return; if (!tableInfo.events.has(eventName)) return;
let column;
const rows = evt.rows;
if (eventName === 'updaterows') { if (eventName === 'updaterows') {
if (tableInfo.columns !== true) { if (tableInfo.columns !== true) {
for (const row of evt.rows) { let changes = false;
for (const row of rows) {
const after = row.after; const after = row.after;
for (const col in after) { for (const col in after) {
if (tableInfo.columns.has(col) && row.before[col] !== after[col]) { if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) {
fks.add(after[tableInfo.fk]); fks.add(after[tableInfo.fk]);
changes = true;
if (!column) column = col;
break; break;
} }
} }
} }
if (!changes) return;
} else { } else {
for (const row of evt.rows) for (const row of rows)
fks.add(row.after[tableInfo.fk]); fks.add(row.after[tableInfo.fk]);
} }
} else { } else {
for (const row of evt.rows) for (const row of rows)
fks.add(row[tableInfo.fk]); fks.add(row[tableInfo.fk]);
} }
console.log(`[${eventName}] ${table.tableName}: ${evt.rows.length}`); const row = eventName === 'updaterows'
? rows[0].after
: rows[0];
console.log(`[${eventName}] ${table.tableName}: ${rows.length}`);
console.log(` ${tableInfo.fk}: ${row[tableInfo.fk]}`);
if (column)
console.log(` ${column}: ${rows[0].after[column]} <- ${rows[0].before[column]}`);
nextPosition = evt.nextPosition; nextPosition = evt.nextPosition;
filename = zongji.options.filename; filename = zongji.options.filename;
}); });

View File

@ -1,13 +1,13 @@
CREATE TABLE `hedera`.`orderRecalcPos`( CREATE TABLE `util`.`asyncQueue`(
`id` INT NOT NULL AUTO_INCREMENT , `code` VARCHAR(255) NOT NULL,
`logName` VARCHAR(255) NOT NULL , `logName` VARCHAR(255) NOT NULL,
`position` BIGINT UNSIGNED NOT NULL , `position` BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`code`)
) ENGINE = InnoDB; ) ENGINE = InnoDB;
CREATE USER 'zongji'@'%' IDENTIFIED BY '4!e*16!qZ98F5LZ]'; CREATE USER 'zongji'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%'; GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%';
GRANT INSERT, DELETE ON `util`.`asyncQueue` TO 'zongji'@'%';
GRANT INSERT ON `hedera`.`orderRecalc` TO 'zongji'@'%'; GRANT INSERT ON `hedera`.`orderRecalc` TO 'zongji'@'%';
GRANT INSERT, DELETE ON `hedera`.`orderRecalcPos` TO 'zongji'@'%';

View File

@ -1,3 +1,3 @@
DROP TABLE IF EXISTS `hedera`.`orderRecalcPos`; DROP TABLE IF EXISTS `util`.`asyncQueue`;
DROP USER 'zongji'@'%'; DROP USER 'zongji'@'%';