const ZongJi = require('./zongji');
const mysql = require('mysql2/promise');
const config = require('./config.json');

const zongji = new ZongJi(config.db);

// Schema name -> Map(table name -> { events, columns, fk }), filled in by main().
const schemaMap = new Map();

// Row events a table reacts to when its config entry does not list `events`.
const allEvents = new Set(['writerows', 'updaterows', 'deleterows']);

let db;
let nextPosition;
let filename;

// True while a flush is awaiting the database, so timer ticks cannot overlap.
let isFlushing = false;

// Key values collected from binlog events since the last successful flush.
const fks = new Set();

/**
 * Builds the per-table settings from one table entry of config.includeSchema.
 *
 * @param {*} table - Config entry; only a non-null object can override defaults.
 * @returns {{events: Set<string>, columns: (true|Set<string>), fk: string}}
 *   `columns === true` means every update counts, regardless of the column.
 */
function buildTableInfo(table) {
  const tableInfo = { events: allEvents, columns: true, fk: 'id' };
  // typeof null === 'object', so null has to be excluded explicitly.
  if (table !== null && typeof table === 'object') {
    if (Array.isArray(table.events)) tableInfo.events = new Set(table.events);
    if (Array.isArray(table.columns)) tableInfo.columns = new Set(table.columns);
    if (table.fk) tableInfo.fk = table.fk;
  }
  return tableInfo;
}

/**
 * Compares two column values by content rather than by identity.
 *
 * Date and Buffer instances are distinct objects in the before/after images
 * even when they hold the same value, so `!==` alone would report every such
 * column as changed.
 *
 * @param {*} a - Value from the "before" row image.
 * @param {*} b - Value from the "after" row image.
 * @returns {boolean} True when both values are considered equal.
 */
function isSameValue(a, b) {
  if (a === b) return true;
  if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime();
  if (Buffer.isBuffer(a) && Buffer.isBuffer(b)) return a.equals(b);
  return false;
}

/**
 * Returns the key value to enqueue for one row of a row event.
 *
 * @param {string} eventName - Event name as returned by evt.getEventName().
 * @param {{columns: (true|Set<string>), fk: string}} tableInfo - Table settings.
 * @param {object} row - Row of evt.rows; for 'updaterows' it has before/after.
 * @returns {*} The key value, or undefined when the row must be ignored.
 */
function getRowKey(eventName, tableInfo, row) {
  if (eventName !== 'updaterows') return row[tableInfo.fk];

  const { before, after } = row;
  if (tableInfo.columns === true) return after[tableInfo.fk];

  // With a column filter, only an actual change in a watched column counts.
  for (const col of Object.keys(after)) {
    if (tableInfo.columns.has(col) && !isSameValue(before[col], after[col])) {
      return after[tableInfo.fk];
    }
  }
  return undefined;
}

/**
 * Connects to the database, builds the schema/table filters from the config,
 * starts the binlog listener (resuming from the stored position when the
 * getPos query returns a row) and schedules the periodic queue flush.
 *
 * @returns {Promise<void>}
 */
async function main() {
  db = await mysql.createConnection(config.db);

  const includeSchema = {};
  for (const [schemaName, schema] of Object.entries(config.includeSchema || {})) {
    const tableMap = new Map();
    for (const [tableName, table] of Object.entries(schema || {})) {
      tableMap.set(tableName, buildTableInfo(table));
    }
    includeSchema[schemaName] = [...tableMap.keys()];
    schemaMap.set(schemaName, tableMap);
  }

  const opts = { includeEvents: config.includeEvents, includeSchema };

  const [res] = await db.query(config.queries.getPos);
  if (res.length) {
    const [row] = res;
    // Declared locally: the bare assignment used before created a global.
    const position = row.position;
    filename = row.logName;
    nextPosition = position;
    Object.assign(opts, { filename, position });
  } else {
    opts.startAtEnd = true;
  }

  zongji.start(opts);

  // A rejected flush must not become an unhandled rejection; flushQueue has
  // already put the batch back, so the next tick retries it.
  setInterval(() => {
    flushQueue().catch((err) => {
      console.error('Flush failed:', err);
    });
  }, config.interval);

  console.log('Listenig binary log events.');

  process.on('SIGINT', async function () {
    console.log('Got SIGINT.');
    zongji.stop();
    try {
      await db.end();
    } catch (err) {
      console.error(err);
    }
    // Exit even when closing the connection failed.
    process.exit();
  });
}

/**
 * Writes the collected keys to the queue and stores the binlog position that
 * belongs to exactly those keys.
 *
 * The batch and its position are captured before the first await, and the
 * shared set is emptied at that point; keys arriving while the queries run
 * therefore stay in the set for the next flush instead of being cleared.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any query error after putting the batch back into the set.
 */
async function flushQueue() {
  console.log('==========================================================');
  console.log('Flush:', `filename=${filename}`, `position=${nextPosition}`);
  console.log(fks);
  if (!fks.size || isFlushing) return;

  const batch = [...fks];
  const batchFilename = filename;
  const batchPosition = nextPosition;
  fks.clear();
  isFlushing = true;

  try {
    await db.query(config.queries.queue, [batch.map((fk) => [fk])]);
    await db.query(config.queries.setPos, [batchFilename, batchPosition]);
  } catch (err) {
    // Keep the keys so that a later flush can retry them.
    for (const fk of batch) fks.add(fk);
    throw err;
  } finally {
    isFlushing = false;
  }
}

/**
 * Binlog event handler: collects the key value of every affected row of a
 * watched table and remembers the position following the event.
 */
zongji.on('binlog', function (evt) {
  //evt.dump();
  const eventName = evt.getEventName();
  if (eventName === 'tablemap') return;

  // Events without a table map, or for a table that is not registered in it,
  // have nothing to collect.
  const table = evt.tableMap && evt.tableMap[evt.tableId];
  if (!table) return;

  const tableMap = schemaMap.get(table.parentSchema);
  if (!tableMap) return;
  const tableInfo = tableMap.get(table.tableName);
  if (!tableInfo) return;
  if (!tableInfo.events.has(eventName)) return;
  if (!Array.isArray(evt.rows)) return;

  for (const row of evt.rows) {
    const key = getRowKey(eventName, tableInfo, row);
    // A missing key cannot identify a record, so it is never queued.
    if (key !== undefined && key !== null) fks.add(key);
  }

  console.log(`[${eventName}] ${table.tableName}: ${evt.rows.length}`);
  nextPosition = evt.nextPosition;
  filename = zongji.options.filename;
});

main().catch((err) => {
  console.error(err);
  process.exit(1);
});