diff --git a/lib/model-loader.js b/lib/model-loader.js index 0c3a998..b66ec70 100644 --- a/lib/model-loader.js +++ b/lib/model-loader.js @@ -6,7 +6,7 @@ const MultiMap = require('./multi-map'); * Loads model configuration. */ module.exports = class ModelLoader { - init(logger) { + init(logger, logs = []) { const configDir = path.join(__dirname, '..'); const conf = loadConfig(configDir, 'logs'); const schemaMap = new MultiMap(); diff --git a/mylogger.js b/mylogger.js index be2c508..480c17f 100644 --- a/mylogger.js +++ b/mylogger.js @@ -1,10 +1,11 @@ -require('require-yaml'); -require('colors'); -const ZongJi = require('./zongji'); -const mysql = require('mysql2/promise'); -const {loadConfig} = require('./lib/util'); -const ModelLoader = require('./lib/model-loader'); -const ShowDb = require('./lib/show-db'); +require("require-yaml"); +require("colors"); +const ZongJi = require("./zongji"); +const mysql = require("mysql2/promise"); +const { loadConfig } = require("./lib/util"); +const ModelLoader = require("./lib/model-loader"); +const ShowDb = require("./lib/show-db"); +const Salix = require("./lib/salix"); module.exports = class MyLogger { constructor() { @@ -19,57 +20,61 @@ module.exports = class MyLogger { } async start() { - const conf = this.conf = loadConfig(__dirname, 'config'); - - this.modelLoader.init(this); + const conf = (this.conf = loadConfig(__dirname, "config")); + const salix = new Salix(); + await salix.init(this); + // await salix.salixLogInfo(); + this.modelLoader.init(this, salix.logInfo); this.showDb.init(this); const includeSchema = {}; for (const [schemaName, tableMap] of this.schemaMap.map) includeSchema[schemaName] = Array.from(tableMap.keys()); - + this.zongjiOpts = { includeEvents: [ - 'rotate', - 'tablemap', - 'writerows', - 'updaterows', - 'deleterows' + "rotate", + "tablemap", + "writerows", + "updaterows", + "deleterows", ], includeSchema, - serverId: conf.serverId + serverId: conf.serverId, }; if (conf.testMode) - console.log('Test mode enabled, just logging queries to console.'); + console.log("Test mode enabled, just logging queries to console."); - console.log('Starting process.'); + console.log("Starting process."); await this.init(); - console.log('Process started.'); + console.log("Process started."); } async stop() { - console.log('Stopping process.'); + console.log("Stopping process."); await this.end(); - console.log('Process stopped.'); + console.log("Process stopped."); } async init() { - const {conf} = this; - this.debug('MyLogger', 'Initializing.'); - this.onErrorListener = err => this.onError(err); + const { conf } = this; + this.debug("MyLogger", "Initializing."); + this.onErrorListener = (err) => this.onError(err); // DB connection - const db = this.db = await mysql.createConnection(conf.dstDb); - db.on('error', this.onErrorListener); + const db = (this.db = await mysql.createConnection(conf.dstDb)); + db.on("error", this.onErrorListener); await this.modelLoader.loadSchema(); await this.showDb.loadSchema(); for (const logInfo of this.logMap.values()) { const table = logInfo.table; - const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`; + const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId( + table.name + )}`; logInfo.addStmt = await db.prepare( `INSERT INTO ${sqlTable} SET originFk = ?, @@ -102,10 +107,10 @@ module.exports = class MyLogger { // Zongji - this.onBinlogListener = evt => this.onBinlog(evt); + this.onBinlogListener = (evt) => this.onBinlog(evt); const [res] = await db.query( - 'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?', + "SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?", [conf.code] ); if (res.length) { @@ -117,23 +122,27 @@ module.exports = class MyLogger { await this.zongjiStart(); this.flushInterval = setInterval( - () => this.flushQueue(), conf.flushInterval * 1000); + () => this.flushQueue(), + conf.flushInterval * 1000 + ); this.pingInterval = setInterval( - () => this.connectionPing(), conf.pingInterval * 1000); + () => this.connectionPing(), + conf.pingInterval * 1000 + ); // Summary this.running = true; this.isOk = true; - this.debug('MyLogger', 'Initialized.'); + this.debug("MyLogger", "Initialized."); } async end(silent) { if (!this.running) return; this.running = false; - this.debug('MyLogger', 'Ending.'); - - this.db.off('error', this.onErrorListener); + this.debug("MyLogger", "Ending."); + + this.db.off("error", this.onErrorListener); clearInterval(this.flushInterval); clearInterval(this.pingInterval); @@ -157,7 +166,7 @@ module.exports = class MyLogger { // DB connection // FIXME: mysql2/promise bug, db.end() ends process - this.db.on('error', () => {}); + this.db.on("error", () => {}); try { this.db.end(); } catch (err) { @@ -166,7 +175,7 @@ module.exports = class MyLogger { // Summary - this.debug('MyLogger', 'Ended.'); + this.debug("MyLogger", "Ended."); } async zongjiStart() { @@ -175,75 +184,80 @@ module.exports = class MyLogger { const zongjiOpts = this.zongjiOpts; if (this.binlogName) { - this.debug('Zongji', + this.debug( + "Zongji", `Starting: ${this.binlogName}, position: ${this.binlogPosition}` ); Object.assign(zongjiOpts, { filename: this.binlogName, - position: this.binlogPosition + position: this.binlogPosition, }); } else { - this.debug('Zongji', 'Starting at end.'); + this.debug("Zongji", "Starting at end."); zongjiOpts.startAtEnd = true; } - zongji.on('binlog', this.onBinlogListener); + zongji.on("binlog", this.onBinlogListener); await new Promise((resolve, reject) => { const onReady = () => { - zongji.off('error', onError); + zongji.off("error", onError); resolve(); }; - const onError = err => { - zongji.off('ready', onReady); - zongji.off('binlog', this.onBinlogListener); + const onError = (err) => { + zongji.off("ready", onReady); + zongji.off("binlog", this.onBinlogListener); reject(err); - } + }; - zongji.once('ready', onReady); - zongji.once('error', onError); + zongji.once("ready", onReady); + zongji.once("error", onError); zongji.start(zongjiOpts); }); - zongji.on('error', this.onErrorListener); + zongji.on("error", this.onErrorListener); this.zongji = zongji; - this.debug('Zongji', 'Started.'); + this.debug("Zongji", "Started."); } async zongjiStop() { if (!this.zongji) return; - this.debug('Zongji', + this.debug( + "Zongji", `Stopping: ${this.binlogName}, position: ${this.binlogPosition}` ); const zongji = this.zongji; this.zongji = null; - zongji.off('binlog', this.onBinlogListener); - zongji.off('error', this.onErrorListener); + zongji.off("binlog", this.onBinlogListener); + zongji.off("error", this.onErrorListener); // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { - console.log('zongji.connection.destroy'); + console.log("zongji.connection.destroy"); }); - await new Promise(resolve => { - zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId], - err => { - // if (err && err.code !== 'ER_NO_SUCH_THREAD'); - // console.error(err); - resolve(); - }); + await new Promise((resolve) => { + zongji.ctrlConnection.query( + "KILL ?", + [zongji.connection.threadId], + (err) => { + // if (err && err.code !== 'ER_NO_SUCH_THREAD'); + // console.error(err); + resolve(); + } + ); }); zongji.ctrlConnection.destroy(() => { - console.log('zongji.ctrlConnection.destroy'); + console.log("zongji.ctrlConnection.destroy"); }); - zongji.emit('stopped'); - this.debug('Zongji', 'Stopped.'); + zongji.emit("stopped"); + this.debug("Zongji", "Stopped."); } async tryRestart() { try { await this.init(); - console.log('Process restarted.'); - } catch(err) { + console.log("Process restarted."); + } catch (err) { setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); } } @@ -255,16 +269,16 @@ module.exports = class MyLogger { try { await this.end(true); - } catch(e) {} + } catch (e) {} switch (err.code) { - case 'PROTOCOL_CONNECTION_LOST': - case 'ECONNRESET': - console.log('Trying to restart process.'); - await this.tryRestart(); - break; - default: - process.exit(); + case "PROTOCOL_CONNECTION_LOST": + case "ECONNRESET": + console.log("Trying to restart process."); + await this.tryRestart(); + break; + default: + process.exit(); } } @@ -278,7 +292,7 @@ module.exports = class MyLogger { let shouldFlush; const eventName = evt.getEventName(); - if (eventName == 'rotate') { + if (eventName == "rotate") { if (evt.binlogName !== this.binlogName) { shouldFlush = true; this.binlogName = evt.binlogName; @@ -291,17 +305,16 @@ module.exports = class MyLogger { } else { shouldFlush = true; this.binlogPosition = evt.nextPosition; - if (catchEvents.has(eventName)) - this.onRowEvent(evt, eventName); + if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName); } if (shouldFlush) this.isFlushed = false; if (this.queue.length > this.conf.maxQueueEvents) { - this.debug('MyLogger', 'Queue full, stopping Zongji.'); + this.debug("MyLogger", "Queue full, stopping Zongji."); await this.zongjiStop(); } - } catch(err) { + } catch (err) { this.handleError(err); } } @@ -313,7 +326,7 @@ module.exports = class MyLogger { if (!tableInfo) return; const action = actions[eventName]; - const {rowExcludeField} = tableInfo; + const { rowExcludeField } = tableInfo; const changes = []; function isExcluded(row) { @@ -321,20 +334,17 @@ module.exports = class MyLogger { } function cast(value, type) { - if (value == null || !type) - return value; + if (value == null || !type) return value; const fn = castFn[type]; return fn ? fn(value) : value; } function equals(a, b) { - if (a === b) - return true; + if (a === b) return true; const type = typeof a; - if (a == null || b == null || type !== typeof b) - return false; - if (type === 'object' && a.constructor === b.constructor) { + if (a == null || b == null || type !== typeof b) return false; + if (type === "object" && a.constructor === b.constructor) { if (a instanceof Date) { // FIXME: zongji creates invalid dates for NULL DATE // Error is somewhere here: zongji/lib/rows_event.js:129 @@ -342,16 +352,16 @@ module.exports = class MyLogger { if (isNaN(aTime)) aTime = null; let bTime = b.getTime(); if (isNaN(bTime)) bTime = null; - + return aTime === bTime; } } return false; } - const {castTypes} = tableInfo; + const { castTypes } = tableInfo; - if (action == 'update') { + if (action == "update") { const cols = tableInfo.columns; for (const row of evt.rows) { @@ -375,8 +385,7 @@ module.exports = class MyLogger { } } - if (nColsChanged) - changes.push({row: after, oldI, newI}); + if (nColsChanged) changes.push({ row: after, oldI, newI }); } } else { const cols = tableInfo.instanceColumns; @@ -391,7 +400,7 @@ module.exports = class MyLogger { instance[col] = cast(row[col], type); } - changes.push({row, instance}); + changes.push({ row, instance }); } } @@ -402,7 +411,7 @@ module.exports = class MyLogger { action, evt, changes, - binlogName: this.binlogName + binlogName: this.binlogName, }); if (!this.flushTimeout) @@ -412,7 +421,8 @@ module.exports = class MyLogger { ); if (this.conf.debug) - console.debug('Evt:'.blue, + console.debug( + "Evt:".blue, `[${action}]`[actionColor[action]], `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` ); @@ -421,7 +431,7 @@ module.exports = class MyLogger { async flushQueue() { if (this.isFlushed || this.isFlushing || !this.isOk) return; this.isFlushing = true; - const {conf, db, queue} = this; + const { conf, db, queue } = this; let op; try { @@ -436,34 +446,36 @@ module.exports = class MyLogger { await this.showDb.getValues(db, ops); - await db.query('START TRANSACTION'); + await db.query("START TRANSACTION"); txStarted = true; - - for (op of ops) - await this.applyOp(op); - this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`); + for (op of ops) await this.applyOp(op); + + this.debug( + "Queue", + `applied: ${ops.length}, remaining: ${queue.length}` + ); await this.savePosition(op.binlogName, op.evt.nextPosition); - await db.query('COMMIT'); - } catch(err) { + await db.query("COMMIT"); + } catch (err) { queue.unshift(...ops); if (txStarted) try { - await db.query('ROLLBACK'); + await db.query("ROLLBACK"); } catch (err) {} throw err; } } while (queue.length); if (!this.zongji) { - this.debug('MyLogger', 'Queue flushed, restarting Zongji.'); + this.debug("MyLogger", "Queue flushed, restarting Zongji."); await this.zongjiStart(); } } else { await this.savePosition(this.binlogName, this.binlogPosition); } - } catch(err) { + } catch (err) { this.handleError(err); } finally { this.flushTimeout = null; @@ -472,43 +484,31 @@ module.exports = class MyLogger { } async applyOp(op) { - const {conf} = this; - const { - tableInfo, - action, - evt, - changes - } = op; - const { - logInfo, - isMain, - relation, - modelName, - logFields - } = tableInfo; + const { conf } = this; + const { tableInfo, action, evt, changes } = op; + const { logInfo, isMain, relation, modelName, logFields } = tableInfo; - const isDelete = action == 'delete'; - const isUpdate = action == 'update'; + const isDelete = action == "delete"; + const isUpdate = action == "update"; const created = new Date(evt.timestamp); for (const change of changes) { let newI, oldI; const row = change.row; - switch(action) { - case 'update': + switch (action) { + case "update": newI = change.newI; oldI = change.oldI; if (logFields) { for (const field of logFields) - if (newI[field] === undefined) - newI[field] = row[field]; + if (newI[field] === undefined) newI[field] = row[field]; } break; - case 'insert': + case "insert": newI = change.instance; break; - case 'delete': + case "delete": oldI = change.instance; break; } @@ -517,12 +517,12 @@ module.exports = class MyLogger { const modelValue = change.modelValue ?? null; const oldInstance = oldI ? JSON.stringify(oldI) : null; const originFk = !isMain ? row[relation] : modelId; - const originChanged = isUpdate && !isMain - && newI[relation] !== undefined; + const originChanged = isUpdate && !isMain && newI[relation] !== undefined; let deleteRow; if (conf.debug) - console.debug('Log:'.blue, + console.debug( + "Log:".blue, `[${action}]`[actionColor[action]], `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` ); @@ -530,7 +530,9 @@ module.exports = class MyLogger { try { if (isDelete) { [[deleteRow]] = await logInfo.fetchStmt.execute([ - modelName, modelId, originFk + modelName, + modelId, + originFk, ]); if (!conf.testMode && deleteRow) await logInfo.updateStmt.execute([ @@ -538,7 +540,7 @@ module.exports = class MyLogger { created, oldInstance, modelValue, - deleteRow.id + deleteRow.id, ]); } if (!conf.testMode && (!isDelete || !deleteRow)) { @@ -553,82 +555,79 @@ module.exports = class MyLogger { oldInstance, newI ? JSON.stringify(newI) : null, modelId, - modelValue + modelValue, ]); } - if (originChanged) - await log(oldI[relation]); + if (originChanged) await log(oldI[relation]); await log(originFk); } } catch (err) { - if (err.code == 'ER_NO_REFERENCED_ROW_2') { - this.debug('Log', `Ignored because of constraint failed.`); - } else - throw err; + if (err.code == "ER_NO_REFERENCED_ROW_2") { + this.debug("Log", `Ignored because of constraint failed.`); + } else throw err; } } } async savePosition(binlogName, binlogPosition) { - this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); - - const replaceQuery = - 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; - if (!this.conf.testMode) - await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]); + this.debug("Flush", `filename: ${binlogName}, position: ${binlogPosition}`); - this.isFlushed = this.binlogName == binlogName - && this.binlogPosition == binlogPosition; + const replaceQuery = + "REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?"; + if (!this.conf.testMode) + await this.db.query(replaceQuery, [ + this.conf.code, + binlogName, + binlogPosition, + ]); + + this.isFlushed = + this.binlogName == binlogName && this.binlogPosition == binlogPosition; } async connectionPing() { if (!this.isOk) return; try { - this.debug('Ping', 'Sending ping to database.'); + this.debug("Ping", "Sending ping to database."); if (this.zongji) { // FIXME: Should Zongji.connection be pinged? await new Promise((resolve, reject) => { - this.zongji.ctrlConnection.ping(err => { + this.zongji.ctrlConnection.ping((err) => { if (err) return reject(err); resolve(); }); - }) + }); } await this.db.ping(); - } catch(err) { + } catch (err) { this.handleError(err); } } debug(namespace, message) { - if (this.conf.debug) - console.debug(`${namespace}:`.blue, message.yellow); + if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); } -} +}; -const catchEvents = new Set([ - 'writerows', - 'updaterows', - 'deleterows' -]); +const catchEvents = new Set(["writerows", "updaterows", "deleterows"]); const actions = { - writerows: 'insert', - updaterows: 'update', - deleterows: 'delete' + writerows: "insert", + updaterows: "update", + deleterows: "delete", }; const actionColor = { - insert: 'green', - update: 'yellow', - delete: 'red' + insert: "green", + update: "yellow", + delete: "red", }; const castFn = { - boolean: function(value) { + boolean: function (value) { return !!value; - } + }, };