const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); const allEvents = new Set([ 'writerows', 'updaterows', 'deleterows' ]); const fks = new Set(); module.exports = class DbAsync { constructor(config) { this.config = config; this.running = false; this.filename = null; this.position = null; this.schemaMap = new Map(); } async start() { if (this.config.testMode) console.debug('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); const db = await mysql.createConnection(this.config.db); this.db = db; const includeSchema = {}; for (const schemaName in this.config.includeSchema) { const schema = this.config.includeSchema[schemaName]; const tables = []; const tableMap = new Map(); for (const tableName in schema) { const table = schema[tableName]; tables.push(tableName); const tableInfo = { events: allEvents, columns: true, fk: 'id' }; tableMap.set(tableName, tableInfo); if (typeof table === 'object') { if (Array.isArray(table.events)) tableInfo.events = new Set(table.events); if (Array.isArray(table.columns)) tableInfo.columns = new Set(table.columns); if (table.fk) tableInfo.fk = table.fk; } } includeSchema[schemaName] = tables; this.schemaMap.set(schemaName, tableMap); } const opts = { includeEvents: this.config.includeEvents, includeSchema }; this.opts = opts; const [res] = await this.db.query( 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', [this.config.queue] ); if (res.length) { const [row] = res; this.filename = row.logName; this.position = row.position; Object.assign(opts, { filename: this.filename, position: this.position }); } else opts.startAtEnd = true; await this.startZongji(); } async stop() { await this.stopZongji(); await this.db.end(); } async startZongji() { const zongji = new ZongJi(this.config.db); this.zongji = zongji; zongji.on('ready', () => this.onReady()); zongji.on('stopped', () => this.onStopped()); zongji.on('error', err => this.onError(err)); zongji.on('binlog', evt => this.onBinlog(evt)); zongji.start(this.opts); } async stopZongji() { console.debug('Stopping Zongji.'); this.running = false; clearInterval(this.flushInterval); clearInterval(this.pingInterval); this.zongji.stop(); } async restartZongji() { console.debug('Restaring Zongji.'); await this.stopZongji(); setTimeout(() => this.startZongji(this.opts), 1000); } onReady() { this.running = true; this.flushInterval = setInterval( () => this.flushQueue(), this.config.flushInterval); this.pingInterval = setInterval( () => this.connectionPing(), this.config.pingInterval * 1000); console.debug('Zongji ready.'); } onStopped() { console.debug('Zongji stopped.'); } async onError(err) { console.log(`Error: ${err.code}: ${err.message}`); switch (err.code) { case 'PROTOCOL_CONNECTION_LOST': case 'ECONNRESET': await this.restartZongji(); break; default: await this.stop(); process.exit(); } } onBinlog(evt) { //evt.dump(); const eventName = evt.getEventName(); if (eventName === 'tablemap') return; if (eventName === 'rotate') { this.filename = evt.binlogName; this.position = evt.position; console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`); return; } const table = evt.tableMap[evt.tableId]; const tableMap = this.schemaMap.get(table.parentSchema); if (!tableMap) return; const tableInfo = tableMap.get(table.tableName); if (!tableInfo) return; if (!tableInfo.events.has(eventName)) return; let column; const rows = evt.rows; if (eventName === 'updaterows') { if (tableInfo.columns !== true) { let changes = false; for (const row of rows) { const after = row.after; for (const col in after) { if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) { fks.add(after[tableInfo.fk]); changes = true; if (!column) column = col; break; } } } if (!changes) return; } else { for (const row of rows) fks.add(row.after[tableInfo.fk]); } } else { for (const row of rows) fks.add(row[tableInfo.fk]); } const row = eventName === 'updaterows' ? rows[0].after : rows[0]; if (this.config.debug) { console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`); console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`); if (column) { let before = formatValue(rows[0].before[column]); let after = formatValue(rows[0].after[column]); console.debug(` ${column}: ${before} <- ${after}`); } } this.position = evt.nextPosition; } async flushQueue() { if (!this.running) return; console.log('=========================================================='); console.log('Flush:', `filename: ${this.filename}`, `position: ${this.position}`); console.log(fks); if (!fks.size) return; const ids = []; for (const fk of fks) ids.push([fk]); const replaceQuery = 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; if (this.config.testMode) { console.debug(this.config.addQuery); console.debug(replaceQuery); } else { await this.db.query(this.config.addQuery, [ids]); await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]); } fks.clear(); console.log('=========================================================='); } async connectionPing() { if (!this.running) return; if (this.config.debug) console.debug('Sending ping to database.') this.zongji.connection.ping(); this.zongji.ctrlConnection.ping(); await this.db.ping(); } } function equals(a, b) { if (a === b) return true; const type = typeof a; if (a == null || b == null || type !== typeof b) return false; if (type === 'object' && a.constructor === b.constructor) { if (a instanceof Date) return a.getTime() === b.getTime(); } return false; } function formatValue(value) { if (value instanceof Date) return value.toJSON(); return value; }