commit 81cf6168b82b1b4d447c4311501b5ed016fcd0ac Author: Juan Ferrer Toribio Date: Mon Aug 15 18:12:17 2022 +0200 First commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cc8e0ea --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +zongji \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..c4cfb4d --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# Asynchronous DB calculations reading the binary log + +Clone *zongji* repo into project directory. +```text +git clone https://github.com/juan-ferrer-toribio/zongji.git +cd zongji +git checkout fix-143 +``` + +Install dependencies. +```text +npm i +``` + +Apply SQL commands from *zongji.sql* into DB. + +Launch app. +```text +node index.js +``` + +## Built With + +* [Zongji](https://github.com/nevill/zongji) +* [MySQL2](https://github.com/sidorares/node-mysql2#readme) diff --git a/config.json b/config.json new file mode 100644 index 0000000..f38143f --- /dev/null +++ b/config.json @@ -0,0 +1,54 @@ +{ + "db": { + "host": "localhost", + "port": 3306, + "user": "zongji", + "password": "4!e*16!qZ98F5LZ]" + }, + "includeEvents": [ + "tablemap", + "writerows", + "updaterows", + "deleterows" + ], + "interval": 5000, + "queries": { + "queue": "INSERT INTO `hedera`.`orderRecalc` (`orderFk`) VALUES ?", + "getPos": "SELECT `logName`, `position` FROM `hedera`.`orderRecalcPos`", + "setPos": "REPLACE INTO `hedera`.`orderRecalcPos` (`id`, `logName`, `position`) VALUES (1, ?, ?)" + }, + "includeSchema": { + "account": { + "userPassword": { + "fk": "nDigits", + "columns": [ + "nDigits" + ] + } + }, + "hedera": { + "order": { + "fk": "id", + "events": ["updaterows"], + "columns": [ + "id", + "address_id", + "company_id", + "date_send", + "customer_id" + ] + }, + "orderRow": { + "fk": "orderFk", + "columns": [ + "id", + "orderFk", + "itemFk", + "warehouseFk", + "shipment", + "amount" + ] + } + } + } +} diff --git a/index.js b/index.js new file mode 100644 index 0000000..ac9c2a8 --- /dev/null +++ b/index.js @@ -0,0 +1,131 @@ +const ZongJi = require('./zongji'); +const mysql = require('mysql2/promise'); +const config = require('./config.json'); + +const zongji = new ZongJi(config.db); +const schemaMap = new Map(); + +const allEvents = new Set([ + 'writerows', + 'updaterows', + 'deleterows' +]); + +let db; +let nextPosition; +let filename; +const fks = new Set(); + +async function main() { + db = await mysql.createConnection(config.db); + + const includeSchema = {}; + for (const schemaName in config.includeSchema) { + const schema = config.includeSchema[schemaName]; + const tables = []; + const tableMap = new Map(); + + for (const tableName in schema) { + const table = schema[tableName]; + tables.push(tableName); + + const tableInfo = { + events: allEvents, + columns: true, + fk: 'id' + }; + tableMap.set(tableName, tableInfo); + + if (typeof table === 'object') { + if (Array.isArray(table.events)) + tableInfo.events = new Set(table.events); + if (Array.isArray(table.columns)) + tableInfo.columns = new Set(table.columns); + if (table.fk) + tableInfo.fk = table.fk; + } + } + + includeSchema[schemaName] = tables; + schemaMap.set(schemaName, tableMap); + } + + const opts = { + includeEvents: config.includeEvents, + includeSchema + }; + + const [res] = await db.query(config.queries.getPos); + if (res.length) { + const [row] = res; + filename = row.logName; + position = row.position; + Object.assign(opts, {filename, position}); + } else + opts.startAtEnd = true; + + zongji.start(opts); + setInterval(flushQueue, config.interval); + console.log('Listenig binary log events.'); + + process.on('SIGINT', async function() { + console.log('Got SIGINT.'); + zongji.stop(); + await db.end(); + process.exit(); + }); +} + +async function flushQueue() { + console.log('==========================================================') + console.log('Flush:', `filename=${filename}`, `position=${nextPosition}`); + console.log(fks); + if (!fks.size) return; + + const ids = []; + for (const fk of fks) ids.push([fk]); + await db.query(config.queries.queue, [ids]); + await db.query(config.queries.setPos, [filename, nextPosition]); + fks.clear(); +} + +zongji.on('binlog', function(evt) { + //evt.dump(); + const eventName = evt.getEventName(); + const table = evt.tableMap[evt.tableId]; + if (eventName === 'tablemap') return; + + const tableMap = schemaMap.get(table.parentSchema); + if (!tableMap) return; + + const tableInfo = tableMap.get(table.tableName); + if (!tableInfo) return; + + if (!tableInfo.events.has(eventName)) return; + + if (eventName === 'updaterows') { + if (tableInfo.columns !== true) { + for (const row of evt.rows) { + const after = row.after; + for (const col in after) { + if (tableInfo.columns.has(col) && row.before[col] !== after[col]) { + fks.add(after[tableInfo.fk]); + break; + } + } + } + } else { + for (const row of evt.rows) + fks.add(row.after[tableInfo.fk]); + } + } else { + for (const row of evt.rows) + fks.add(row[tableInfo.fk]); + } + + console.log(`[${eventName}] ${table.tableName}: ${evt.rows.length}`); + nextPosition = evt.nextPosition; + filename = zongji.options.filename; +}); + +main(); diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..87c4bb6 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,245 @@ +{ + "name": "binlog", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "dependencies": { + "mysql2": "^2.3.3", + "zongji": "file:../zongji" + } + }, + "../zongji": {}, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, + "node_modules/generate-function": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", + "integrity": "sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==", + "dependencies": { + "is-property": "^1.0.2" + } + }, + "node_modules/iconv-lite": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", + "integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==", + "dependencies": { + "safer-buffer": ">= 2.1.2 < 3.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/is-property": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/is-property/-/is-property-1.0.2.tgz", + "integrity": "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==" + }, + "node_modules/long": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", + "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + }, + "node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/mysql2": { + "version": "2.3.3", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-2.3.3.tgz", + "integrity": "sha512-wxJUev6LgMSgACDkb/InIFxDprRa6T95+VEoR+xPvtngtccNH2dGjEB/fVZ8yg1gWv1510c9CvXuJHi5zUm0ZA==", + "dependencies": { + "denque": "^2.0.1", + "generate-function": "^2.3.1", + "iconv-lite": "^0.6.3", + "long": "^4.0.0", + "lru-cache": "^6.0.0", + "named-placeholders": "^1.1.2", + "seq-queue": "^0.0.5", + "sqlstring": "^2.3.2" + }, + "engines": { + "node": ">= 8.0" + } + }, + "node_modules/named-placeholders": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.2.tgz", + "integrity": "sha512-wiFWqxoLL3PGVReSZpjLVxyJ1bRqe+KKJVbr4hGs1KWfTZTQyezHFBbuKj9hsizHyGV2ne7EMjHdxEGAybD5SA==", + "dependencies": { + "lru-cache": "^4.1.3" + }, + "engines": { + "node": ">=6.0.0" + } + }, + "node_modules/named-placeholders/node_modules/lru-cache": { + "version": "4.1.5", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-4.1.5.tgz", + "integrity": "sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==", + "dependencies": { + "pseudomap": "^1.0.2", + "yallist": "^2.1.2" + } + }, + "node_modules/named-placeholders/node_modules/yallist": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz", + "integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A==" + }, + "node_modules/pseudomap": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", + "integrity": "sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==" + }, + "node_modules/safer-buffer": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", + "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==" + }, + "node_modules/seq-queue": { + "version": "0.0.5", + "resolved": "https://registry.npmjs.org/seq-queue/-/seq-queue-0.0.5.tgz", + "integrity": "sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==" + }, + "node_modules/sqlstring": { + "version": "2.3.3", + "resolved": "https://registry.npmjs.org/sqlstring/-/sqlstring-2.3.3.tgz", + "integrity": "sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + }, + "node_modules/zongji": { + "resolved": "../zongji", + "link": true + } + }, + "dependencies": { + "denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==" + }, + "generate-function": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.3.1.tgz", + "integrity": "sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==", + "requires": { + "is-property": "^1.0.2" + } + }, + "iconv-lite": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", + "integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==", + "requires": { + "safer-buffer": ">= 2.1.2 < 3.0.0" + } + }, + "is-property": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/is-property/-/is-property-1.0.2.tgz", + "integrity": "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==" + }, + "long": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", + "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + }, + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "requires": { + "yallist": "^4.0.0" + } + }, + "mysql2": { + "version": "2.3.3", + "resolved": "https://registry.npmjs.org/mysql2/-/mysql2-2.3.3.tgz", + "integrity": "sha512-wxJUev6LgMSgACDkb/InIFxDprRa6T95+VEoR+xPvtngtccNH2dGjEB/fVZ8yg1gWv1510c9CvXuJHi5zUm0ZA==", + "requires": { + "denque": "^2.0.1", + "generate-function": "^2.3.1", + "iconv-lite": "^0.6.3", + "long": "^4.0.0", + "lru-cache": "^6.0.0", + "named-placeholders": "^1.1.2", + "seq-queue": "^0.0.5", + "sqlstring": "^2.3.2" + } + }, + "named-placeholders": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/named-placeholders/-/named-placeholders-1.1.2.tgz", + "integrity": "sha512-wiFWqxoLL3PGVReSZpjLVxyJ1bRqe+KKJVbr4hGs1KWfTZTQyezHFBbuKj9hsizHyGV2ne7EMjHdxEGAybD5SA==", + "requires": { + "lru-cache": "^4.1.3" + }, + "dependencies": { + "lru-cache": { + "version": "4.1.5", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-4.1.5.tgz", + "integrity": "sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==", + "requires": { + "pseudomap": "^1.0.2", + "yallist": "^2.1.2" + } + }, + "yallist": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz", + "integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A==" + } + } + }, + "pseudomap": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", + "integrity": "sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==" + }, + "safer-buffer": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", + "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==" + }, + "seq-queue": { + "version": "0.0.5", + "resolved": "https://registry.npmjs.org/seq-queue/-/seq-queue-0.0.5.tgz", + "integrity": "sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==" + }, + "sqlstring": { + "version": "2.3.3", + "resolved": "https://registry.npmjs.org/sqlstring/-/sqlstring-2.3.3.tgz", + "integrity": "sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==" + }, + "yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + }, + "zongji": { + "version": "file:../zongji" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..7356ef9 --- /dev/null +++ b/package.json @@ -0,0 +1,6 @@ +{ + "dependencies": { + "mysql2": "^2.3.3", + "zongji": "file:../zongji" + } +} diff --git a/zongji.sql b/zongji.sql new file mode 100644 index 0000000..9322376 --- /dev/null +++ b/zongji.sql @@ -0,0 +1,13 @@ + +CREATE TABLE `hedera`.`orderRecalcPos`( + `id` INT NOT NULL AUTO_INCREMENT , + `logName` VARCHAR(255) NOT NULL , + `position` BIGINT UNSIGNED NOT NULL , + PRIMARY KEY (`id`) +) ENGINE = InnoDB; + +CREATE USER 'zongji'@'%' IDENTIFIED BY '4!e*16!qZ98F5LZ]'; +GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'%'; + +GRANT INSERT ON `hedera`.`orderRecalc` TO 'zongji'@'%'; +GRANT INSERT, DELETE ON `hedera`.`orderRecalcPos` TO 'zongji'@'%'; diff --git a/zongji.undo.sql b/zongji.undo.sql new file mode 100644 index 0000000..81e21a5 --- /dev/null +++ b/zongji.undo.sql @@ -0,0 +1,3 @@ + +DROP TABLE IF EXISTS `hedera`.`orderRecalcPos`; +DROP USER 'zongji'@'%';