require('require-yaml'); require('colors'); const fs = require('fs-extra'); const path = require('path'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); const amqp = require('amqplib'); const allEvents = [ 'writerows', 'updaterows', 'deleterows' ]; module.exports = class MyCDC { constructor() { this.running = false; this.filename = null; this.position = null; this.schemaMap = new Map(); this.queues = {}; } async start() { const defaultConfig = require('./config/producer.yml'); const conf = this.conf = Object.assign({}, defaultConfig); const localPath = path.join(__dirname, 'config/producer.local.yml'); if (fs.existsSync(localPath)) { const localConfig = require(localPath); Object.assign(conf, localConfig); } this.queuesConf = {}; const queueDir = path.join(__dirname, 'queues'); const queueFiles = await fs.readdir(queueDir); for (const queueFile of queueFiles) { const match = queueFile.match(/^([a-zA-Z0-9-_]+)\.ya?ml$/); if (!match) throw new Error(`Invalid queue file name '${queueFile}'`); this.queuesConf[match[1]] = require(path.join(queueDir, queueFile)); } const queues = this.queuesConf; for (const queueName in queues) { const includeSchema = queues[queueName].includeSchema; for (const schemaName in includeSchema) { let tableMap = this.schemaMap.get(schemaName); if (!tableMap) { tableMap = new Map(); this.schemaMap.set(schemaName, tableMap); } const schema = includeSchema[schemaName]; for (const tableName in schema) { const table = schema[tableName]; //if (typeof table !== 'object') continue; let tableInfo = tableMap.get(tableName); if (!tableInfo) { tableInfo = { queues: new Map(), events: new Map(), columns: false, fk: 'id' }; tableMap.set(tableName, tableInfo); } tableInfo.queues.set(queueName, table); const events = table.events || allEvents; for (const event of events) { let eventInfo = tableInfo.events.get(event); if (!eventInfo) { eventInfo = []; tableInfo.events.set(event, eventInfo); } eventInfo.push(queueName); } const columns = table.columns; if (columns) { if (tableInfo.columns === false) tableInfo.columns = new Set(); if (tableInfo.columns !== true) for (const column of columns) tableInfo.columns.add(column); } else tableInfo.columns = true; if (table.id) tableInfo.id = table.id; } this.schemaMap.set(schemaName, tableMap); } } const includeSchema = {}; for (const [schemaName, tableMap] of this.schemaMap) includeSchema[schemaName] = Array.from(tableMap.keys()); this.opts = { includeEvents: [ 'rotate', 'tablemap', 'writerows', 'updaterows', 'deleterows' ], includeSchema }; if (conf.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); await this.init(); console.log('Process started.'); } async stop() { console.log('Stopping process.'); await this.end(); console.log('Process stopped.'); } async init() { const conf = this.conf; this.debug('MyCDC', 'Initializing.'); this.onErrorListener = err => this.onError(err); // DB connection this.db = await mysql.createConnection(conf.db); this.db.on('error', this.onErrorListener); // RabbitMQ this.publisher = await amqp.connect(conf.amqp); const channel = this.channel = await this.publisher.createChannel(); for (const tableMap of this.schemaMap.values()) { for (const tableName of tableMap.keys()) { await channel.assertExchange(tableName, 'headers', { durable: true }); } } for (const queueName in this.queuesConf) { const options = conf.deleteNonEmpty ? {} : {ifEmpty: true}; await channel.deleteQueue(queueName, {options}); await channel.assertQueue(queueName, { durable: true }); const includeSchema = this.queuesConf[queueName].includeSchema; for (const schemaName in includeSchema) { const schema = includeSchema[schemaName]; for (const tableName in schema) { const table = schema[tableName]; const events = table.events || allEvents; for (const event of events) { let args; if (event === 'updaterows' && table.columns) { args = {'x-match': 'any'}; table.columns.map(c => args[c] = true); } else args = {'z-event': event}; await channel.bindQueue(queueName, tableName, '', args); } } } } // Zongji const zongji = new ZongJi(conf.db); this.zongji = zongji; this.onBinlogListener = evt => this.onBinlog(evt); zongji.on('binlog', this.onBinlogListener); const [res] = await this.db.query( 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', [conf.code] ); if (res.length) { const [row] = res; this.filename = row.logName; this.position = row.position; this.isFlushed = true; Object.assign(this.opts, { filename: this.filename, position: this.position }); } else this.opts.startAtEnd = true; this.debug('Zongji', 'Starting.'); await new Promise((resolve, reject) => { const onReady = () => { zongji.off('error', onError); resolve(); }; const onError = err => { this.zongji = null; zongji.off('ready', onReady); zongji.off('binlog', this.onBinlogListener); reject(err); } zongji.once('ready', onReady); zongji.once('error', onError); zongji.start(this.opts); }); this.debug('Zongji', 'Started.'); this.zongji.on('error', this.onErrorListener); this.flushInterval = setInterval( () => this.flushQueue(), conf.flushInterval * 1000); this.pingInterval = setInterval( () => this.connectionPing(), conf.pingInterval * 1000); // Summary this.running = true; this.debug('MyCDC', 'Initialized.'); } async end(silent) { const zongji = this.zongji; if (!zongji) return; this.debug('MyCDC', 'Ending.'); // Zongji clearInterval(this.flushInterval); clearInterval(this.pingInterval); zongji.off('binlog', this.onBinlogListener); zongji.off('error', this.onErrorListener); this.zongji = null; this.running = false; this.debug('Zongji', 'Stopping.'); // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { console.log('zongji.connection.destroy'); }); await new Promise(resolve => { zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId], err => { if (err && err.code !== 'ER_NO_SUCH_THREAD' && !silent) console.error(err); resolve(); }); }); zongji.ctrlConnection.destroy(() => { console.log('zongji.ctrlConnection.destroy'); }); zongji.emit('stopped'); this.debug('Zongji', 'Stopped.'); // RabbitMQ await this.publisher.close(); // DB connection this.db.off('error', this.onErrorListener); // FIXME: mysql2/promise bug, db.end() ends process this.db.on('error', () => {}); try { await this.db.end(); } catch (err) { if (!silent) console.error(err); } // Summary this.debug('MyCDC', 'Ended.'); } async tryRestart() { try { await this.init(); console.log('Process restarted.'); } catch(err) { setTimeout(() => this.tryRestart(), 30); } } async onError(err) { console.log(`Error: ${err.code}: ${err.message}`); try { await this.end(true); } catch(e) {} switch (err.code) { case 'PROTOCOL_CONNECTION_LOST': case 'ECONNRESET': console.log('Trying to restart process.'); await this.tryRestart(); break; default: process.exit(); } } onBinlog(evt) { //evt.dump(); const eventName = evt.getEventName(); let position = evt.nextPosition; switch (eventName) { case 'rotate': this.filename = evt.binlogName; position = evt.position; console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`); break; case 'writerows': case 'deleterows': case 'updaterows': this.onRowEvent(evt, eventName); break; } this.position = position; this.isFlushed = false; } onRowEvent(evt, eventName) { const table = evt.tableMap[evt.tableId]; const tableName = table.tableName; const tableMap = this.schemaMap.get(table.parentSchema); if (!tableMap) return; const tableInfo = tableMap.get(tableName); if (!tableInfo) return; const queues = tableInfo.events.get(eventName); if (!queues) return; const isUpdate = eventName === 'updaterows'; let rows; let cols; if (isUpdate) { rows = []; cols = new Set(); let columns = tableInfo.columns === true ? Object.keys(evt.rows[0].after) : tableInfo.columns.keys(); for (const row of evt.rows) { let nColsChanged = 0; const after = row.after; for (const col of columns) { if (after[col] !== undefined && !equals(after[col], row.before[col])) { nColsChanged++; cols.add(col); } } if (nColsChanged) rows.push(row); } } else rows = evt.rows; if (!rows || !rows.length) return; const data = { eventName, table: tableName, schema: table.parentSchema, rows }; let headers = {}; headers['z-event'] = eventName; if (isUpdate) { for (const col of cols) headers[col] = true; data.cols = Array.from(cols); } const options = { persistent: true, headers }; const jsonData = JSON.stringify(data); this.channel.publish(tableName, '', Buffer.from(jsonData), options); if (this.debug) { // console.debug(data, options); console.debug('Queued:'.blue, `${tableName}(${rows.length}) [${eventName}]`); } } async flushQueue() { if (this.isFlushed) return; const {filename, position} = this; this.debug('Flush', `filename: ${filename}, position: ${position}`); const replaceQuery = 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; if (!this.conf.testMode) await this.db.query(replaceQuery, [this.conf.code, filename, position]); this.isFlushed = true; } async connectionPing() { this.debug('Ping', 'Sending ping to database.'); // FIXME: Should Zongji.connection be pinged? await new Promise((resolve, reject) => { this.zongji.ctrlConnection.ping(err => { if (err) return reject(err); resolve(); }); }) await this.db.ping(); } debug(namespace, message) { if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); } } function equals(a, b) { if (a === b) return true; const type = typeof a; if (a == null || b == null || type !== typeof b) return false; if (type === 'object' && a.constructor === b.constructor) { if (a instanceof Date) return a.getTime() === b.getTime(); } return false; }