Compare commits

..

No commits in common. "6533_myLogger_salix" and "master" have entirely different histories.

5 changed files with 112 additions and 468 deletions

View File

@ -1 +0,0 @@
salixVersion: 24.40.0

View File

@ -1,99 +0,0 @@
const assert = require("assert");
const fs = require("fs");
const path = require("path");
const Salix = require("./Salix"); // Ajusta la ruta si está en otro directorio
// Mock del sistema de archivos
const mockFs = {
files: {},
existsSync(filePath) {
return !!this.files[filePath];
},
writeFileSync(filePath, content) {
this.files[filePath] = content;
},
readFileSync(filePath) {
if (!this.existsSync(filePath)) {
throw { code: "ENOENT" };
}
return this.files[filePath];
},
reset() {
this.files = {};
},
};
// Reemplaza los métodos reales de fs con el mock
const originalFs = { ...fs };
Object.assign(fs, mockFs);
(async () => {
try {
// Configuración inicial
const loggerMock = {
conf: {
salix: {
url: "http://example.com",
api: {
status: "status",
logInfo: "logInfo",
},
renewInterval: 10,
},
},
};
// Test: Inicialización básica
const salix = new Salix();
await salix.init(loggerMock);
assert.strictEqual(salix.conf, loggerMock.conf, "La configuración no se inicializó correctamente");
console.log("✅ Salix.init configuró correctamente los valores iniciales.");
// Test: Cargar configuración
const mockFilePath = path.join(__dirname, "..", "config/salix.local.yml");
mockFs.files[mockFilePath] = JSON.stringify({ salixVersion: "1.0.0" });
const loadedConfig = await salix.loadConfig();
assert.deepStrictEqual(
loadedConfig,
{ salixVersion: "1.0.0" },
"La configuración cargada no coincide con lo esperado"
);
console.log("✅ Salix.loadConfig cargó correctamente el archivo YAML.");
// Test: Guardar configuración
const newConfig = { log: "testLog" };
await salix.saveConfig(newConfig);
const savedContent = JSON.parse(mockFs.files[mockFilePath]);
assert.strictEqual(
savedContent.log,
"testLog",
"La configuración no se guardó correctamente"
);
console.log("✅ Salix.saveConfig guardó correctamente el archivo YAML.");
// Test: Comparar versiones
const isDeprecated = salix.compareVersion("2.0.0", "1.0.0");
assert.strictEqual(
isDeprecated,
true,
"La comparación de versiones no detectó correctamente la versión como desactualizada"
);
console.log("✅ Salix.compareVersion funciona correctamente.");
// Limpia el mock del sistema de archivos
mockFs.reset();
} catch (error) {
console.error("❌ Error en las pruebas:", error);
} finally {
// Restaura los métodos originales de fs
Object.assign(fs, originalFs);
}
})();

View File

