require("require-yaml"); require("colors"); const ZongJi = require("./zongji"); const mysql = require("mysql2/promise"); const { loadConfig } = require("./lib/util"); const ModelLoader = require("./lib/model-loader"); const ShowDb = require("./lib/show-db"); const Salix = require("./lib/salix"); module.exports = class MyLogger { constructor() { this.running = false; this.isOk = null; this.binlogName = null; this.binlogPosition = null; this.isFlushed = true; this.queue = []; this.modelLoader = new ModelLoader(); this.showDb = new ShowDb(); } async start() { const conf = (this.conf = loadConfig(__dirname, "config")); const salix = new Salix(); await salix.init(this); // await salix.salixLogInfo(); this.modelLoader.init(this, salix.logInfo); this.showDb.init(this); const includeSchema = {}; for (const [schemaName, tableMap] of this.schemaMap.map) includeSchema[schemaName] = Array.from(tableMap.keys()); this.zongjiOpts = { includeEvents: [ "rotate", "tablemap", "writerows", "updaterows", "deleterows", ], includeSchema, serverId: conf.serverId, }; if (conf.testMode) console.log("Test mode enabled, just logging queries to console."); console.log("Starting process."); await this.init(); console.log("Process started."); } async stop() { console.log("Stopping process."); await this.end(); console.log("Process stopped."); } async init() { const { conf } = this; this.debug("MyLogger", "Initializing."); this.onErrorListener = (err) => this.onError(err); // DB connection const db = (this.db = await mysql.createConnection(conf.dstDb)); db.on("error", this.onErrorListener); await this.modelLoader.loadSchema(); await this.showDb.loadSchema(); for (const logInfo of this.logMap.values()) { const table = logInfo.table; const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId( table.name )}`; logInfo.addStmt = await db.prepare( `INSERT INTO ${sqlTable} SET originFk = ?, userFk = ?, action = ?, creationDate = ?, changedModel = ?, oldInstance = ?, newInstance = ?, changedModelId = ?, changedModelValue = ?` ); logInfo.fetchStmt = await db.prepare( `SELECT id FROM ${sqlTable} WHERE changedModel = ? AND changedModelId = ? AND action = 'delete' AND (originFk IS NULL OR originFk = ?) LIMIT 1` ); logInfo.updateStmt = await db.prepare( `UPDATE ${sqlTable} SET originFk = ?, creationDate = ?, oldInstance = ?, changedModelValue = ? WHERE id = ?` ); } // Zongji this.onBinlogListener = (evt) => this.onBinlog(evt); const [res] = await db.query( "SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?", [conf.code] ); if (res.length) { const [row] = res; this.binlogName = row.logName; this.binlogPosition = row.position; } await this.zongjiStart(); this.flushInterval = setInterval( () => this.flushQueue(), conf.flushInterval * 1000 ); this.pingInterval = setInterval( () => this.connectionPing(), conf.pingInterval * 1000 ); // Summary this.running = true; this.isOk = true; this.debug("MyLogger", "Initialized."); } async end(silent) { if (!this.running) return; this.running = false; this.debug("MyLogger", "Ending."); this.db.off("error", this.onErrorListener); clearInterval(this.flushInterval); clearInterval(this.pingInterval); clearInterval(this.flushTimeout); function logError(err) { if (!silent) console.error(err); } try { await this.flushQueue(); } catch (err) { logError(err); } // Zongji await this.zongjiStop(); this.zongji = null; // DB connection // FIXME: mysql2/promise bug, db.end() ends process this.db.on("error", () => {}); try { this.db.end(); } catch (err) { logError(err); } // Summary this.debug("MyLogger", "Ended."); } async zongjiStart() { await this.zongjiStop(); const zongji = new ZongJi(this.conf.srcDb); const zongjiOpts = this.zongjiOpts; if (this.binlogName) { this.debug( "Zongji", `Starting: ${this.binlogName}, position: ${this.binlogPosition}` ); Object.assign(zongjiOpts, { filename: this.binlogName, position: this.binlogPosition, }); } else { this.debug("Zongji", "Starting at end."); zongjiOpts.startAtEnd = true; } zongji.on("binlog", this.onBinlogListener); await new Promise((resolve, reject) => { const onReady = () => { zongji.off("error", onError); resolve(); }; const onError = (err) => { zongji.off("ready", onReady); zongji.off("binlog", this.onBinlogListener); reject(err); }; zongji.once("ready", onReady); zongji.once("error", onError); zongji.start(zongjiOpts); }); zongji.on("error", this.onErrorListener); this.zongji = zongji; this.debug("Zongji", "Started."); } async zongjiStop() { if (!this.zongji) return; this.debug( "Zongji", `Stopping: ${this.binlogName}, position: ${this.binlogPosition}` ); const zongji = this.zongji; this.zongji = null; zongji.off("binlog", this.onBinlogListener); zongji.off("error", this.onErrorListener); // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { console.log("zongji.connection.destroy"); }); await new Promise((resolve) => { zongji.ctrlConnection.query( "KILL ?", [zongji.connection.threadId], (err) => { // if (err && err.code !== 'ER_NO_SUCH_THREAD'); // console.error(err); resolve(); } ); }); zongji.ctrlConnection.destroy(() => { console.log("zongji.ctrlConnection.destroy"); }); zongji.emit("stopped"); this.debug("Zongji", "Stopped."); } async tryRestart() { try { await this.init(); console.log("Process restarted."); } catch (err) { setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); } } async onError(err) { if (!this.isOk) return; this.isOk = false; console.log(`Error: ${err.code}: ${err.message}`); try { await this.end(true); } catch (e) {} switch (err.code) { case "PROTOCOL_CONNECTION_LOST": case "ECONNRESET": console.log("Trying to restart process."); await this.tryRestart(); break; default: process.exit(); } } handleError(err) { console.error(err); } async onBinlog(evt) { //evt.dump(); try { let shouldFlush; const eventName = evt.getEventName(); if (eventName == "rotate") { if (evt.binlogName !== this.binlogName) { shouldFlush = true; this.binlogName = evt.binlogName; this.binlogPosition = evt.position; console.log( `[${eventName}] filename: ${this.binlogName}`, `position: ${this.binlogPosition}` ); } } else { shouldFlush = true; this.binlogPosition = evt.nextPosition; if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName); } if (shouldFlush) this.isFlushed = false; if (this.queue.length > this.conf.maxQueueEvents) { this.debug("MyLogger", "Queue full, stopping Zongji."); await this.zongjiStop(); } } catch (err) { this.handleError(err); } } onRowEvent(evt, eventName) { const table = evt.tableMap[evt.tableId]; const tableName = table.tableName; const tableInfo = this.schemaMap.get(table.parentSchema, tableName); if (!tableInfo) return; const action = actions[eventName]; const { rowExcludeField } = tableInfo; const changes = []; function isExcluded(row) { return rowExcludeField && row[rowExcludeField]; } function cast(value, type) { if (value == null || !type) return value; const fn = castFn[type]; return fn ? fn(value) : value; } function equals(a, b) { if (a === b) return true; const type = typeof a; if (a == null || b == null || type !== typeof b) return false; if (type === "object" && a.constructor === b.constructor) { if (a instanceof Date) { // FIXME: zongji creates invalid dates for NULL DATE // Error is somewhere here: zongji/lib/rows_event.js:129 let aTime = a.getTime(); if (isNaN(aTime)) aTime = null; let bTime = b.getTime(); if (isNaN(bTime)) bTime = null; return aTime === bTime; } } return false; } const { castTypes } = tableInfo; if (action == "update") { const cols = tableInfo.columns; for (const row of evt.rows) { const after = row.after; if (isExcluded(after)) continue; const before = row.before; const oldI = {}; const newI = {}; let nColsChanged = 0; for (const col in before) { if (!cols.has(col)) continue; const type = castTypes.get(col); const oldValue = cast(before[col], type); const newValue = cast(after[col], type); if (!equals(oldValue, newValue)) { oldI[col] = oldValue; newI[col] = newValue; nColsChanged++; } } if (nColsChanged) changes.push({ row: after, oldI, newI }); } } else { const cols = tableInfo.instanceColumns; for (const row of evt.rows) { if (isExcluded(row)) continue; const instance = {}; for (const col of cols) { if (row[col] == null) continue; const type = castTypes.get(col); instance[col] = cast(row[col], type); } changes.push({ row, instance }); } } if (!changes.length) return; this.queue.push({ tableInfo, action, evt, changes, binlogName: this.binlogName, }); if (!this.flushTimeout) this.flushTimeout = setTimeout( () => this.flushQueue(), this.conf.queueFlushDelay ); if (this.conf.debug) console.debug( "Evt:".blue, `[${action}]`[actionColor[action]], `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` ); } async flushQueue() { if (this.isFlushed || this.isFlushing || !this.isOk) return; this.isFlushing = true; const { conf, db, queue } = this; let op; try { if (queue.length) { do { const ops = []; let txStarted; try { for (let i = 0; i < conf.maxBulkLog && queue.length; i++) ops.push(queue.shift()); await this.showDb.getValues(db, ops); await db.query("START TRANSACTION"); txStarted = true; for (op of ops) await this.applyOp(op); this.debug( "Queue", `applied: ${ops.length}, remaining: ${queue.length}` ); await this.savePosition(op.binlogName, op.evt.nextPosition); await db.query("COMMIT"); } catch (err) { queue.unshift(...ops); if (txStarted) try { await db.query("ROLLBACK"); } catch (err) {} throw err; } } while (queue.length); if (!this.zongji) { this.debug("MyLogger", "Queue flushed, restarting Zongji."); await this.zongjiStart(); } } else { await this.savePosition(this.binlogName, this.binlogPosition); } } catch (err) { this.handleError(err); } finally { this.flushTimeout = null; this.isFlushing = false; } } async applyOp(op) { const { conf } = this; const { tableInfo, action, evt, changes } = op; const { logInfo, isMain, relation, modelName, logFields } = tableInfo; const isDelete = action == "delete"; const isUpdate = action == "update"; const created = new Date(evt.timestamp); for (const change of changes) { let newI, oldI; const row = change.row; switch (action) { case "update": newI = change.newI; oldI = change.oldI; if (logFields) { for (const field of logFields) if (newI[field] === undefined) newI[field] = row[field]; } break; case "insert": newI = change.instance; break; case "delete": oldI = change.instance; break; } const modelId = row[tableInfo.idName]; const modelValue = change.modelValue ?? null; const oldInstance = oldI ? JSON.stringify(oldI) : null; const originFk = !isMain ? row[relation] : modelId; const originChanged = isUpdate && !isMain && newI[relation] !== undefined; let deleteRow; if (conf.debug) console.debug( "Log:".blue, `[${action}]`[actionColor[action]], `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` ); try { if (isDelete) { [[deleteRow]] = await logInfo.fetchStmt.execute([ modelName, modelId, originFk, ]); if (!conf.testMode && deleteRow) await logInfo.updateStmt.execute([ originFk, created, oldInstance, modelValue, deleteRow.id, ]); } if (!conf.testMode && (!isDelete || !deleteRow)) { async function log(originFk) { if (originFk == null) return; await logInfo.addStmt.execute([ originFk, row[tableInfo.userField] ?? null, action, created, modelName, oldInstance, newI ? JSON.stringify(newI) : null, modelId, modelValue, ]); } if (originChanged) await log(oldI[relation]); await log(originFk); } } catch (err) { if (err.code == "ER_NO_REFERENCED_ROW_2") { this.debug("Log", `Ignored because of constraint failed.`); } else throw err; } } } async savePosition(binlogName, binlogPosition) { this.debug("Flush", `filename: ${binlogName}, position: ${binlogPosition}`); const replaceQuery = "REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?"; if (!this.conf.testMode) await this.db.query(replaceQuery, [ this.conf.code, binlogName, binlogPosition, ]); this.isFlushed = this.binlogName == binlogName && this.binlogPosition == binlogPosition; } async connectionPing() { if (!this.isOk) return; try { this.debug("Ping", "Sending ping to database."); if (this.zongji) { // FIXME: Should Zongji.connection be pinged? await new Promise((resolve, reject) => { this.zongji.ctrlConnection.ping((err) => { if (err) return reject(err); resolve(); }); }); } await this.db.ping(); } catch (err) { this.handleError(err); } } debug(namespace, message) { if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); } }; const catchEvents = new Set(["writerows", "updaterows", "deleterows"]); const actions = { writerows: "insert", updaterows: "update", deleterows: "delete", }; const actionColor = { insert: "green", update: "yellow", delete: "red", }; const castFn = { boolean: function (value) { return !!value; }, };