diff --git a/lib/salix.js b/lib/salix.js index 653978d..fedaa98 100644 --- a/lib/salix.js +++ b/lib/salix.js @@ -1,14 +1,120 @@ +const fs = require("fs"); +const path = require("path"); +const SALIX_VERSION_HEADER = "Salix-version"; +const CONFIG_FILENAME = '/config/salix.local.yml'; +const i18n = { + versionChanged: "La versión ha cambiado", + deprecatedVersion: "La configuracion guardada está desactualizada", + noDeprecatedVersion: "La configuración guardada no requiere actualización", + erroFs:{ + noExists: 'El archivo no existe', + noWrite: 'Error al escribir el archivo YAML:', + noRead: 'Error al leer el archivo YAML:' + } +}; module.exports = class Salix { constructor() { this.conf = null; this._logInfo = []; this._salixVersion = null; + this._salixConfig = null; } async init(logger) { this.conf = logger.conf; + const salixFileConfig = await this.loadConfig(); + try { + // No exite fichero almacenado. Solicito config y version + if (!salixFileConfig) await this.salixLogInfo(); + else { + this._salixConfig = salixFileConfig; + //Obtengo la version + await this.getSalixVersion(false); + this._salixVersion && this.handleVersion(salixFileConfig); + this._logInfo = salixFileConfig.log; + } + } catch (error) { + } - setInterval(() => this.getSalixVersion, this.conf.salix.renewInterval); - await this.salixLogInfo(); + setInterval( + () => this.getSalixVersion(), + this.conf.salix.renewInterval * 1000 + ); + } + get filePath() { + // Leer el contenido del archivo YAML + const configDir = path.join(__dirname, ".."); + return `${configDir}${CONFIG_FILENAME}`; + } + async loadConfig() { + try { + if (fs.existsSync(this.filePath)) { + const contenidoYAML = require(this.filePath); + return Object.assign({}, contenidoYAML); + } + } catch (error) { + if (error.code === "ENOENT") { + console.error(i18n.erroFs.noExists); + } else { + console.error(i18n.erroFs.noRead, error); + } + return null; + } + } + + async saveConfig(configValue) { + const defaultConfig = { + salixVersion: this._salixVersion, + }; + try { + const contenidoYAML = fs.writeFileSync( + this.filePath, + require("js-yaml").dump( + Object.assign(defaultConfig, { log: configValue }) + ), + "utf-8" + ); + return Object.assign({}, contenidoYAML); + } catch (error) { + if (error.code === "ENOENT") { + console.error(i18n.erroFs.noExists); + } else { + console.error(i18n.erroFs.noWrite, error); + } + return null; + } + } + + parseVersion(value) { + return value.split(".").map(Number); + } + + async handleVersion(salixFileConfig) { + const isDeprecated = this.compareVersion( + this._salixVersion, + salixFileConfig.salixVersion + ); + isDeprecated && (await this.salixLogInfo()); + } + compareVersion(v1, v2) { + let deprecatedConfig = false; + const version1 = this.parseVersion(v1); + const version2 = this.parseVersion(v2); + for (let i = 0; i < 3; i++) { + if (version1[i] > version2[i]) { + console.log( + // `La versión ${v1} es mayor que la versión ${v2}` + i18n.deprecatedVersion + ); + deprecatedConfig = true; + } else if (version1[i] < version2[i]) { + console.log( + // `La versión ${v1} es menor que la versión ${v2}` + i18n.noDeprecatedVersion + ); + deprecatedConfig = false; + } + } + return deprecatedConfig; } async fetch(path) { @@ -21,28 +127,39 @@ module.exports = class Salix { console.error(response.error); throw new Error(response.error.message); } - this._salixVersion = salixCall.headers.get("Salix-version"); + const newVersion = salixCall.headers.get(SALIX_VERSION_HEADER); + if (this._salixVersion) { + const isDeprecated = this.compareVersion( + this._salixVersion, + newVersion + ); + if (isDeprecated) { + this._salixVersion = newVersion; + console.info(i18n.versionChanged); + await this.salixLogInfo(); + } + } else this._salixVersion = newVersion; return response; - } catch (error) { - throw new Error(error.message); + } catch ({ message }) { + console.error(message); + if (!this._salixConfig) throw new Error(message); } } async getSalixVersion() { this.log("CHECK VERSION"); - const salixVersion = await this.fetch("applications/status"); - if (this._salixVersion !== salixVersion) fetch(); + await this.fetch(this.conf.salix.api.status); this.log("VERSION CHECKED"); } async salixLogInfo() { - this.log("LOGINFO REQUEST"); - let salixLogConfig = await this.fetch("schemas/logInfo"); + this.log("REQUEST LOGINFO"); + let salixLogConfig = await this.fetch(this.conf.salix.api.logInfo); this._logInfo = salixLogConfig; - + this.saveConfig(salixLogConfig); this.log("LOGINFO REQUESTED"); } diff --git a/mylogger.js b/mylogger.js index 480c17f..76823ae 100644 --- a/mylogger.js +++ b/mylogger.js @@ -1,11 +1,11 @@ -require("require-yaml"); -require("colors"); -const ZongJi = require("./zongji"); -const mysql = require("mysql2/promise"); -const { loadConfig } = require("./lib/util"); -const ModelLoader = require("./lib/model-loader"); -const ShowDb = require("./lib/show-db"); -const Salix = require("./lib/salix"); +require('require-yaml'); +require('colors'); +const ZongJi = require('./zongji'); +const mysql = require('mysql2/promise'); +const { loadConfig } = require('./lib/util'); +const ModelLoader = require('./lib/model-loader'); +const ShowDb = require('./lib/show-db'); +const Salix = require('./lib/salix'); module.exports = class MyLogger { constructor() { @@ -20,10 +20,11 @@ module.exports = class MyLogger { } async start() { - const conf = (this.conf = loadConfig(__dirname, "config")); + const conf = (this.conf = loadConfig(__dirname, 'config')); const salix = new Salix(); await salix.init(this); // await salix.salixLogInfo(); + // salix.registerNewListener((val) => console.log(`New Value: ${val}`)); this.modelLoader.init(this, salix.logInfo); this.showDb.init(this); @@ -33,39 +34,39 @@ module.exports = class MyLogger { this.zongjiOpts = { includeEvents: [ - "rotate", - "tablemap", - "writerows", - "updaterows", - "deleterows", + 'rotate', + 'tablemap', + 'writerows', + 'updaterows', + 'deleterows', ], includeSchema, serverId: conf.serverId, }; if (conf.testMode) - console.log("Test mode enabled, just logging queries to console."); + console.log('Test mode enabled, just logging queries to console.'); - console.log("Starting process."); + console.log('Starting process.'); await this.init(); - console.log("Process started."); + console.log('Process started.'); } async stop() { - console.log("Stopping process."); + console.log('Stopping process.'); await this.end(); - console.log("Process stopped."); + console.log('Process stopped.'); } async init() { const { conf } = this; - this.debug("MyLogger", "Initializing."); + this.debug('MyLogger', 'Initializing.'); this.onErrorListener = (err) => this.onError(err); // DB connection const db = (this.db = await mysql.createConnection(conf.dstDb)); - db.on("error", this.onErrorListener); + db.on('error', this.onErrorListener); await this.modelLoader.loadSchema(); await this.showDb.loadSchema(); @@ -110,7 +111,7 @@ module.exports = class MyLogger { this.onBinlogListener = (evt) => this.onBinlog(evt); const [res] = await db.query( - "SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?", + 'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?', [conf.code] ); if (res.length) { @@ -134,15 +135,15 @@ module.exports = class MyLogger { this.running = true; this.isOk = true; - this.debug("MyLogger", "Initialized."); + this.debug('MyLogger', 'Initialized.'); } async end(silent) { if (!this.running) return; this.running = false; - this.debug("MyLogger", "Ending."); + this.debug('MyLogger', 'Ending.'); - this.db.off("error", this.onErrorListener); + this.db.off('error', this.onErrorListener); clearInterval(this.flushInterval); clearInterval(this.pingInterval); @@ -166,7 +167,7 @@ module.exports = class MyLogger { // DB connection // FIXME: mysql2/promise bug, db.end() ends process - this.db.on("error", () => {}); + this.db.on('error', () => {}); try { this.db.end(); } catch (err) { @@ -175,7 +176,7 @@ module.exports = class MyLogger { // Summary - this.debug("MyLogger", "Ended."); + this.debug('MyLogger', 'Ended.'); } async zongjiStart() { @@ -185,7 +186,7 @@ module.exports = class MyLogger { if (this.binlogName) { this.debug( - "Zongji", + 'Zongji', `Starting: ${this.binlogName}, position: ${this.binlogPosition}` ); Object.assign(zongjiOpts, { @@ -193,51 +194,51 @@ module.exports = class MyLogger { position: this.binlogPosition, }); } else { - this.debug("Zongji", "Starting at end."); + this.debug('Zongji', 'Starting at end.'); zongjiOpts.startAtEnd = true; } - zongji.on("binlog", this.onBinlogListener); + zongji.on('binlog', this.onBinlogListener); await new Promise((resolve, reject) => { const onReady = () => { - zongji.off("error", onError); + zongji.off('error', onError); resolve(); }; const onError = (err) => { - zongji.off("ready", onReady); - zongji.off("binlog", this.onBinlogListener); + zongji.off('ready', onReady); + zongji.off('binlog', this.onBinlogListener); reject(err); }; - zongji.once("ready", onReady); - zongji.once("error", onError); + zongji.once('ready', onReady); + zongji.once('error', onError); zongji.start(zongjiOpts); }); - zongji.on("error", this.onErrorListener); + zongji.on('error', this.onErrorListener); this.zongji = zongji; - this.debug("Zongji", "Started."); + this.debug('Zongji', 'Started.'); } async zongjiStop() { if (!this.zongji) return; this.debug( - "Zongji", + 'Zongji', `Stopping: ${this.binlogName}, position: ${this.binlogPosition}` ); const zongji = this.zongji; this.zongji = null; - zongji.off("binlog", this.onBinlogListener); - zongji.off("error", this.onErrorListener); + zongji.off('binlog', this.onBinlogListener); + zongji.off('error', this.onErrorListener); // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { - console.log("zongji.connection.destroy"); + console.log('zongji.connection.destroy'); }); await new Promise((resolve) => { zongji.ctrlConnection.query( - "KILL ?", + 'KILL ?', [zongji.connection.threadId], (err) => { // if (err && err.code !== 'ER_NO_SUCH_THREAD'); @@ -247,16 +248,16 @@ module.exports = class MyLogger { ); }); zongji.ctrlConnection.destroy(() => { - console.log("zongji.ctrlConnection.destroy"); + console.log('zongji.ctrlConnection.destroy'); }); - zongji.emit("stopped"); - this.debug("Zongji", "Stopped."); + zongji.emit('stopped'); + this.debug('Zongji', 'Stopped.'); } async tryRestart() { try { await this.init(); - console.log("Process restarted."); + console.log('Process restarted.'); } catch (err) { setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); } @@ -272,9 +273,9 @@ module.exports = class MyLogger { } catch (e) {} switch (err.code) { - case "PROTOCOL_CONNECTION_LOST": - case "ECONNRESET": - console.log("Trying to restart process."); + case 'PROTOCOL_CONNECTION_LOST': + case 'ECONNRESET': + console.log('Trying to restart process.'); await this.tryRestart(); break; default: @@ -292,7 +293,7 @@ module.exports = class MyLogger { let shouldFlush; const eventName = evt.getEventName(); - if (eventName == "rotate") { + if (eventName == 'rotate') { if (evt.binlogName !== this.binlogName) { shouldFlush = true; this.binlogName = evt.binlogName; @@ -311,7 +312,7 @@ module.exports = class MyLogger { if (shouldFlush) this.isFlushed = false; if (this.queue.length > this.conf.maxQueueEvents) { - this.debug("MyLogger", "Queue full, stopping Zongji."); + this.debug('MyLogger', 'Queue full, stopping Zongji.'); await this.zongjiStop(); } } catch (err) { @@ -344,7 +345,7 @@ module.exports = class MyLogger { if (a === b) return true; const type = typeof a; if (a == null || b == null || type !== typeof b) return false; - if (type === "object" && a.constructor === b.constructor) { + if (type === 'object' && a.constructor === b.constructor) { if (a instanceof Date) { // FIXME: zongji creates invalid dates for NULL DATE // Error is somewhere here: zongji/lib/rows_event.js:129 @@ -361,7 +362,7 @@ module.exports = class MyLogger { const { castTypes } = tableInfo; - if (action == "update") { + if (action == 'update') { const cols = tableInfo.columns; for (const row of evt.rows) { @@ -422,7 +423,7 @@ module.exports = class MyLogger { if (this.conf.debug) console.debug( - "Evt:".blue, + 'Evt:'.blue, `[${action}]`[actionColor[action]], `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` ); @@ -446,30 +447,30 @@ module.exports = class MyLogger { await this.showDb.getValues(db, ops); - await db.query("START TRANSACTION"); + await db.query('START TRANSACTION'); txStarted = true; for (op of ops) await this.applyOp(op); this.debug( - "Queue", + 'Queue', `applied: ${ops.length}, remaining: ${queue.length}` ); await this.savePosition(op.binlogName, op.evt.nextPosition); - await db.query("COMMIT"); + await db.query('COMMIT'); } catch (err) { queue.unshift(...ops); if (txStarted) try { - await db.query("ROLLBACK"); + await db.query('ROLLBACK'); } catch (err) {} throw err; } } while (queue.length); if (!this.zongji) { - this.debug("MyLogger", "Queue flushed, restarting Zongji."); + this.debug('MyLogger', 'Queue flushed, restarting Zongji.'); await this.zongjiStart(); } } else { @@ -488,8 +489,8 @@ module.exports = class MyLogger { const { tableInfo, action, evt, changes } = op; const { logInfo, isMain, relation, modelName, logFields } = tableInfo; - const isDelete = action == "delete"; - const isUpdate = action == "update"; + const isDelete = action == 'delete'; + const isUpdate = action == 'update'; const created = new Date(evt.timestamp); for (const change of changes) { @@ -497,7 +498,7 @@ module.exports = class MyLogger { const row = change.row; switch (action) { - case "update": + case 'update': newI = change.newI; oldI = change.oldI; if (logFields) { @@ -505,10 +506,10 @@ module.exports = class MyLogger { if (newI[field] === undefined) newI[field] = row[field]; } break; - case "insert": + case 'insert': newI = change.instance; break; - case "delete": + case 'delete': oldI = change.instance; break; } @@ -522,7 +523,7 @@ module.exports = class MyLogger { let deleteRow; if (conf.debug) console.debug( - "Log:".blue, + 'Log:'.blue, `[${action}]`[actionColor[action]], `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` ); @@ -563,18 +564,18 @@ module.exports = class MyLogger { await log(originFk); } } catch (err) { - if (err.code == "ER_NO_REFERENCED_ROW_2") { - this.debug("Log", `Ignored because of constraint failed.`); + if (err.code == 'ER_NO_REFERENCED_ROW_2') { + this.debug('Log', `Ignored because of constraint failed.`); } else throw err; } } } async savePosition(binlogName, binlogPosition) { - this.debug("Flush", `filename: ${binlogName}, position: ${binlogPosition}`); + this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); const replaceQuery = - "REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?"; + 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; if (!this.conf.testMode) await this.db.query(replaceQuery, [ this.conf.code, @@ -589,7 +590,7 @@ module.exports = class MyLogger { async connectionPing() { if (!this.isOk) return; try { - this.debug("Ping", "Sending ping to database."); + this.debug('Ping', 'Sending ping to database.'); if (this.zongji) { // FIXME: Should Zongji.connection be pinged? @@ -612,18 +613,18 @@ module.exports = class MyLogger { } }; -const catchEvents = new Set(["writerows", "updaterows", "deleterows"]); +const catchEvents = new Set(['writerows', 'updaterows', 'deleterows']); const actions = { - writerows: "insert", - updaterows: "update", - deleterows: "delete", + writerows: 'insert', + updaterows: 'update', + deleterows: 'delete', }; const actionColor = { - insert: "green", - update: "yellow", - delete: "red", + insert: 'green', + update: 'yellow', + delete: 'red', }; const castFn = {