@ -6,7 +6,7 @@ const MultiMap = require('./multi-map');
* Loads model configuration. * Loads model configuration.
*/ */
module.exports = class ModelLoader { module.exports = class ModelLoader {
init(logger, logs = []) { init(logger) {
const configDir = path.join(__dirname, '..'); const configDir = path.join(__dirname, '..');
const conf = loadConfig(configDir, 'logs'); const conf = loadConfig(configDir, 'logs');
const schemaMap = new MultiMap(); const schemaMap = new MultiMap();
@ -20,11 +20,6 @@ module.exports = class ModelLoader {
schemaMap, schemaMap,
logMap logMap
}); });
if(!conf.logs ) conf.logs = {}
Object.keys(conf.logs??{}).length > 0 && console.warn('‼️ - Existen modelos NO exportados')
Object.assign(conf.logs, logs)
for (const logName in conf.logs) { for (const logName in conf.logs) {
const logConf = conf.logs[logName]; const logConf = conf.logs[logName];

View File

@ -1,242 +0,0 @@
const fs = require("fs");
const path = require("path");
const SALIX_VERSION_HEADER = "Salix-version";
const CONFIG_FILENAME = "/config/salix.local.yml";
const i18n = {
versionChanged: "La versión ha cambiado",
deprecatedVersion: "La configuracion guardada está desactualizada",
noDeprecatedVersion: "La configuración guardada no requiere actualización",
erroFs: {
noExists: "El archivo no existe",
noWrite: "Error al escribir el archivo YAML:",
noRead: "Error al leer el archivo YAML:",
},
};
module.exports = class Salix {
constructor() {
this.conf = null;
this._logInfo = [];
this._salixVersion = null;
this._salixConfig = null;
this._token = null;
}
emit() {}
subscribe(externalListenerFunction) {
this.emit = externalListenerFunction;
}
async getToken() {
const { url: path, user, password } = this.conf.salix.api.login;
let response;
try {
response = await fetch(`${this.conf.salix.url}/${path}`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json",
},
body: JSON.stringify({
user,
password,
}),
});
response = await response.json();
} catch (error) {
console.error(error);
}
this.token = response.token;
}
async init(logger) {
this.conf = logger.conf;
const salixFileConfig = await this.loadConfig();
try {
// No exite fichero almacenado. Solicito config y version
// else {
this._salixConfig = salixFileConfig;
//Obtengo la version
await this.getSalixVersion(false);
this.salixVersion && (await this.handleVersion(salixFileConfig));
if (!salixFileConfig) await this.salixLogInfo();
this._logInfo = salixFileConfig.log;
// }
} catch (error) {}
setInterval(
() => this.getSalixVersion(),
this.conf.salix.renewInterval * 1000
);
}
get filePath() {
// Leer el contenido del archivo YAML
const configDir = path.join(__dirname, "..");
return `${configDir}${CONFIG_FILENAME}`;
}
async loadConfig() {
// try {
if (fs.existsSync(this.filePath)) {
const contenidoYAML = require(this.filePath);
return Object.assign({}, contenidoYAML);
} else {
new Error(`No existe el archivo de configuración`);
this.log("Creando archivo de configuración");
fs.writeFileSync(this.filePath, require("js-yaml").dump({}), "utf-8");
this.log("Archivo de configuración creado");
}
// } catch (error) {
// if (error.code === "ENOENT") {
// console.error(i18n.erroFs.noExists);
// } else {
// console.error(i18n.erroFs.noRead, error);
// }
// return null;
// }
}
async saveConfig(configValue) {
const defaultConfig = {
salixVersion: this.salixVersion,
};
try {
const contenidoYAML = fs.writeFileSync(
this.filePath,
require("js-yaml").dump(
Object.assign(defaultConfig, { log: configValue })
),
"utf-8"
);
return Object.assign({}, contenidoYAML);
} catch (error) {
if (error.code === "ENOENT") {
console.error(i18n.erroFs.noExists);
} else {
console.error(i18n.erroFs.noWrite, error);
}
return null;
}
}
parseVersion(value) {
return value.split(".").map(Number);
}
async handleVersion(salixFileConfig) {
const isDeprecated = this.compareVersion(
this.salixVersion,
salixFileConfig.salixVersion
);
isDeprecated && (await this.salixLogInfo());
}
compareVersion(v1, v2) {
let deprecatedConfig = false;
const version1 = this.parseVersion(v1);
const version2 = this.parseVersion(v2);
for (let i = 0; i < 3; i++) {
if (version1[i] > version2[i]) {
console.log(
// `La versión ${v1} es mayor que la versión ${v2}`
i18n.deprecatedVersion
);
deprecatedConfig = true;
break;
} else if (version1[i] < version2[i]) {
console.log(
// `La versión ${v1} es menor que la versión ${v2}`
i18n.noDeprecatedVersion
);
deprecatedConfig = false;
}
}
return deprecatedConfig;
}
async fetch(path) {
try {
// Configuración de la llamada fetch
const fetchConfig = {
method: "GET",
headers: {
"Content-Type": "application/json",
Authorization: `${this.token}`,
},
};
const salixCall = await fetch(
`${this.conf.salix.url}/${path}`,
fetchConfig
);
if (salixCall.status !== 200) {
throw new Error(response);
}
const responseText = await salixCall.text();
const response = JSON.parse(responseText);
const newVersion = salixCall.headers.get(SALIX_VERSION_HEADER);
if (!this.salixVersion) {
this.salixVersion = newVersion;
this.saveConfig();
await this.salixLogInfo();
}
if (this.salixVersion !== newVersion) {
const isDeprecated = this.compareVersion(newVersion, this.salixVersion);
if (isDeprecated) {
this.salixVersion = newVersion;
console.info(i18n.versionChanged);
await this.salixLogInfo();
}
}
return response;
} catch (response) {
const message = response?.error?.message ?? response.message;
this.log(message, 'error');
}
}
async getSalixVersion() {
this.log("CHECK VERSION");
await this.fetch(this.conf.salix.api.status);
this.log("VERSION CHECKED");
}
async salixLogInfo() {
this.log("REQUEST LOGINFO");
await this.getToken();
let salixLogConfig = await this.fetch(this.conf.salix.api.logInfo);
this._logInfo = salixLogConfig;
this.saveConfig(salixLogConfig);
this.log("LOGINFO REQUESTED");
}
log(param, type = 'log') {
const colors = {
log: '\x1b[37m',
error: '\x1b[31m',
warn: '\x1b[33m' ,
}
console[type](colors[type], `${Salix.name} - ${type} - ${param}`);
}
get logInfo() {
return this._logInfo;
}
get token() {
return this._token;
}
set token(token) {
this._token = token;
}
get salixVersion() {
return this._salixVersion;
}
set salixVersion(newVersion) {
if (this._salixVersion && this._salixVersion !== newVersion)
this.emit(newVersion);
this._salixVersion = newVersion;
}
};

View File

@ -2,10 +2,9 @@ require('require-yaml');
require('colors'); require('colors');
const ZongJi = require('./zongji'); const ZongJi = require('./zongji');
const mysql = require('mysql2/promise'); const mysql = require('mysql2/promise');
const { loadConfig } = require('./lib/util'); const {loadConfig} = require('./lib/util');
const ModelLoader = require('./lib/model-loader'); const ModelLoader = require('./lib/model-loader');
const ShowDb = require('./lib/show-db'); const ShowDb = require('./lib/show-db');
const Salix = require('./lib/salix');
module.exports = class MyLogger { module.exports = class MyLogger {
constructor() { constructor() {
@ -20,44 +19,33 @@ module.exports = class MyLogger {
} }
async start() { async start() {
const conf = (this.conf = loadConfig(__dirname, 'config')); const conf = this.conf = loadConfig(__dirname, 'config');
const salix = new Salix();
salix.subscribe(( ) => { this.modelLoader.init(this);
this.stop();
this.start();
});
await salix.init(this);
// await salix.salixLogInfo();
this.modelLoader.init(this, salix.logInfo);
this.showDb.init(this); this.showDb.init(this);
const includeSchema = {}; const includeSchema = {};
for (const [schemaName, tableMap] of this.schemaMap.map) for (const [schemaName, tableMap] of this.schemaMap.map)
includeSchema[schemaName] = Array.from(tableMap.keys()); includeSchema[schemaName] = Array.from(tableMap.keys());
this.zongjiOpts = { this.zongjiOpts = {
includeEvents: [ includeEvents: [
'rotate', 'rotate',
'tablemap', 'tablemap',
'writerows', 'writerows',
'updaterows', 'updaterows',
'deleterows', 'deleterows'
], ],
includeSchema, includeSchema,
serverId: conf.serverId, serverId: conf.serverId
}; };
if (conf.testMode) if (conf.testMode)
console.log('Test mode enabled, just logging queries to console.'); console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.'); console.log('Starting process.');
try { await this.init();
console.log('Process started.');
await this.init();
console.log('Process started.');
} catch (error) {
console.error(error);
}
} }
async stop() { async stop() {
@ -67,23 +55,18 @@ module.exports = class MyLogger {
} }
async init() { async init() {
const { conf } = this; const {conf} = this;
this.debug('MyLogger', 'Initializing.'); this.debug('MyLogger', 'Initializing.');
this.onErrorListener = (err) => this.onError(err);
// DB connection // DB connection
const db = this.db = await mysql.createConnection(conf.dstDb);
const db = (this.db = await mysql.createConnection(conf.dstDb));
db.on('error', this.onErrorListener);
await this.modelLoader.loadSchema(); await this.modelLoader.loadSchema();
await this.showDb.loadSchema(); await this.showDb.loadSchema();
for (const logInfo of this.logMap.values()) { for (const logInfo of this.logMap.values()) {
const table = logInfo.table; const table = logInfo.table;
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId( const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`;
table.name
)}`;
logInfo.addStmt = await db.prepare( logInfo.addStmt = await db.prepare(
`INSERT INTO ${sqlTable} `INSERT INTO ${sqlTable}
SET originFk = ?, SET originFk = ?,
@ -118,7 +101,7 @@ module.exports = class MyLogger {
// Zongji // Zongji
this.onBinlogListener = (evt) => this.onBinlog(evt); this.onBinlogListener = evt => this.onBinlog(evt);
const [res] = await db.query( const [res] = await db.query(
'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?', 'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?',
@ -133,13 +116,9 @@ module.exports = class MyLogger {
await this.zongjiStart(); await this.zongjiStart();
this.flushInterval = setInterval( this.flushInterval = setInterval(
() => this.flushQueue(), () => this.flushQueue(), conf.flushInterval * 1000);
conf.flushInterval * 1000
);
this.pingInterval = setInterval( this.pingInterval = setInterval(
() => this.connectionPing(), () => this.connectionPing(), conf.pingInterval * 1000);
conf.pingInterval * 1000
);
// Summary // Summary
@ -153,8 +132,6 @@ module.exports = class MyLogger {
this.running = false; this.running = false;
this.debug('MyLogger', 'Ending.'); this.debug('MyLogger', 'Ending.');
this.db.off('error', this.onErrorListener);
clearInterval(this.flushInterval); clearInterval(this.flushInterval);
clearInterval(this.pingInterval); clearInterval(this.pingInterval);
clearInterval(this.flushTimeout); clearInterval(this.flushTimeout);
@ -195,13 +172,12 @@ module.exports = class MyLogger {
const zongjiOpts = this.zongjiOpts; const zongjiOpts = this.zongjiOpts;
if (this.binlogName) { if (this.binlogName) {
this.debug( this.debug('Zongji',
'Zongji',
`Starting: ${this.binlogName}, position: ${this.binlogPosition}` `Starting: ${this.binlogName}, position: ${this.binlogPosition}`
); );
Object.assign(zongjiOpts, { Object.assign(zongjiOpts, {
filename: this.binlogName, filename: this.binlogName,
position: this.binlogPosition, position: this.binlogPosition
}); });
} else { } else {
this.debug('Zongji', 'Starting at end.'); this.debug('Zongji', 'Starting at end.');
@ -215,47 +191,41 @@ module.exports = class MyLogger {
zongji.off('error', onError); zongji.off('error', onError);
resolve(); resolve();
}; };
const onError = (err) => { const onError = err => {
zongji.off('ready', onReady); zongji.off('ready', onReady);
zongji.off('binlog', this.onBinlogListener); zongji.off('binlog', this.onBinlogListener);
reject(err); reject(err);
}; }
zongji.once('ready', onReady); zongji.once('ready', onReady);
zongji.once('error', onError); zongji.once('error', onError);
zongji.start(zongjiOpts); zongji.start(zongjiOpts);
}); });
zongji.on('error', this.onErrorListener);
this.zongji = zongji; this.zongji = zongji;
this.debug('Zongji', 'Started.'); this.debug('Zongji', 'Started.');
} }
async zongjiStop() { async zongjiStop() {
if (!this.zongji) return; if (!this.zongji) return;
this.debug( this.debug('Zongji',
'Zongji',
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}` `Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
); );
const zongji = this.zongji; const zongji = this.zongji;
this.zongji = null; this.zongji = null;
zongji.off('binlog', this.onBinlogListener); zongji.off('binlog', this.onBinlogListener);
zongji.off('error', this.onErrorListener);
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
zongji.connection.destroy(() => { zongji.connection.destroy(() => {
console.log('zongji.connection.destroy'); console.log('zongji.connection.destroy');
}); });
await new Promise((resolve) => { await new Promise(resolve => {
zongji.ctrlConnection.query( zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId],
'KILL ?', err => {
[zongji.connection.threadId], // if (err && err.code !== 'ER_NO_SUCH_THREAD');
(err) => { // console.error(err);
// if (err && err.code !== 'ER_NO_SUCH_THREAD'); resolve();
// console.error(err); });
resolve();
}
);
}); });
zongji.ctrlConnection.destroy(() => { zongji.ctrlConnection.destroy(() => {
console.log('zongji.ctrlConnection.destroy'); console.log('zongji.ctrlConnection.destroy');
@ -268,7 +238,7 @@ module.exports = class MyLogger {
try { try {
await this.init(); await this.init();
console.log('Process restarted.'); console.log('Process restarted.');
} catch (err) { } catch(err) {
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
} }
} }
@ -280,17 +250,21 @@ module.exports = class MyLogger {
try { try {
await this.end(true); await this.end(true);
} catch (e) {} } catch(e) {}
switch (err.code) { // FIXME: Error of mysql2/promise
case 'PROTOCOL_CONNECTION_LOST': if (err.message === `Can't add new command when connection is in closed state`)
case 'ECONNRESET': await this.tryRestart();
else
switch (err.code) {
case 'PROTOCOL_CONNECTION_LOST':
case 'ECONNRESET':
console.log('Trying to restart process.'); console.log('Trying to restart process.');
await this.tryRestart(); await this.tryRestart();
break; break;
default: default:
process.exit(); process.exit();
} }
} }
async onBinlog(evt) { async onBinlog(evt) {
@ -312,7 +286,8 @@ module.exports = class MyLogger {
} else { } else {
shouldFlush = true; shouldFlush = true;
this.binlogPosition = evt.nextPosition; this.binlogPosition = evt.nextPosition;
if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName); if (catchEvents.has(eventName))
this.onRowEvent(evt, eventName);
} }
if (shouldFlush) this.isFlushed = false; if (shouldFlush) this.isFlushed = false;
@ -321,7 +296,7 @@ module.exports = class MyLogger {
this.debug('MyLogger', 'Queue full, stopping Zongji.'); this.debug('MyLogger', 'Queue full, stopping Zongji.');
await this.zongjiStop(); await this.zongjiStop();
} }
} catch (err) { } catch(err) {
this.handleError(err); this.handleError(err);
} }
} }
@ -332,9 +307,9 @@ module.exports = class MyLogger {
const tableInfo = this.schemaMap.get(table.parentSchema, tableName); const tableInfo = this.schemaMap.get(table.parentSchema, tableName);
if (!tableInfo) return; if (!tableInfo) return;
const action = actions[eventName]; const action = actions[eventName];
const { rowExcludeField, ignoreSystem } = tableInfo; const {rowExcludeField, ignoreSystem} = tableInfo;
const changes = []; const changes = [];
function isExcluded(row) { function isExcluded(row) {
@ -343,16 +318,19 @@ module.exports = class MyLogger {
} }
function cast(value, type) { function cast(value, type) {
if (value == null || !type) return value; if (value == null || !type)
return value;
const fn = castFn[type]; const fn = castFn[type];
return fn ? fn(value) : value; return fn ? fn(value) : value;
} }
function equals(a, b) { function equals(a, b) {
if (a === b) return true; if (a === b)
return true;
const type = typeof a; const type = typeof a;
if (a == null || b == null || type !== typeof b) return false; if (a == null || b == null || type !== typeof b)
return false;
if (type === 'object' && a.constructor === b.constructor) { if (type === 'object' && a.constructor === b.constructor) {
if (a instanceof Date) { if (a instanceof Date) {
// FIXME: zongji creates invalid dates for NULL DATE // FIXME: zongji creates invalid dates for NULL DATE
@ -361,14 +339,14 @@ module.exports = class MyLogger {
if (isNaN(aTime)) aTime = null; if (isNaN(aTime)) aTime = null;
let bTime = b.getTime(); let bTime = b.getTime();
if (isNaN(bTime)) bTime = null; if (isNaN(bTime)) bTime = null;
return aTime === bTime; return aTime === bTime;
} }
} }
return false; return false;
} }
const { castTypes } = tableInfo; const {castTypes} = tableInfo;
if (action == 'update') { if (action == 'update') {
const cols = tableInfo.columns; const cols = tableInfo.columns;
@ -394,7 +372,8 @@ module.exports = class MyLogger {
} }
} }
if (nColsChanged) changes.push({ row: after, oldI, newI }); if (nColsChanged)
changes.push({row: after, oldI, newI});
} }
} else { } else {
const cols = tableInfo.instanceColumns; const cols = tableInfo.instanceColumns;
@ -409,7 +388,7 @@ module.exports = class MyLogger {
instance[col] = cast(row[col], type); instance[col] = cast(row[col], type);
} }
changes.push({ row, instance }); changes.push({row, instance});
} }
} }
@ -420,7 +399,7 @@ module.exports = class MyLogger {
action, action,
evt, evt,
changes, changes,
binlogName: this.binlogName, binlogName: this.binlogName
}); });
if (!this.flushTimeout) if (!this.flushTimeout)
@ -430,8 +409,7 @@ module.exports = class MyLogger {
); );
if (this.conf.debug) if (this.conf.debug)
console.debug( console.debug('Evt:'.blue,
'Evt:'.blue,
`[${action}]`[actionColor[action]], `[${action}]`[actionColor[action]],
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
); );
@ -440,7 +418,7 @@ module.exports = class MyLogger {
async flushQueue() { async flushQueue() {
if (this.isFlushed || this.isFlushing || !this.isOk) return; if (this.isFlushed || this.isFlushing || !this.isOk) return;
this.isFlushing = true; this.isFlushing = true;
const { conf, db, queue } = this; const {conf, db, queue} = this;
let op; let op;
try { try {
@ -457,17 +435,15 @@ module.exports = class MyLogger {
await db.query('START TRANSACTION'); await db.query('START TRANSACTION');
txStarted = true; txStarted = true;
for (op of ops)
await this.applyOp(op);
for (op of ops) await this.applyOp(op); this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`);
this.debug(
'Queue',
`applied: ${ops.length}, remaining: ${queue.length}`
);
await this.savePosition(op.binlogName, op.evt.nextPosition); await this.savePosition(op.binlogName, op.evt.nextPosition);
await db.query('COMMIT'); await db.query('COMMIT');
} catch (err) { } catch(err) {
queue.unshift(...ops); queue.unshift(...ops);
if (txStarted) if (txStarted)
try { try {
@ -484,7 +460,7 @@ module.exports = class MyLogger {
} else { } else {
await this.savePosition(this.binlogName, this.binlogPosition); await this.savePosition(this.binlogName, this.binlogPosition);
} }
} catch (err) { } catch(err) {
this.handleError(err); this.handleError(err);
} finally { } finally {
this.flushTimeout = null; this.flushTimeout = null;
@ -493,9 +469,20 @@ module.exports = class MyLogger {
} }
async applyOp(op) { async applyOp(op) {
const { conf } = this; const {conf} = this;
const { tableInfo, action, evt, changes } = op; const {
const { logInfo, isMain, relation, modelName, logFields } = tableInfo; tableInfo,
action,
evt,
changes
} = op;
const {
logInfo,
isMain,
relation,
modelName,
logFields
} = tableInfo;
const isDelete = action == 'delete'; const isDelete = action == 'delete';
const isUpdate = action == 'update'; const isUpdate = action == 'update';
@ -506,13 +493,14 @@ module.exports = class MyLogger {
let newI, oldI; let newI, oldI;
const row = change.row; const row = change.row;
switch (action) { switch(action) {
case 'update': case 'update':
newI = change.newI; newI = change.newI;
oldI = change.oldI; oldI = change.oldI;
if (logFields) { if (logFields) {
for (const field of logFields) for (const field of logFields)
if (newI[field] === undefined) newI[field] = row[field]; if (newI[field] === undefined)
newI[field] = row[field];
} }
break; break;
case 'insert': case 'insert':
@ -527,12 +515,12 @@ module.exports = class MyLogger {
const modelValue = change.modelValue ?? null; const modelValue = change.modelValue ?? null;
const oldInstance = oldI ? JSON.stringify(oldI) : null; const oldInstance = oldI ? JSON.stringify(oldI) : null;
const originFk = !isMain ? row[relation] : modelId; const originFk = !isMain ? row[relation] : modelId;
const originChanged = isUpdate && !isMain && newI[relation] !== undefined; const originChanged = isUpdate && !isMain
&& newI[relation] !== undefined;
let deleteRow; let deleteRow;
if (conf.debug) if (conf.debug)
console.debug( console.debug('Log:'.blue,
'Log:'.blue,
`[${action}]`[actionColor[action]], `[${action}]`[actionColor[action]],
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
); );
@ -540,9 +528,7 @@ module.exports = class MyLogger {
try { try {
if (isDelete) { if (isDelete) {
[[deleteRow]] = await logInfo.fetchStmt.execute([ [[deleteRow]] = await logInfo.fetchStmt.execute([
modelName, modelName, modelId, originFk
modelId,
originFk,
]); ]);
if (!conf.testMode && deleteRow) if (!conf.testMode && deleteRow)
await logInfo.updateStmt.execute([ await logInfo.updateStmt.execute([
@ -550,7 +536,8 @@ module.exports = class MyLogger {
created, created,
oldInstance, oldInstance,
modelValue, modelValue,
deleteRow.id, summaryId,
deleteRow.id
]); ]);
} }
if (!conf.testMode && (!isDelete || !deleteRow)) { if (!conf.testMode && (!isDelete || !deleteRow)) {
@ -566,34 +553,33 @@ module.exports = class MyLogger {
newI ? JSON.stringify(newI) : null, newI ? JSON.stringify(newI) : null,
modelId, modelId,
modelValue, modelValue,
summaryId
]); ]);
} }
if (originChanged) await log(oldI[relation]); if (originChanged)
await log(oldI[relation]);
await log(originFk); await log(originFk);
} }
} catch (err) { } catch (err) {
if (err.code == 'ER_NO_REFERENCED_ROW_2') { if (err.code == 'ER_NO_REFERENCED_ROW_2') {
this.debug('Log', `Ignored because of constraint failed.`); this.debug('Log', `Ignored because of constraint failed.`);
} else throw err; } else
throw err;
} }
} }
} }
async savePosition(binlogName, binlogPosition) { async savePosition(binlogName, binlogPosition) {
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
const replaceQuery = const replaceQuery =
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode) if (!this.conf.testMode)
await this.db.query(replaceQuery, [ await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]);
this.conf.code,
binlogName,
binlogPosition,
]);
this.isFlushed = this.isFlushed = this.binlogName == binlogName
this.binlogName == binlogName && this.binlogPosition == binlogPosition; && this.binlogPosition == binlogPosition;
} }
async connectionPing() { async connectionPing() {
@ -604,40 +590,45 @@ module.exports = class MyLogger {
if (this.zongji) { if (this.zongji) {
// FIXME: Should Zongji.connection be pinged? // FIXME: Should Zongji.connection be pinged?
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
this.zongji.ctrlConnection.ping((err) => { this.zongji.ctrlConnection.ping(err => {
if (err) return reject(err); if (err) return reject(err);
resolve(); resolve();
}); });
}); })
} }
await this.db.ping(); await this.db.ping();
} catch (err) { } catch(err) {
this.handleError(err); this.handleError(err);
} }
} }
debug(namespace, message) { debug(namespace, message) {
if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); if (this.conf.debug)
console.debug(`${namespace}:`.blue, message.yellow);
} }
}; }
const catchEvents = new Set(['writerows', 'updaterows', 'deleterows']); const catchEvents = new Set([
'writerows',
'updaterows',
'deleterows'
]);
const actions = { const actions = {
writerows: 'insert', writerows: 'insert',
updaterows: 'update', updaterows: 'update',
deleterows: 'delete', deleterows: 'delete'
}; };
const actionColor = { const actionColor = {
insert: 'green', insert: 'green',
update: 'yellow', update: 'yellow',
delete: 'red', delete: 'red'
}; };
const castFn = { const castFn = {
boolean: function (value) { boolean: function(value) {
return !!value; return !!value;
}, }
}; };