feat: updates

This commit is contained in:
Javier Segarra 2024-05-06 13:44:55 +02:00
parent 61d01b56c5
commit 206b279ced
2 changed files with 205 additions and 87 deletions

View File

@ -1,14 +1,120 @@
const fs = require("fs");
const path = require("path");
const SALIX_VERSION_HEADER = "Salix-version";
const CONFIG_FILENAME = '/config/salix.local.yml';
const i18n = {
versionChanged: "La versión ha cambiado",
deprecatedVersion: "La configuracion guardada está desactualizada",
noDeprecatedVersion: "La configuración guardada no requiere actualización",
erroFs:{
noExists: 'El archivo no existe',
noWrite: 'Error al escribir el archivo YAML:',
noRead: 'Error al leer el archivo YAML:'
}
};
module.exports = class Salix { module.exports = class Salix {
constructor() { constructor() {
this.conf = null; this.conf = null;
this._logInfo = []; this._logInfo = [];
this._salixVersion = null; this._salixVersion = null;
this._salixConfig = null;
} }
async init(logger) { async init(logger) {
this.conf = logger.conf; this.conf = logger.conf;
const salixFileConfig = await this.loadConfig();
try {
// No exite fichero almacenado. Solicito config y version
if (!salixFileConfig) await this.salixLogInfo();
else {
this._salixConfig = salixFileConfig;
//Obtengo la version
await this.getSalixVersion(false);
this._salixVersion && this.handleVersion(salixFileConfig);
this._logInfo = salixFileConfig.log;
}
} catch (error) {
}
setInterval(() => this.getSalixVersion, this.conf.salix.renewInterval); setInterval(
await this.salixLogInfo(); () => this.getSalixVersion(),
this.conf.salix.renewInterval * 1000
);
}
get filePath() {
// Leer el contenido del archivo YAML
const configDir = path.join(__dirname, "..");
return `${configDir}${CONFIG_FILENAME}`;
}
async loadConfig() {
try {
if (fs.existsSync(this.filePath)) {
const contenidoYAML = require(this.filePath);
return Object.assign({}, contenidoYAML);
}
} catch (error) {
if (error.code === "ENOENT") {
console.error(i18n.erroFs.noExists);
} else {
console.error(i18n.erroFs.noRead, error);
}
return null;
}
}
async saveConfig(configValue) {
const defaultConfig = {
salixVersion: this._salixVersion,
};
try {
const contenidoYAML = fs.writeFileSync(
this.filePath,
require("js-yaml").dump(
Object.assign(defaultConfig, { log: configValue })
),
"utf-8"
);
return Object.assign({}, contenidoYAML);
} catch (error) {
if (error.code === "ENOENT") {
console.error(i18n.erroFs.noExists);
} else {
console.error(i18n.erroFs.noWrite, error);
}
return null;
}
}
parseVersion(value) {
return value.split(".").map(Number);
}
async handleVersion(salixFileConfig) {
const isDeprecated = this.compareVersion(
this._salixVersion,
salixFileConfig.salixVersion
);
isDeprecated && (await this.salixLogInfo());
}
compareVersion(v1, v2) {
let deprecatedConfig = false;
const version1 = this.parseVersion(v1);
const version2 = this.parseVersion(v2);
for (let i = 0; i < 3; i++) {
if (version1[i] > version2[i]) {
console.log(
// `La versión ${v1} es mayor que la versión ${v2}`
i18n.deprecatedVersion
);
deprecatedConfig = true;
} else if (version1[i] < version2[i]) {
console.log(
// `La versión ${v1} es menor que la versión ${v2}`
i18n.noDeprecatedVersion
);
deprecatedConfig = false;
}
}
return deprecatedConfig;
} }
async fetch(path) { async fetch(path) {
@ -21,28 +127,39 @@ module.exports = class Salix {
console.error(response.error); console.error(response.error);
throw new Error(response.error.message); throw new Error(response.error.message);
} }
this._salixVersion = salixCall.headers.get("Salix-version"); const newVersion = salixCall.headers.get(SALIX_VERSION_HEADER);
if (this._salixVersion) {
const isDeprecated = this.compareVersion(
this._salixVersion,
newVersion
);
if (isDeprecated) {
this._salixVersion = newVersion;
console.info(i18n.versionChanged);
await this.salixLogInfo();
}
} else this._salixVersion = newVersion;
return response; return response;
} catch (error) { } catch ({ message }) {
throw new Error(error.message); console.error(message);
if (!this._salixConfig) throw new Error(message);
} }
} }
async getSalixVersion() { async getSalixVersion() {
this.log("CHECK VERSION"); this.log("CHECK VERSION");
const salixVersion = await this.fetch("applications/status"); await this.fetch(this.conf.salix.api.status);
if (this._salixVersion !== salixVersion) fetch();
this.log("VERSION CHECKED"); this.log("VERSION CHECKED");
} }
async salixLogInfo() { async salixLogInfo() {
this.log("LOGINFO REQUEST"); this.log("REQUEST LOGINFO");
let salixLogConfig = await this.fetch("schemas/logInfo"); let salixLogConfig = await this.fetch(this.conf.salix.api.logInfo);
this._logInfo = salixLogConfig; this._logInfo = salixLogConfig;
this.saveConfig(salixLogConfig);
this.log("LOGINFO REQUESTED"); this.log("LOGINFO REQUESTED");
} }

