const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); const amqp = require('amqplib'); require('colors'); const allEvents = new Set([ 'writerows', 'updaterows', 'deleterows' ]); module.exports = class MyCDC { constructor(config) { this.config = config; this.running = false; this.filename = null; this.position = null; this.schemaMap = new Map(); this.fks = new Set(); const includeSchema = {}; for (const schemaName in this.config.includeSchema) { const schema = this.config.includeSchema[schemaName]; const tables = []; const tableMap = new Map(); for (const tableName in schema) { const table = schema[tableName]; tables.push(tableName); const tableInfo = { events: allEvents, columns: true, fk: 'id' }; tableMap.set(tableName, tableInfo); if (typeof table === 'object') { if (Array.isArray(table.events)) tableInfo.events = new Set(table.events); if (Array.isArray(table.columns)) tableInfo.columns = new Set(table.columns); if (table.fk) tableInfo.fk = table.fk; } } includeSchema[schemaName] = tables; this.schemaMap.set(schemaName, tableMap); } this.opts = { includeEvents: this.config.includeEvents, includeSchema }; } async start() { if (this.config.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); await this.init(); console.log('Process started.'); } async stop() { console.log('Stopping process.'); await this.end(); console.log('Process stopped.'); } async init() { this.debug('DbAsync', 'Initializing.'); this.onErrorListener = err => this.onError(err); // DB connection this.db = await mysql.createConnection(this.config.db); this.db.on('error', this.onErrorListener); // RabbitMQ this.publisher = await amqp.connect(this.config.amqp); this.channel = await this.publisher.createChannel(); this.channel.assertQueue(this.config.queue, { durable: true }); // Zongji const zongji = new ZongJi(this.config.db); this.zongji = zongji; this.onBinlogListener = evt => this.onBinlog(evt); zongji.on('binlog', this.onBinlogListener); const [res] = await this.db.query( 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', [this.config.queue] ); if (res.length) { const [row] = res; this.filename = row.logName; this.position = row.position; Object.assign(this.opts, { filename: this.filename, position: this.position }); } else this.opts.startAtEnd = true; this.debug('Zongji', 'Starting.'); await new Promise((resolve, reject) => { const onReady = () => { zongji.off('error', onError); resolve(); }; const onError = err => { this.zongji = null; zongji.off('ready', onReady); zongji.off('binlog', this.onBinlogListener); reject(err); } zongji.once('ready', onReady); zongji.once('error', onError); zongji.start(this.opts); }); this.debug('Zongji', 'Started.'); this.zongji.on('error', this.onErrorListener); this.flushInterval = setInterval( () => this.flushQueue(), this.config.flushInterval); this.pingInterval = setInterval( () => this.connectionPing(), this.config.pingInterval * 1000); // Summary this.running = true; this.debug('DbAsync', 'Initialized.'); } async end(silent) { const zongji = this.zongji; if (!zongji) return; this.debug('DbAsync', 'Ending.'); // Zongji clearInterval(this.flushInterval); clearInterval(this.pingInterval); zongji.off('binlog', this.onBinlogListener); zongji.off('error', this.onErrorListener); this.zongji = null; this.running = false; this.debug('Zongji', 'Stopping.'); // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { console.log('zongji.connection.destroy'); }); await new Promise(resolve => { zongji.ctrlConnection.query('KILL ' + zongji.connection.threadId, err => { if (err && !silent) console.error(err); resolve(); }); }); zongji.ctrlConnection.destroy(() => { console.log('zongji.ctrlConnection.destroy'); }); zongji.emit('stopped'); this.debug('Zongji', 'Stopped.'); // RabbitMQ await this.publisher.close(); // DB connection this.db.off('error', this.onErrorListener); // FIXME: mysql2/promise bug, db.end() ends process this.db.on('error', () => {}); try { await this.db.end(); } catch (err) { if (!silent) console.error(err); } // Summary this.debug('DbAsync', 'Ended.'); } async tryRestart() { try { await this.init(); console.log('Process restarted.'); } catch(err) { setTimeout(() => this.tryRestart(), 30); } } async onError(err) { console.log(`Error: ${err.code}: ${err.message}`); try { await this.end(true); } catch(e) {} switch (err.code) { case 'PROTOCOL_CONNECTION_LOST': case 'ECONNRESET': console.log('Trying to restart process.'); await this.tryRestart(); break; default: process.exit(); } } onBinlog(evt) { //evt.dump(); const eventName = evt.getEventName(); if (eventName === 'tablemap') return; if (eventName === 'rotate') { this.filename = evt.binlogName; this.position = evt.position; console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`); return; } const table = evt.tableMap[evt.tableId]; const tableMap = this.schemaMap.get(table.parentSchema); if (!tableMap) return; const tableInfo = tableMap.get(table.tableName); if (!tableInfo) return; if (!tableInfo.events.has(eventName)) return; let column; const rows = evt.rows; const fks = new Set(); if (eventName === 'updaterows') { if (tableInfo.columns !== true) { let changes = false; for (const row of rows) { const after = row.after; for (const col in after) { if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) { fks.add(after[tableInfo.fk]); changes = true; if (!column) column = col; break; } } } if (!changes) return; } else { for (const row of rows) fks.add(row.after[tableInfo.fk]); } } else { for (const row of rows) fks.add(row[tableInfo.fk]); } if (fks.size) { const data = JSON.stringify(Array.from(fks)); this.channel.sendToQueue(this.config.queue, Buffer.from(data)); this.debug('Queued', data); } const row = eventName === 'updaterows' ? rows[0].after : rows[0]; if (this.config.debug) { console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`); console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`); if (column) { let before = formatValue(rows[0].before[column]); let after = formatValue(rows[0].after[column]); console.debug(` ${column}: ${before} <- ${after}`); } } this.position = evt.nextPosition; this.flushed = false; } async flushQueue() { if (this.flushed) return; this.debug('Flush', `filename: ${this.filename}, position: ${this.position}`); const replaceQuery = 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; if (!this.config.testMode) await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]); this.flushed = true; } async connectionPing() { this.debug('Ping', 'Sending ping to database.'); // FIXME: Should Zongji.connection be pinged? await new Promise((resolve, reject) => { this.zongji.ctrlConnection.ping(err => { if (err) return reject(err); resolve(); }); }) await this.db.ping(); } debug(namespace, message) { if (this.config.debug) console.debug(`${namespace}:`.blue, message.yellow); } } function equals(a, b) { if (a === b) return true; const type = typeof a; if (a == null || b == null || type !== typeof b) return false; if (type === 'object' && a.constructor === b.constructor) { if (a instanceof Date) return a.getTime() === b.getTime(); } return false; } function formatValue(value) { if (value instanceof Date) return value.toJSON(); return value; }