diff --git a/config.json b/config.json index 832a8a1..9c585c2 100644 --- a/config.json +++ b/config.json @@ -1,4 +1,6 @@ { + "debug": true, + "testMode": true, "db": { "host": "localhost", "port": 3306, @@ -7,12 +9,14 @@ "database": "util" }, "includeEvents": [ + "rotate", "tablemap", "writerows", "updaterows", "deleterows" ], - "flushInterval": 5000, + "pingInterval": 60, + "flushInterval": 10000, "queue": "orderRecalc", "addQuery": "INSERT INTO `hedera`.`orderRecalc` (`orderFk`) VALUES ?", "includeSchema": { diff --git a/db-async.js b/db-async.js new file mode 100644 index 0000000..c8a56e6 --- /dev/null +++ b/db-async.js @@ -0,0 +1,259 @@ +const ZongJi = require('./zongji'); +const mysql = require('mysql2/promise'); + +const allEvents = new Set([ + 'writerows', + 'updaterows', + 'deleterows' + ]); +const fks = new Set(); + +module.exports = class DbAsync { + constructor(config) { + this.config = config; + this.running = false; + this.filename = null; + this.position = null; + this.schemaMap = new Map(); + } + + async start() { + if (this.config.testMode) + console.debug('Test mode enabled, just logging queries to console.'); + + console.log('Starting process.'); + + const db = await mysql.createConnection(this.config.db); + this.db = db; + + const includeSchema = {}; + for (const schemaName in this.config.includeSchema) { + const schema = this.config.includeSchema[schemaName]; + const tables = []; + const tableMap = new Map(); + + for (const tableName in schema) { + const table = schema[tableName]; + tables.push(tableName); + + const tableInfo = { + events: allEvents, + columns: true, + fk: 'id' + }; + tableMap.set(tableName, tableInfo); + + if (typeof table === 'object') { + if (Array.isArray(table.events)) + tableInfo.events = new Set(table.events); + if (Array.isArray(table.columns)) + tableInfo.columns = new Set(table.columns); + if (table.fk) + tableInfo.fk = table.fk; + } + } + + includeSchema[schemaName] = tables; + this.schemaMap.set(schemaName, tableMap); + } + + const opts = { + includeEvents: this.config.includeEvents, + includeSchema + }; + this.opts = opts; + + const [res] = await this.db.query( + 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', + [this.config.queue] + ); + if (res.length) { + const [row] = res; + this.filename = row.logName; + this.position = row.position; + Object.assign(opts, { + filename: this.filename, + position: this.position + }); + } else + opts.startAtEnd = true; + + await this.startZongji(); + } + + async stop() { + await this.stopZongji(); + await this.db.end(); + } + + async startZongji() { + const zongji = new ZongJi(this.config.db); + this.zongji = zongji; + + zongji.on('ready', () => this.onReady()); + zongji.on('stopped', () => this.onStopped()); + zongji.on('error', err => this.onError(err)); + zongji.on('binlog', evt => this.onBinlog(evt)); + zongji.start(this.opts); + } + + async stopZongji() { + console.debug('Stopping Zongji.'); + this.running = false; + clearInterval(this.flushInterval); + clearInterval(this.pingInterval); + this.zongji.stop(); + } + + async restartZongji() { + console.debug('Restaring Zongji.'); + await this.stopZongji(); + setTimeout(() => this.startZongji(this.opts), 1000); + } + + onReady() { + this.running = true; + + this.flushInterval = setInterval( + () => this.flushQueue(), this.config.flushInterval); + this.pingInterval = setInterval( + () => this.connectionPing(), this.config.pingInterval * 1000); + + console.debug('Zongji ready.'); + } + + onStopped() { + console.debug('Zongji stopped.'); + } + + async onError(err) { + console.log(`Error: ${err.code}: ${err.message}`); + switch (err.code) { + case 'PROTOCOL_CONNECTION_LOST': + case 'ECONNRESET': + await this.restartZongji(); + break; + default: + await this.stop(); + process.exit(); + } + } + + onBinlog(evt) { + //evt.dump(); + const eventName = evt.getEventName(); + if (eventName === 'tablemap') return; + + if (eventName === 'rotate') { + this.filename = evt.binlogName; + this.position = evt.position; + console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`); + return; + } + + const table = evt.tableMap[evt.tableId]; + const tableMap = this.schemaMap.get(table.parentSchema); + if (!tableMap) return; + + const tableInfo = tableMap.get(table.tableName); + if (!tableInfo) return; + + if (!tableInfo.events.has(eventName)) return; + + let column; + const rows = evt.rows; + + if (eventName === 'updaterows') { + if (tableInfo.columns !== true) { + let changes = false; + for (const row of rows) { + const after = row.after; + for (const col in after) { + if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) { + fks.add(after[tableInfo.fk]); + changes = true; + if (!column) column = col; + break; + } + } + } + if (!changes) return; + } else { + for (const row of rows) + fks.add(row.after[tableInfo.fk]); + } + } else { + for (const row of rows) + fks.add(row[tableInfo.fk]); + } + + const row = eventName === 'updaterows' + ? rows[0].after + : rows[0]; + + if (this.config.debug) { + console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`); + console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`); + if (column) { + let before = formatValue(rows[0].before[column]); + let after = formatValue(rows[0].after[column]); + console.debug(` ${column}: ${before} <- ${after}`); + } + } + + this.position = evt.nextPosition; + } + + async flushQueue() { + if (!this.running) return; + console.log('=========================================================='); + console.log('Flush:', `filename: ${this.filename}`, `position: ${this.position}`); + console.log(fks); + if (!fks.size) return; + + const ids = []; + for (const fk of fks) ids.push([fk]); + + const replaceQuery = + 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; + + if (this.config.testMode) { + console.debug(this.config.addQuery); + console.debug(replaceQuery); + } else { + await this.db.query(this.config.addQuery, [ids]); + await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]); + } + fks.clear(); + console.log('=========================================================='); + } + + async connectionPing() { + if (!this.running) return; + if (this.config.debug) + console.debug('Sending ping to database.') + this.zongji.connection.ping(); + this.zongji.ctrlConnection.ping(); + await this.db.ping(); + } + } + + function equals(a, b) { + if (a === b) + return true; + const type = typeof a; + if (a == null || b == null || type !== typeof b) + return false; + if (type === 'object' && a.constructor === b.constructor) { + if (a instanceof Date) + return a.getTime() === b.getTime(); + } + return false; + } + + function formatValue(value) { + if (value instanceof Date) + return value.toJSON(); + return value; + } + \ No newline at end of file diff --git a/index.js b/index.js index d30a77e..a8195d1 100644 --- a/index.js +++ b/index.js @@ -1,8 +1,8 @@ -const ZongJi = require('./zongji'); -const mysql = require('mysql2/promise'); + const fs = require('fs'); const path = require('path'); const defaultConfig = require('./config.json'); +const DbAsync = require('./db-async'); const config = Object.assign({}, defaultConfig); const localPath = path.join(__dirname, 'config.local.json'); @@ -11,164 +11,18 @@ if (fs.existsSync(localPath)) { Object.assign(config, localConfig); } -const zongji = new ZongJi(config.db); -const schemaMap = new Map(); - -const allEvents = new Set([ - 'writerows', - 'updaterows', - 'deleterows' -]); - -let db; -let nextPosition; -let filename; -const fks = new Set(); +let dbAsync; async function main() { - db = await mysql.createConnection(config.db); - const includeSchema = {}; - for (const schemaName in config.includeSchema) { - const schema = config.includeSchema[schemaName]; - const tables = []; - const tableMap = new Map(); - - for (const tableName in schema) { - const table = schema[tableName]; - tables.push(tableName); - - const tableInfo = { - events: allEvents, - columns: true, - fk: 'id' - }; - tableMap.set(tableName, tableInfo); - - if (typeof table === 'object') { - if (Array.isArray(table.events)) - tableInfo.events = new Set(table.events); - if (Array.isArray(table.columns)) - tableInfo.columns = new Set(table.columns); - if (table.fk) - tableInfo.fk = table.fk; - } - } - - includeSchema[schemaName] = tables; - schemaMap.set(schemaName, tableMap); - } - - const opts = { - includeEvents: config.includeEvents, - includeSchema - }; - - const [res] = await db.query( - 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', - [config.queue] - ); - if (res.length) { - const [row] = res; - filename = row.logName; - position = row.position; - Object.assign(opts, {filename, position}); - } else - opts.startAtEnd = true; - - zongji.start(opts); - setInterval(flushQueue, config.flushInterval); - console.log('Listenig binary log events.'); + dbAsync = new DbAsync(config) + await dbAsync.start(); process.on('SIGINT', async function() { console.log('Got SIGINT.'); - zongji.stop(); - await db.end(); + await dbAsync.stop(); process.exit(); }); } -async function flushQueue() { - console.log('==========================================================') - console.log('Flush:', `filename=${filename}`, `position=${nextPosition}`); - console.log(fks); - if (!fks.size) return; - - const ids = []; - for (const fk of fks) ids.push([fk]); - await db.query(config.addQuery, [ids]); - await db.query( - 'REPLACE INTO `binlogQueue` (`code`, `logName`, `position`) VALUES (?, ?, ?)', - [config.queue, filename, nextPosition] - ); - fks.clear(); -} - -function equals(a, b) { - if (a === b) - return true; - const type = typeof a; - if (a == null || b == null || type !== typeof b) - return false; - if (type === 'object' && a.constructor === b.constructor) { - if (a instanceof Date) - return a.getTime() === b.getTime(); - } - return false; -} - -zongji.on('binlog', function(evt) { - //evt.dump(); - const eventName = evt.getEventName(); - const table = evt.tableMap[evt.tableId]; - if (eventName === 'tablemap') return; - - const tableMap = schemaMap.get(table.parentSchema); - if (!tableMap) return; - - const tableInfo = tableMap.get(table.tableName); - if (!tableInfo) return; - - if (!tableInfo.events.has(eventName)) return; - - let column; - const rows = evt.rows; - - if (eventName === 'updaterows') { - if (tableInfo.columns !== true) { - let changes = false; - for (const row of rows) { - const after = row.after; - for (const col in after) { - if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) { - fks.add(after[tableInfo.fk]); - changes = true; - if (!column) column = col; - break; - } - } - } - if (!changes) return; - } else { - for (const row of rows) - fks.add(row.after[tableInfo.fk]); - } - } else { - for (const row of rows) - fks.add(row[tableInfo.fk]); - } - - const row = eventName === 'updaterows' - ? rows[0].after - : rows[0]; - - console.log(`[${eventName}] ${table.tableName}: ${rows.length}`); - console.log(` ${tableInfo.fk}: ${row[tableInfo.fk]}`); - if (column) - console.log(` ${column}: ${rows[0].after[column]} <- ${rows[0].before[column]}`); - - nextPosition = evt.nextPosition; - filename = zongji.options.filename; -}); - main();