View File

@ -1,11 +1,11 @@
require("require-yaml"); require('require-yaml');
require("colors"); require('colors');
const ZongJi = require("./zongji"); const ZongJi = require('./zongji');
const mysql = require("mysql2/promise"); const mysql = require('mysql2/promise');
const { loadConfig } = require("./lib/util"); const { loadConfig } = require('./lib/util');
const ModelLoader = require("./lib/model-loader"); const ModelLoader = require('./lib/model-loader');
const ShowDb = require("./lib/show-db"); const ShowDb = require('./lib/show-db');
const Salix = require("./lib/salix"); const Salix = require('./lib/salix');
module.exports = class MyLogger { module.exports = class MyLogger {
constructor() { constructor() {
@ -20,10 +20,11 @@ module.exports = class MyLogger {
} }
async start() { async start() {
const conf = (this.conf = loadConfig(__dirname, "config")); const conf = (this.conf = loadConfig(__dirname, 'config'));
const salix = new Salix(); const salix = new Salix();
await salix.init(this); await salix.init(this);
// await salix.salixLogInfo(); // await salix.salixLogInfo();
// salix.registerNewListener((val) => console.log(`New Value: ${val}`));
this.modelLoader.init(this, salix.logInfo); this.modelLoader.init(this, salix.logInfo);
this.showDb.init(this); this.showDb.init(this);
@ -33,39 +34,39 @@ module.exports = class MyLogger {
this.zongjiOpts = { this.zongjiOpts = {
includeEvents: [ includeEvents: [
"rotate", 'rotate',
"tablemap", 'tablemap',
"writerows", 'writerows',
"updaterows", 'updaterows',
"deleterows", 'deleterows',
], ],
includeSchema, includeSchema,
serverId: conf.serverId, serverId: conf.serverId,
}; };
if (conf.testMode) if (conf.testMode)
console.log("Test mode enabled, just logging queries to console."); console.log('Test mode enabled, just logging queries to console.');
console.log("Starting process."); console.log('Starting process.');
await this.init(); await this.init();
console.log("Process started."); console.log('Process started.');
} }
async stop() { async stop() {
console.log("Stopping process."); console.log('Stopping process.');
await this.end(); await this.end();
console.log("Process stopped."); console.log('Process stopped.');
} }
async init() { async init() {
const { conf } = this; const { conf } = this;
this.debug("MyLogger", "Initializing."); this.debug('MyLogger', 'Initializing.');
this.onErrorListener = (err) => this.onError(err); this.onErrorListener = (err) => this.onError(err);
// DB connection // DB connection
const db = (this.db = await mysql.createConnection(conf.dstDb)); const db = (this.db = await mysql.createConnection(conf.dstDb));
db.on("error", this.onErrorListener); db.on('error', this.onErrorListener);
await this.modelLoader.loadSchema(); await this.modelLoader.loadSchema();
await this.showDb.loadSchema(); await this.showDb.loadSchema();
@ -110,7 +111,7 @@ module.exports = class MyLogger {
this.onBinlogListener = (evt) => this.onBinlog(evt); this.onBinlogListener = (evt) => this.onBinlog(evt);
const [res] = await db.query( const [res] = await db.query(
"SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?", 'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?',
[conf.code] [conf.code]
); );
if (res.length) { if (res.length) {
@ -134,15 +135,15 @@ module.exports = class MyLogger {
this.running = true; this.running = true;
this.isOk = true; this.isOk = true;
this.debug("MyLogger", "Initialized."); this.debug('MyLogger', 'Initialized.');
} }
async end(silent) { async end(silent) {
if (!this.running) return; if (!this.running) return;
this.running = false; this.running = false;
this.debug("MyLogger", "Ending."); this.debug('MyLogger', 'Ending.');
this.db.off("error", this.onErrorListener); this.db.off('error', this.onErrorListener);
clearInterval(this.flushInterval); clearInterval(this.flushInterval);
clearInterval(this.pingInterval); clearInterval(this.pingInterval);
@ -166,7 +167,7 @@ module.exports = class MyLogger {
// DB connection // DB connection
// FIXME: mysql2/promise bug, db.end() ends process // FIXME: mysql2/promise bug, db.end() ends process
this.db.on("error", () => {}); this.db.on('error', () => {});
try { try {
this.db.end(); this.db.end();
} catch (err) { } catch (err) {
@ -175,7 +176,7 @@ module.exports = class MyLogger {
// Summary // Summary
this.debug("MyLogger", "Ended."); this.debug('MyLogger', 'Ended.');
} }
async zongjiStart() { async zongjiStart() {
@ -185,7 +186,7 @@ module.exports = class MyLogger {
if (this.binlogName) { if (this.binlogName) {
this.debug( this.debug(
"Zongji", 'Zongji',
`Starting: ${this.binlogName}, position: ${this.binlogPosition}` `Starting: ${this.binlogName}, position: ${this.binlogPosition}`
); );
Object.assign(zongjiOpts, { Object.assign(zongjiOpts, {
@ -193,51 +194,51 @@ module.exports = class MyLogger {
position: this.binlogPosition, position: this.binlogPosition,
}); });
} else { } else {
this.debug("Zongji", "Starting at end."); this.debug('Zongji', 'Starting at end.');
zongjiOpts.startAtEnd = true; zongjiOpts.startAtEnd = true;
} }
zongji.on("binlog", this.onBinlogListener); zongji.on('binlog', this.onBinlogListener);
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
const onReady = () => { const onReady = () => {
zongji.off("error", onError); zongji.off('error', onError);
resolve(); resolve();
}; };
const onError = (err) => { const onError = (err) => {
zongji.off("ready", onReady); zongji.off('ready', onReady);
zongji.off("binlog", this.onBinlogListener); zongji.off('binlog', this.onBinlogListener);
reject(err); reject(err);
}; };
zongji.once("ready", onReady); zongji.once('ready', onReady);
zongji.once("error", onError); zongji.once('error', onError);
zongji.start(zongjiOpts); zongji.start(zongjiOpts);
}); });
zongji.on("error", this.onErrorListener); zongji.on('error', this.onErrorListener);
this.zongji = zongji; this.zongji = zongji;
this.debug("Zongji", "Started."); this.debug('Zongji', 'Started.');
} }
async zongjiStop() { async zongjiStop() {
if (!this.zongji) return; if (!this.zongji) return;
this.debug( this.debug(
"Zongji", 'Zongji',
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}` `Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
); );
const zongji = this.zongji; const zongji = this.zongji;
this.zongji = null; this.zongji = null;
zongji.off("binlog", this.onBinlogListener); zongji.off('binlog', this.onBinlogListener);
zongji.off("error", this.onErrorListener); zongji.off('error', this.onErrorListener);
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
zongji.connection.destroy(() => { zongji.connection.destroy(() => {
console.log("zongji.connection.destroy"); console.log('zongji.connection.destroy');
}); });
await new Promise((resolve) => { await new Promise((resolve) => {
zongji.ctrlConnection.query( zongji.ctrlConnection.query(
"KILL ?", 'KILL ?',
[zongji.connection.threadId], [zongji.connection.threadId],
(err) => { (err) => {
// if (err && err.code !== 'ER_NO_SUCH_THREAD'); // if (err && err.code !== 'ER_NO_SUCH_THREAD');
@ -247,16 +248,16 @@ module.exports = class MyLogger {
); );
}); });
zongji.ctrlConnection.destroy(() => { zongji.ctrlConnection.destroy(() => {
console.log("zongji.ctrlConnection.destroy"); console.log('zongji.ctrlConnection.destroy');
}); });
zongji.emit("stopped"); zongji.emit('stopped');
this.debug("Zongji", "Stopped."); this.debug('Zongji', 'Stopped.');
} }
async tryRestart() { async tryRestart() {
try { try {
await this.init(); await this.init();
console.log("Process restarted."); console.log('Process restarted.');
} catch (err) { } catch (err) {
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
} }
@ -272,9 +273,9 @@ module.exports = class MyLogger {
} catch (e) {} } catch (e) {}
switch (err.code) { switch (err.code) {
case "PROTOCOL_CONNECTION_LOST": case 'PROTOCOL_CONNECTION_LOST':
case "ECONNRESET": case 'ECONNRESET':
console.log("Trying to restart process."); console.log('Trying to restart process.');
await this.tryRestart(); await this.tryRestart();
break; break;
default: default:
@ -292,7 +293,7 @@ module.exports = class MyLogger {
let shouldFlush; let shouldFlush;
const eventName = evt.getEventName(); const eventName = evt.getEventName();
if (eventName == "rotate") { if (eventName == 'rotate') {
if (evt.binlogName !== this.binlogName) { if (evt.binlogName !== this.binlogName) {
shouldFlush = true; shouldFlush = true;
this.binlogName = evt.binlogName; this.binlogName = evt.binlogName;
@ -311,7 +312,7 @@ module.exports = class MyLogger {
if (shouldFlush) this.isFlushed = false; if (shouldFlush) this.isFlushed = false;
if (this.queue.length > this.conf.maxQueueEvents) { if (this.queue.length > this.conf.maxQueueEvents) {
this.debug("MyLogger", "Queue full, stopping Zongji."); this.debug('MyLogger', 'Queue full, stopping Zongji.');
await this.zongjiStop(); await this.zongjiStop();
} }
} catch (err) { } catch (err) {
@ -344,7 +345,7 @@ module.exports = class MyLogger {
if (a === b) return true; if (a === b) return true;
const type = typeof a; const type = typeof a;
if (a == null || b == null || type !== typeof b) return false; if (a == null || b == null || type !== typeof b) return false;
if (type === "object" && a.constructor === b.constructor) { if (type === 'object' && a.constructor === b.constructor) {
if (a instanceof Date) { if (a instanceof Date) {
// FIXME: zongji creates invalid dates for NULL DATE // FIXME: zongji creates invalid dates for NULL DATE
// Error is somewhere here: zongji/lib/rows_event.js:129 // Error is somewhere here: zongji/lib/rows_event.js:129
@ -361,7 +362,7 @@ module.exports = class MyLogger {
const { castTypes } = tableInfo; const { castTypes } = tableInfo;
if (action == "update") { if (action == 'update') {
const cols = tableInfo.columns; const cols = tableInfo.columns;
for (const row of evt.rows) { for (const row of evt.rows) {
@ -422,7 +423,7 @@ module.exports = class MyLogger {
if (this.conf.debug) if (this.conf.debug)
console.debug( console.debug(
"Evt:".blue, 'Evt:'.blue,
`[${action}]`[actionColor[action]], `[${action}]`[actionColor[action]],
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
); );
@ -446,30 +447,30 @@ module.exports = class MyLogger {
await this.showDb.getValues(db, ops); await this.showDb.getValues(db, ops);
await db.query("START TRANSACTION"); await db.query('START TRANSACTION');
txStarted = true; txStarted = true;
for (op of ops) await this.applyOp(op); for (op of ops) await this.applyOp(op);
this.debug( this.debug(
"Queue", 'Queue',
`applied: ${ops.length}, remaining: ${queue.length}` `applied: ${ops.length}, remaining: ${queue.length}`
); );
await this.savePosition(op.binlogName, op.evt.nextPosition); await this.savePosition(op.binlogName, op.evt.nextPosition);
await db.query("COMMIT"); await db.query('COMMIT');
} catch (err) { } catch (err) {
queue.unshift(...ops); queue.unshift(...ops);
if (txStarted) if (txStarted)
try { try {
await db.query("ROLLBACK"); await db.query('ROLLBACK');
} catch (err) {} } catch (err) {}
throw err; throw err;
} }
} while (queue.length); } while (queue.length);
if (!this.zongji) { if (!this.zongji) {
this.debug("MyLogger", "Queue flushed, restarting Zongji."); this.debug('MyLogger', 'Queue flushed, restarting Zongji.');
await this.zongjiStart(); await this.zongjiStart();
} }
} else { } else {
@ -488,8 +489,8 @@ module.exports = class MyLogger {
const { tableInfo, action, evt, changes } = op; const { tableInfo, action, evt, changes } = op;
const { logInfo, isMain, relation, modelName, logFields } = tableInfo; const { logInfo, isMain, relation, modelName, logFields } = tableInfo;
const isDelete = action == "delete"; const isDelete = action == 'delete';
const isUpdate = action == "update"; const isUpdate = action == 'update';
const created = new Date(evt.timestamp); const created = new Date(evt.timestamp);
for (const change of changes) { for (const change of changes) {
@ -497,7 +498,7 @@ module.exports = class MyLogger {
const row = change.row; const row = change.row;
switch (action) { switch (action) {
case "update": case 'update':
newI = change.newI; newI = change.newI;
oldI = change.oldI; oldI = change.oldI;
if (logFields) { if (logFields) {
@ -505,10 +506,10 @@ module.exports = class MyLogger {
if (newI[field] === undefined) newI[field] = row[field]; if (newI[field] === undefined) newI[field] = row[field];
} }
break; break;
case "insert": case 'insert':
newI = change.instance; newI = change.instance;
break; break;
case "delete": case 'delete':
oldI = change.instance; oldI = change.instance;
break; break;
} }
@ -522,7 +523,7 @@ module.exports = class MyLogger {
let deleteRow; let deleteRow;
if (conf.debug) if (conf.debug)
console.debug( console.debug(
"Log:".blue, 'Log:'.blue,
`[${action}]`[actionColor[action]], `[${action}]`[actionColor[action]],
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
); );
@ -563,18 +564,18 @@ module.exports = class MyLogger {
await log(originFk); await log(originFk);
} }
} catch (err) { } catch (err) {
if (err.code == "ER_NO_REFERENCED_ROW_2") { if (err.code == 'ER_NO_REFERENCED_ROW_2') {
this.debug("Log", `Ignored because of constraint failed.`); this.debug('Log', `Ignored because of constraint failed.`);
} else throw err; } else throw err;
} }
} }
} }
async savePosition(binlogName, binlogPosition) { async savePosition(binlogName, binlogPosition) {
this.debug("Flush", `filename: ${binlogName}, position: ${binlogPosition}`); this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
const replaceQuery = const replaceQuery =
"REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?"; 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode) if (!this.conf.testMode)
await this.db.query(replaceQuery, [ await this.db.query(replaceQuery, [
this.conf.code, this.conf.code,
@ -589,7 +590,7 @@ module.exports = class MyLogger {
async connectionPing() { async connectionPing() {
if (!this.isOk) return; if (!this.isOk) return;
try { try {
this.debug("Ping", "Sending ping to database."); this.debug('Ping', 'Sending ping to database.');
if (this.zongji) { if (this.zongji) {
// FIXME: Should Zongji.connection be pinged? // FIXME: Should Zongji.connection be pinged?
@ -612,18 +613,18 @@ module.exports = class MyLogger {
} }
}; };
const catchEvents = new Set(["writerows", "updaterows", "deleterows"]); const catchEvents = new Set(['writerows', 'updaterows', 'deleterows']);
const actions = { const actions = {
writerows: "insert", writerows: 'insert',
updaterows: "update", updaterows: 'update',
deleterows: "delete", deleterows: 'delete',
}; };
const actionColor = { const actionColor = {
insert: "green", insert: 'green',
update: "yellow", update: 'yellow',
delete: "red", delete: 'red',
}; };
const castFn = { const castFn = {