diff --git a/config/salix.local.yml b/config/salix.local.yml new file mode 100644 index 0000000..cf838c8 --- /dev/null +++ b/config/salix.local.yml @@ -0,0 +1 @@ +salixVersion: 24.40.0 diff --git a/lib/__tests__/salix.spec.js b/lib/__tests__/salix.spec.js new file mode 100644 index 0000000..bf74b40 --- /dev/null +++ b/lib/__tests__/salix.spec.js @@ -0,0 +1,99 @@ +const assert = require("assert"); +const fs = require("fs"); +const path = require("path"); +const Salix = require("./Salix"); // Ajusta la ruta si está en otro directorio + +// Mock del sistema de archivos +const mockFs = { + files: {}, + + existsSync(filePath) { + return !!this.files[filePath]; + }, + + writeFileSync(filePath, content) { + this.files[filePath] = content; + }, + + readFileSync(filePath) { + if (!this.existsSync(filePath)) { + throw { code: "ENOENT" }; + } + return this.files[filePath]; + }, + + reset() { + this.files = {}; + }, +}; + +// Reemplaza los métodos reales de fs con el mock +const originalFs = { ...fs }; +Object.assign(fs, mockFs); + +(async () => { + try { + // Configuración inicial + const loggerMock = { + conf: { + salix: { + url: "http://example.com", + api: { + status: "status", + logInfo: "logInfo", + }, + renewInterval: 10, + }, + }, + }; + + // Test: Inicialización básica + const salix = new Salix(); + await salix.init(loggerMock); + + assert.strictEqual(salix.conf, loggerMock.conf, "La configuración no se inicializó correctamente"); + console.log("✅ Salix.init configuró correctamente los valores iniciales."); + + // Test: Cargar configuración + const mockFilePath = path.join(__dirname, "..", "config/salix.local.yml"); + mockFs.files[mockFilePath] = JSON.stringify({ salixVersion: "1.0.0" }); + + const loadedConfig = await salix.loadConfig(); + assert.deepStrictEqual( + loadedConfig, + { salixVersion: "1.0.0" }, + "La configuración cargada no coincide con lo esperado" + ); + console.log("✅ Salix.loadConfig cargó correctamente el archivo YAML."); + + // Test: Guardar configuración + const newConfig = { log: "testLog" }; + await salix.saveConfig(newConfig); + + const savedContent = JSON.parse(mockFs.files[mockFilePath]); + assert.strictEqual( + savedContent.log, + "testLog", + "La configuración no se guardó correctamente" + ); + console.log("✅ Salix.saveConfig guardó correctamente el archivo YAML."); + + // Test: Comparar versiones + const isDeprecated = salix.compareVersion("2.0.0", "1.0.0"); + assert.strictEqual( + isDeprecated, + true, + "La comparación de versiones no detectó correctamente la versión como desactualizada" + ); + console.log("✅ Salix.compareVersion funciona correctamente."); + + // Limpia el mock del sistema de archivos + mockFs.reset(); + + } catch (error) { + console.error("❌ Error en las pruebas:", error); + } finally { + // Restaura los métodos originales de fs + Object.assign(fs, originalFs); + } +})(); diff --git a/lib/model-loader.js b/lib/model-loader.js index ca1663a..8a894f5 100644 --- a/lib/model-loader.js +++ b/lib/model-loader.js @@ -6,7 +6,7 @@ const MultiMap = require('./multi-map'); * Loads model configuration. */ module.exports = class ModelLoader { - init(logger) { + init(logger, logs = []) { const configDir = path.join(__dirname, '..'); const conf = loadConfig(configDir, 'logs'); const schemaMap = new MultiMap(); @@ -20,6 +20,11 @@ module.exports = class ModelLoader { schemaMap, logMap }); + + if(!conf.logs ) conf.logs = {} + Object.keys(conf.logs??{}).length > 0 && console.warn('‼️ - Existen modelos NO exportados') + Object.assign(conf.logs, logs) + for (const logName in conf.logs) { const logConf = conf.logs[logName]; diff --git a/lib/salix.js b/lib/salix.js new file mode 100644 index 0000000..e14b1e9 --- /dev/null +++ b/lib/salix.js @@ -0,0 +1,242 @@ +const fs = require("fs"); +const path = require("path"); +const SALIX_VERSION_HEADER = "Salix-version"; +const CONFIG_FILENAME = "/config/salix.local.yml"; +const i18n = { + versionChanged: "La versión ha cambiado", + deprecatedVersion: "La configuracion guardada está desactualizada", + noDeprecatedVersion: "La configuración guardada no requiere actualización", + erroFs: { + noExists: "El archivo no existe", + noWrite: "Error al escribir el archivo YAML:", + noRead: "Error al leer el archivo YAML:", + }, +}; +module.exports = class Salix { + constructor() { + this.conf = null; + this._logInfo = []; + this._salixVersion = null; + this._salixConfig = null; + this._token = null; + } + emit() {} + subscribe(externalListenerFunction) { + this.emit = externalListenerFunction; + } + + async getToken() { + const { url: path, user, password } = this.conf.salix.api.login; + let response; + try { + response = await fetch(`${this.conf.salix.url}/${path}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json", + }, + body: JSON.stringify({ + user, + password, + }), + }); + response = await response.json(); + } catch (error) { + console.error(error); + } + this.token = response.token; + } + + async init(logger) { + this.conf = logger.conf; + const salixFileConfig = await this.loadConfig(); + try { + // No exite fichero almacenado. Solicito config y version + // else { + + this._salixConfig = salixFileConfig; + //Obtengo la version + await this.getSalixVersion(false); + this.salixVersion && (await this.handleVersion(salixFileConfig)); + if (!salixFileConfig) await this.salixLogInfo(); + this._logInfo = salixFileConfig.log; + // } + } catch (error) {} + + setInterval( + () => this.getSalixVersion(), + this.conf.salix.renewInterval * 1000 + ); + } + get filePath() { + // Leer el contenido del archivo YAML + const configDir = path.join(__dirname, ".."); + return `${configDir}${CONFIG_FILENAME}`; + } + async loadConfig() { + // try { + if (fs.existsSync(this.filePath)) { + const contenidoYAML = require(this.filePath); + return Object.assign({}, contenidoYAML); + } else { + new Error(`No existe el archivo de configuración`); + this.log("Creando archivo de configuración"); + fs.writeFileSync(this.filePath, require("js-yaml").dump({}), "utf-8"); + this.log("Archivo de configuración creado"); + } + // } catch (error) { + // if (error.code === "ENOENT") { + // console.error(i18n.erroFs.noExists); + // } else { + // console.error(i18n.erroFs.noRead, error); + // } + // return null; + // } + } + + async saveConfig(configValue) { + const defaultConfig = { + salixVersion: this.salixVersion, + }; + try { + const contenidoYAML = fs.writeFileSync( + this.filePath, + require("js-yaml").dump( + Object.assign(defaultConfig, { log: configValue }) + ), + "utf-8" + ); + return Object.assign({}, contenidoYAML); + } catch (error) { + if (error.code === "ENOENT") { + console.error(i18n.erroFs.noExists); + } else { + console.error(i18n.erroFs.noWrite, error); + } + return null; + } + } + + parseVersion(value) { + return value.split(".").map(Number); + } + + async handleVersion(salixFileConfig) { + const isDeprecated = this.compareVersion( + this.salixVersion, + salixFileConfig.salixVersion + ); + isDeprecated && (await this.salixLogInfo()); + } + compareVersion(v1, v2) { + let deprecatedConfig = false; + const version1 = this.parseVersion(v1); + const version2 = this.parseVersion(v2); + for (let i = 0; i < 3; i++) { + if (version1[i] > version2[i]) { + console.log( + // `La versión ${v1} es mayor que la versión ${v2}` + i18n.deprecatedVersion + ); + deprecatedConfig = true; + break; + } else if (version1[i] < version2[i]) { + console.log( + // `La versión ${v1} es menor que la versión ${v2}` + i18n.noDeprecatedVersion + ); + deprecatedConfig = false; + } + } + return deprecatedConfig; + } + + async fetch(path) { + try { + // Configuración de la llamada fetch + const fetchConfig = { + method: "GET", + headers: { + "Content-Type": "application/json", + Authorization: `${this.token}`, + }, + }; + + const salixCall = await fetch( + `${this.conf.salix.url}/${path}`, + fetchConfig + ); + if (salixCall.status !== 200) { + throw new Error(response); + } + const responseText = await salixCall.text(); + const response = JSON.parse(responseText); + + const newVersion = salixCall.headers.get(SALIX_VERSION_HEADER); + if (!this.salixVersion) { + this.salixVersion = newVersion; + this.saveConfig(); + await this.salixLogInfo(); + } + if (this.salixVersion !== newVersion) { + const isDeprecated = this.compareVersion(newVersion, this.salixVersion); + if (isDeprecated) { + this.salixVersion = newVersion; + console.info(i18n.versionChanged); + await this.salixLogInfo(); + } + } + return response; + } catch (response) { + const message = response?.error?.message ?? response.message; + this.log(message, 'error'); + } + } + + async getSalixVersion() { + this.log("CHECK VERSION"); + + await this.fetch(this.conf.salix.api.status); + + this.log("VERSION CHECKED"); + } + + async salixLogInfo() { + this.log("REQUEST LOGINFO"); + await this.getToken(); + let salixLogConfig = await this.fetch(this.conf.salix.api.logInfo); + + this._logInfo = salixLogConfig; + this.saveConfig(salixLogConfig); + this.log("LOGINFO REQUESTED"); + } + + log(param, type = 'log') { + const colors = { + log: '\x1b[37m', + error: '\x1b[31m', + warn: '\x1b[33m' , + } + console[type](colors[type], `${Salix.name} - ${type} - ${param}`); + } + + get logInfo() { + return this._logInfo; + } + + get token() { + return this._token; + } + set token(token) { + this._token = token; + } + get salixVersion() { + return this._salixVersion; + } + set salixVersion(newVersion) { + if (this._salixVersion && this._salixVersion !== newVersion) + this.emit(newVersion); + + this._salixVersion = newVersion; + } +}; diff --git a/mylogger.js b/mylogger.js index 6700069..bd4153b 100644 --- a/mylogger.js +++ b/mylogger.js @@ -2,9 +2,10 @@ require('require-yaml'); require('colors'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); -const {loadConfig} = require('./lib/util'); +const { loadConfig } = require('./lib/util'); const ModelLoader = require('./lib/model-loader'); const ShowDb = require('./lib/show-db'); +const Salix = require('./lib/salix'); module.exports = class MyLogger { constructor() { @@ -19,33 +20,44 @@ module.exports = class MyLogger { } async start() { - const conf = this.conf = loadConfig(__dirname, 'config'); - - this.modelLoader.init(this); + const conf = (this.conf = loadConfig(__dirname, 'config')); + const salix = new Salix(); + salix.subscribe(( ) => { + this.stop(); + this.start(); + }); + await salix.init(this); + // await salix.salixLogInfo(); + this.modelLoader.init(this, salix.logInfo); this.showDb.init(this); - const includeSchema = {}; for (const [schemaName, tableMap] of this.schemaMap.map) includeSchema[schemaName] = Array.from(tableMap.keys()); - + this.zongjiOpts = { includeEvents: [ 'rotate', 'tablemap', 'writerows', 'updaterows', - 'deleterows' + 'deleterows', ], includeSchema, - serverId: conf.serverId + serverId: conf.serverId, }; if (conf.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); - await this.init(); - console.log('Process started.'); + try { + + await this.init(); + console.log('Process started.'); + } catch (error) { + console.error(error); + + } } async stop() { @@ -55,18 +67,23 @@ module.exports = class MyLogger { } async init() { - const {conf} = this; + const { conf } = this; this.debug('MyLogger', 'Initializing.'); + this.onErrorListener = (err) => this.onError(err); // DB connection - const db = this.db = await mysql.createConnection(conf.dstDb); + + const db = (this.db = await mysql.createConnection(conf.dstDb)); + db.on('error', this.onErrorListener); await this.modelLoader.loadSchema(); await this.showDb.loadSchema(); for (const logInfo of this.logMap.values()) { const table = logInfo.table; - const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`; + const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId( + table.name + )}`; logInfo.addStmt = await db.prepare( `INSERT INTO ${sqlTable} SET originFk = ?, @@ -101,7 +118,7 @@ module.exports = class MyLogger { // Zongji - this.onBinlogListener = evt => this.onBinlog(evt); + this.onBinlogListener = (evt) => this.onBinlog(evt); const [res] = await db.query( 'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?', @@ -116,9 +133,13 @@ module.exports = class MyLogger { await this.zongjiStart(); this.flushInterval = setInterval( - () => this.flushQueue(), conf.flushInterval * 1000); + () => this.flushQueue(), + conf.flushInterval * 1000 + ); this.pingInterval = setInterval( - () => this.connectionPing(), conf.pingInterval * 1000); + () => this.connectionPing(), + conf.pingInterval * 1000 + ); // Summary @@ -132,6 +153,8 @@ module.exports = class MyLogger { this.running = false; this.debug('MyLogger', 'Ending.'); + this.db.off('error', this.onErrorListener); + clearInterval(this.flushInterval); clearInterval(this.pingInterval); clearInterval(this.flushTimeout); @@ -172,12 +195,13 @@ module.exports = class MyLogger { const zongjiOpts = this.zongjiOpts; if (this.binlogName) { - this.debug('Zongji', + this.debug( + 'Zongji', `Starting: ${this.binlogName}, position: ${this.binlogPosition}` ); Object.assign(zongjiOpts, { filename: this.binlogName, - position: this.binlogPosition + position: this.binlogPosition, }); } else { this.debug('Zongji', 'Starting at end.'); @@ -191,41 +215,47 @@ module.exports = class MyLogger { zongji.off('error', onError); resolve(); }; - const onError = err => { + const onError = (err) => { zongji.off('ready', onReady); zongji.off('binlog', this.onBinlogListener); reject(err); - } + }; zongji.once('ready', onReady); - zongji.once('error', onError); + zongji.once('error', onError); zongji.start(zongjiOpts); }); + zongji.on('error', this.onErrorListener); this.zongji = zongji; this.debug('Zongji', 'Started.'); } async zongjiStop() { if (!this.zongji) return; - this.debug('Zongji', + this.debug( + 'Zongji', `Stopping: ${this.binlogName}, position: ${this.binlogPosition}` ); const zongji = this.zongji; this.zongji = null; zongji.off('binlog', this.onBinlogListener); + zongji.off('error', this.onErrorListener); // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { console.log('zongji.connection.destroy'); }); - await new Promise(resolve => { - zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId], - err => { - // if (err && err.code !== 'ER_NO_SUCH_THREAD'); - // console.error(err); - resolve(); - }); + await new Promise((resolve) => { + zongji.ctrlConnection.query( + 'KILL ?', + [zongji.connection.threadId], + (err) => { + // if (err && err.code !== 'ER_NO_SUCH_THREAD'); + // console.error(err); + resolve(); + } + ); }); zongji.ctrlConnection.destroy(() => { console.log('zongji.ctrlConnection.destroy'); @@ -238,7 +268,7 @@ module.exports = class MyLogger { try { await this.init(); console.log('Process restarted.'); - } catch(err) { + } catch (err) { setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); } } @@ -250,21 +280,17 @@ module.exports = class MyLogger { try { await this.end(true); - } catch(e) {} + } catch (e) {} - // FIXME: Error of mysql2/promise - if (err.message === `Can't add new command when connection is in closed state`) - await this.tryRestart(); - else - switch (err.code) { - case 'PROTOCOL_CONNECTION_LOST': - case 'ECONNRESET': + switch (err.code) { + case 'PROTOCOL_CONNECTION_LOST': + case 'ECONNRESET': console.log('Trying to restart process.'); await this.tryRestart(); break; default: process.exit(); - } + } } async onBinlog(evt) { @@ -286,8 +312,7 @@ module.exports = class MyLogger { } else { shouldFlush = true; this.binlogPosition = evt.nextPosition; - if (catchEvents.has(eventName)) - this.onRowEvent(evt, eventName); + if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName); } if (shouldFlush) this.isFlushed = false; @@ -296,7 +321,7 @@ module.exports = class MyLogger { this.debug('MyLogger', 'Queue full, stopping Zongji.'); await this.zongjiStop(); } - } catch(err) { + } catch (err) { this.handleError(err); } } @@ -307,9 +332,9 @@ module.exports = class MyLogger { const tableInfo = this.schemaMap.get(table.parentSchema, tableName); if (!tableInfo) return; - + const action = actions[eventName]; - const {rowExcludeField, ignoreSystem} = tableInfo; + const { rowExcludeField, ignoreSystem } = tableInfo; const changes = []; function isExcluded(row) { @@ -318,19 +343,16 @@ module.exports = class MyLogger { } function cast(value, type) { - if (value == null || !type) - return value; + if (value == null || !type) return value; const fn = castFn[type]; return fn ? fn(value) : value; } function equals(a, b) { - if (a === b) - return true; + if (a === b) return true; const type = typeof a; - if (a == null || b == null || type !== typeof b) - return false; + if (a == null || b == null || type !== typeof b) return false; if (type === 'object' && a.constructor === b.constructor) { if (a instanceof Date) { // FIXME: zongji creates invalid dates for NULL DATE @@ -339,14 +361,14 @@ module.exports = class MyLogger { if (isNaN(aTime)) aTime = null; let bTime = b.getTime(); if (isNaN(bTime)) bTime = null; - + return aTime === bTime; } } return false; } - const {castTypes} = tableInfo; + const { castTypes } = tableInfo; if (action == 'update') { const cols = tableInfo.columns; @@ -372,8 +394,7 @@ module.exports = class MyLogger { } } - if (nColsChanged) - changes.push({row: after, oldI, newI}); + if (nColsChanged) changes.push({ row: after, oldI, newI }); } } else { const cols = tableInfo.instanceColumns; @@ -388,7 +409,7 @@ module.exports = class MyLogger { instance[col] = cast(row[col], type); } - changes.push({row, instance}); + changes.push({ row, instance }); } } @@ -399,7 +420,7 @@ module.exports = class MyLogger { action, evt, changes, - binlogName: this.binlogName + binlogName: this.binlogName, }); if (!this.flushTimeout) @@ -409,7 +430,8 @@ module.exports = class MyLogger { ); if (this.conf.debug) - console.debug('Evt:'.blue, + console.debug( + 'Evt:'.blue, `[${action}]`[actionColor[action]], `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` ); @@ -418,7 +440,7 @@ module.exports = class MyLogger { async flushQueue() { if (this.isFlushed || this.isFlushing || !this.isOk) return; this.isFlushing = true; - const {conf, db, queue} = this; + const { conf, db, queue } = this; let op; try { @@ -435,15 +457,17 @@ module.exports = class MyLogger { await db.query('START TRANSACTION'); txStarted = true; - - for (op of ops) - await this.applyOp(op); - this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`); + for (op of ops) await this.applyOp(op); + + this.debug( + 'Queue', + `applied: ${ops.length}, remaining: ${queue.length}` + ); await this.savePosition(op.binlogName, op.evt.nextPosition); await db.query('COMMIT'); - } catch(err) { + } catch (err) { queue.unshift(...ops); if (txStarted) try { @@ -460,7 +484,7 @@ module.exports = class MyLogger { } else { await this.savePosition(this.binlogName, this.binlogPosition); } - } catch(err) { + } catch (err) { this.handleError(err); } finally { this.flushTimeout = null; @@ -469,20 +493,9 @@ module.exports = class MyLogger { } async applyOp(op) { - const {conf} = this; - const { - tableInfo, - action, - evt, - changes - } = op; - const { - logInfo, - isMain, - relation, - modelName, - logFields - } = tableInfo; + const { conf } = this; + const { tableInfo, action, evt, changes } = op; + const { logInfo, isMain, relation, modelName, logFields } = tableInfo; const isDelete = action == 'delete'; const isUpdate = action == 'update'; @@ -493,14 +506,13 @@ module.exports = class MyLogger { let newI, oldI; const row = change.row; - switch(action) { + switch (action) { case 'update': newI = change.newI; oldI = change.oldI; if (logFields) { for (const field of logFields) - if (newI[field] === undefined) - newI[field] = row[field]; + if (newI[field] === undefined) newI[field] = row[field]; } break; case 'insert': @@ -515,12 +527,12 @@ module.exports = class MyLogger { const modelValue = change.modelValue ?? null; const oldInstance = oldI ? JSON.stringify(oldI) : null; const originFk = !isMain ? row[relation] : modelId; - const originChanged = isUpdate && !isMain - && newI[relation] !== undefined; + const originChanged = isUpdate && !isMain && newI[relation] !== undefined; let deleteRow; if (conf.debug) - console.debug('Log:'.blue, + console.debug( + 'Log:'.blue, `[${action}]`[actionColor[action]], `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` ); @@ -528,7 +540,9 @@ module.exports = class MyLogger { try { if (isDelete) { [[deleteRow]] = await logInfo.fetchStmt.execute([ - modelName, modelId, originFk + modelName, + modelId, + originFk, ]); if (!conf.testMode && deleteRow) await logInfo.updateStmt.execute([ @@ -536,8 +550,7 @@ module.exports = class MyLogger { created, oldInstance, modelValue, - summaryId, - deleteRow.id + deleteRow.id, ]); } if (!conf.testMode && (!isDelete || !deleteRow)) { @@ -553,33 +566,34 @@ module.exports = class MyLogger { newI ? JSON.stringify(newI) : null, modelId, modelValue, - summaryId ]); } - if (originChanged) - await log(oldI[relation]); + if (originChanged) await log(oldI[relation]); await log(originFk); } } catch (err) { if (err.code == 'ER_NO_REFERENCED_ROW_2') { this.debug('Log', `Ignored because of constraint failed.`); - } else - throw err; + } else throw err; } } } async savePosition(binlogName, binlogPosition) { this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); - + const replaceQuery = 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; if (!this.conf.testMode) - await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]); + await this.db.query(replaceQuery, [ + this.conf.code, + binlogName, + binlogPosition, + ]); - this.isFlushed = this.binlogName == binlogName - && this.binlogPosition == binlogPosition; + this.isFlushed = + this.binlogName == binlogName && this.binlogPosition == binlogPosition; } async connectionPing() { @@ -590,45 +604,40 @@ module.exports = class MyLogger { if (this.zongji) { // FIXME: Should Zongji.connection be pinged? await new Promise((resolve, reject) => { - this.zongji.ctrlConnection.ping(err => { + this.zongji.ctrlConnection.ping((err) => { if (err) return reject(err); resolve(); }); - }) + }); } await this.db.ping(); - } catch(err) { + } catch (err) { this.handleError(err); } } debug(namespace, message) { - if (this.conf.debug) - console.debug(`${namespace}:`.blue, message.yellow); + if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); } -} +}; -const catchEvents = new Set([ - 'writerows', - 'updaterows', - 'deleterows' -]); +const catchEvents = new Set(['writerows', 'updaterows', 'deleterows']); const actions = { writerows: 'insert', updaterows: 'update', - deleterows: 'delete' + deleterows: 'delete', }; const actionColor = { insert: 'green', update: 'yellow', - delete: 'red' + delete: 'red', }; const castFn = { - boolean: function(value) { + boolean: function (value) { return !!value; - } + }, };