WIP: 6533 myLogger_salix #2
|
@ -0,0 +1 @@
|
|||
salixVersion: 24.40.0
|
|
@ -0,0 +1,99 @@
|
|||
const assert = require("assert");
|
||||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
const Salix = require("./Salix"); // Ajusta la ruta si está en otro directorio
|
||||
|
||||
// Mock del sistema de archivos
|
||||
const mockFs = {
|
||||
files: {},
|
||||
|
||||
existsSync(filePath) {
|
||||
return !!this.files[filePath];
|
||||
},
|
||||
|
||||
writeFileSync(filePath, content) {
|
||||
this.files[filePath] = content;
|
||||
},
|
||||
|
||||
readFileSync(filePath) {
|
||||
if (!this.existsSync(filePath)) {
|
||||
throw { code: "ENOENT" };
|
||||
}
|
||||
return this.files[filePath];
|
||||
},
|
||||
|
||||
reset() {
|
||||
this.files = {};
|
||||
},
|
||||
};
|
||||
|
||||
// Reemplaza los métodos reales de fs con el mock
|
||||
const originalFs = { ...fs };
|
||||
Object.assign(fs, mockFs);
|
||||
|
||||
(async () => {
|
||||
try {
|
||||
// Configuración inicial
|
||||
const loggerMock = {
|
||||
conf: {
|
||||
salix: {
|
||||
url: "http://example.com",
|
||||
api: {
|
||||
status: "status",
|
||||
logInfo: "logInfo",
|
||||
},
|
||||
renewInterval: 10,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Test: Inicialización básica
|
||||
const salix = new Salix();
|
||||
await salix.init(loggerMock);
|
||||
|
||||
assert.strictEqual(salix.conf, loggerMock.conf, "La configuración no se inicializó correctamente");
|
||||
console.log("✅ Salix.init configuró correctamente los valores iniciales.");
|
||||
|
||||
// Test: Cargar configuración
|
||||
const mockFilePath = path.join(__dirname, "..", "config/salix.local.yml");
|
||||
mockFs.files[mockFilePath] = JSON.stringify({ salixVersion: "1.0.0" });
|
||||
|
||||
const loadedConfig = await salix.loadConfig();
|
||||
assert.deepStrictEqual(
|
||||
loadedConfig,
|
||||
{ salixVersion: "1.0.0" },
|
||||
"La configuración cargada no coincide con lo esperado"
|
||||
);
|
||||
console.log("✅ Salix.loadConfig cargó correctamente el archivo YAML.");
|
||||
|
||||
// Test: Guardar configuración
|
||||
const newConfig = { log: "testLog" };
|
||||
await salix.saveConfig(newConfig);
|
||||
|
||||
const savedContent = JSON.parse(mockFs.files[mockFilePath]);
|
||||
assert.strictEqual(
|
||||
savedContent.log,
|
||||
"testLog",
|
||||
"La configuración no se guardó correctamente"
|
||||
);
|
||||
console.log("✅ Salix.saveConfig guardó correctamente el archivo YAML.");
|
||||
|
||||
// Test: Comparar versiones
|
||||
const isDeprecated = salix.compareVersion("2.0.0", "1.0.0");
|
||||
assert.strictEqual(
|
||||
isDeprecated,
|
||||
true,
|
||||
"La comparación de versiones no detectó correctamente la versión como desactualizada"
|
||||
);
|
||||
console.log("✅ Salix.compareVersion funciona correctamente.");
|
||||
|
||||
// Limpia el mock del sistema de archivos
|
||||
mockFs.reset();
|
||||
|
||||
} catch (error) {
|
||||
console.error("❌ Error en las pruebas:", error);
|
||||
} finally {
|
||||
// Restaura los métodos originales de fs
|
||||
Object.assign(fs, originalFs);
|
||||
}
|
||||
})();
|
|
@ -6,7 +6,7 @@ const MultiMap = require('./multi-map');
|
|||
* Loads model configuration.
|
||||
*/
|
||||
module.exports = class ModelLoader {
|
||||
init(logger) {
|
||||
init(logger, logs = []) {
|
||||
const configDir = path.join(__dirname, '..');
|
||||
const conf = loadConfig(configDir, 'logs');
|
||||
const schemaMap = new MultiMap();
|
||||
|
@ -21,6 +21,11 @@ module.exports = class ModelLoader {
|
|||
logMap
|
||||
});
|
||||
|
||||
if(!conf.logs ) conf.logs = {}
|
||||
Object.keys(conf.logs??{}).length > 0 && console.warn('‼️ - Existen modelos NO exportados')
|
||||
Object.assign(conf.logs, logs)
|
||||
|
||||
|
||||
for (const logName in conf.logs) {
|
||||
const logConf = conf.logs[logName];
|
||||
const schema = logConf.schema || logger.conf.dstDb.database;
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
const SALIX_VERSION_HEADER = "Salix-version";
|
||||
const CONFIG_FILENAME = "/config/salix.local.yml";
|
||||
const i18n = {
|
||||
versionChanged: "La versión ha cambiado",
|
||||
deprecatedVersion: "La configuracion guardada está desactualizada",
|
||||
noDeprecatedVersion: "La configuración guardada no requiere actualización",
|
||||
erroFs: {
|
||||
noExists: "El archivo no existe",
|
||||
noWrite: "Error al escribir el archivo YAML:",
|
||||
noRead: "Error al leer el archivo YAML:",
|
||||
},
|
||||
};
|
||||
module.exports = class Salix {
|
||||
constructor() {
|
||||
this.conf = null;
|
||||
this._logInfo = [];
|
||||
this._salixVersion = null;
|
||||
this._salixConfig = null;
|
||||
this._token = null;
|
||||
}
|
||||
emit() {}
|
||||
subscribe(externalListenerFunction) {
|
||||
this.emit = externalListenerFunction;
|
||||
}
|
||||
|
||||
async getToken() {
|
||||
const { url: path, user, password } = this.conf.salix.api.login;
|
||||
let response;
|
||||
try {
|
||||
response = await fetch(`${this.conf.salix.url}/${path}`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Accept: "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
user,
|
||||
password,
|
||||
}),
|
||||
});
|
||||
response = await response.json();
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
}
|
||||
this.token = response.token;
|
||||
}
|
||||
|
||||
async init(logger) {
|
||||
this.conf = logger.conf;
|
||||
const salixFileConfig = await this.loadConfig();
|
||||
try {
|
||||
// No exite fichero almacenado. Solicito config y version
|
||||
// else {
|
||||
|
||||
this._salixConfig = salixFileConfig;
|
||||
//Obtengo la version
|
||||
await this.getSalixVersion(false);
|
||||
this.salixVersion && (await this.handleVersion(salixFileConfig));
|
||||
if (!salixFileConfig) await this.salixLogInfo();
|
||||
this._logInfo = salixFileConfig.log;
|
||||
// }
|
||||
} catch (error) {}
|
||||
|
||||
setInterval(
|
||||
() => this.getSalixVersion(),
|
||||
this.conf.salix.renewInterval * 1000
|
||||
);
|
||||
}
|
||||
get filePath() {
|
||||
// Leer el contenido del archivo YAML
|
||||
const configDir = path.join(__dirname, "..");
|
||||
return `${configDir}${CONFIG_FILENAME}`;
|
||||
}
|
||||
async loadConfig() {
|
||||
// try {
|
||||
if (fs.existsSync(this.filePath)) {
|
||||
const contenidoYAML = require(this.filePath);
|
||||
return Object.assign({}, contenidoYAML);
|
||||
} else {
|
||||
new Error(`No existe el archivo de configuración`);
|
||||
this.log("Creando archivo de configuración");
|
||||
fs.writeFileSync(this.filePath, require("js-yaml").dump({}), "utf-8");
|
||||
this.log("Archivo de configuración creado");
|
||||
}
|
||||
// } catch (error) {
|
||||
// if (error.code === "ENOENT") {
|
||||
// console.error(i18n.erroFs.noExists);
|
||||
// } else {
|
||||
// console.error(i18n.erroFs.noRead, error);
|
||||
// }
|
||||
// return null;
|
||||
// }
|
||||
}
|
||||
|
||||
async saveConfig(configValue) {
|
||||
const defaultConfig = {
|
||||
salixVersion: this.salixVersion,
|
||||
};
|
||||
try {
|
||||
const contenidoYAML = fs.writeFileSync(
|
||||
this.filePath,
|
||||
require("js-yaml").dump(
|
||||
Object.assign(defaultConfig, { log: configValue })
|
||||
),
|
||||
"utf-8"
|
||||
);
|
||||
return Object.assign({}, contenidoYAML);
|
||||
} catch (error) {
|
||||
if (error.code === "ENOENT") {
|
||||
console.error(i18n.erroFs.noExists);
|
||||
} else {
|
||||
console.error(i18n.erroFs.noWrite, error);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
parseVersion(value) {
|
||||
return value.split(".").map(Number);
|
||||
}
|
||||
|
||||
async handleVersion(salixFileConfig) {
|
||||
const isDeprecated = this.compareVersion(
|
||||
this.salixVersion,
|
||||
salixFileConfig.salixVersion
|
||||
);
|
||||
isDeprecated && (await this.salixLogInfo());
|
||||
}
|
||||
compareVersion(v1, v2) {
|
||||
let deprecatedConfig = false;
|
||||
const version1 = this.parseVersion(v1);
|
||||
const version2 = this.parseVersion(v2);
|
||||
for (let i = 0; i < 3; i++) {
|
||||
if (version1[i] > version2[i]) {
|
||||
console.log(
|
||||
// `La versión ${v1} es mayor que la versión ${v2}`
|
||||
i18n.deprecatedVersion
|
||||
);
|
||||
deprecatedConfig = true;
|
||||
break;
|
||||
} else if (version1[i] < version2[i]) {
|
||||
console.log(
|
||||
// `La versión ${v1} es menor que la versión ${v2}`
|
||||
i18n.noDeprecatedVersion
|
||||
);
|
||||
deprecatedConfig = false;
|
||||
}
|
||||
}
|
||||
return deprecatedConfig;
|
||||
}
|
||||
|
||||
async fetch(path) {
|
||||
try {
|
||||
// Configuración de la llamada fetch
|
||||
const fetchConfig = {
|
||||
method: "GET",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Authorization: `${this.token}`,
|
||||
},
|
||||
};
|
||||
|
||||
const salixCall = await fetch(
|
||||
`${this.conf.salix.url}/${path}`,
|
||||
fetchConfig
|
||||
);
|
||||
if (salixCall.status !== 200) {
|
||||
throw new Error(response);
|
||||
}
|
||||
const responseText = await salixCall.text();
|
||||
const response = JSON.parse(responseText);
|
||||
|
||||
const newVersion = salixCall.headers.get(SALIX_VERSION_HEADER);
|
||||
if (!this.salixVersion) {
|
||||
this.salixVersion = newVersion;
|
||||
this.saveConfig();
|
||||
await this.salixLogInfo();
|
||||
}
|
||||
if (this.salixVersion !== newVersion) {
|
||||
const isDeprecated = this.compareVersion(newVersion, this.salixVersion);
|
||||
if (isDeprecated) {
|
||||
this.salixVersion = newVersion;
|
||||
console.info(i18n.versionChanged);
|
||||
await this.salixLogInfo();
|
||||
}
|
||||
}
|
||||
return response;
|
||||
} catch (response) {
|
||||
const message = response?.error?.message ?? response.message;
|
||||
this.log(message, 'error');
|
||||
}
|
||||
}
|
||||
|
||||
async getSalixVersion() {
|
||||
this.log("CHECK VERSION");
|
||||
|
||||
await this.fetch(this.conf.salix.api.status);
|
||||
|
||||
this.log("VERSION CHECKED");
|
||||
}
|
||||
|
||||
async salixLogInfo() {
|
||||
this.log("REQUEST LOGINFO");
|
||||
await this.getToken();
|
||||
let salixLogConfig = await this.fetch(this.conf.salix.api.logInfo);
|
||||
|
||||
this._logInfo = salixLogConfig;
|
||||
this.saveConfig(salixLogConfig);
|
||||
this.log("LOGINFO REQUESTED");
|
||||
}
|
||||
|
||||
log(param, type = 'log') {
|
||||
const colors = {
|
||||
log: '\x1b[37m',
|
||||
error: '\x1b[31m',
|
||||
warn: '\x1b[33m' ,
|
||||
}
|
||||
console[type](colors[type], `${Salix.name} - ${type} - ${param}`);
|
||||
}
|
||||
|
||||
get logInfo() {
|
||||
return this._logInfo;
|
||||
}
|
||||
|
||||
get token() {
|
||||
return this._token;
|
||||
}
|
||||
set token(token) {
|
||||
this._token = token;
|
||||
}
|
||||
get salixVersion() {
|
||||
return this._salixVersion;
|
||||
}
|
||||
set salixVersion(newVersion) {
|
||||
if (this._salixVersion && this._salixVersion !== newVersion)
|
||||
this.emit(newVersion);
|
||||
|
||||
this._salixVersion = newVersion;
|
||||
}
|
||||
};
|
201
mylogger.js
201
mylogger.js
|
@ -2,9 +2,10 @@ require('require-yaml');
|
|||
require('colors');
|
||||
const ZongJi = require('./zongji');
|
||||
const mysql = require('mysql2/promise');
|
||||
const {loadConfig} = require('./lib/util');
|
||||
const { loadConfig } = require('./lib/util');
|
||||
const ModelLoader = require('./lib/model-loader');
|
||||
const ShowDb = require('./lib/show-db');
|
||||
const Salix = require('./lib/salix');
|
||||
|
||||
module.exports = class MyLogger {
|
||||
constructor() {
|
||||
|
@ -19,11 +20,16 @@ module.exports = class MyLogger {
|
|||
}
|
||||
|
||||
async start() {
|
||||
const conf = this.conf = loadConfig(__dirname, 'config');
|
||||
|
||||
this.modelLoader.init(this);
|
||||
const conf = (this.conf = loadConfig(__dirname, 'config'));
|
||||
const salix = new Salix();
|
||||
salix.subscribe(( ) => {
|
||||
this.stop();
|
||||
this.start();
|
||||
});
|
||||
await salix.init(this);
|
||||
// await salix.salixLogInfo();
|
||||
this.modelLoader.init(this, salix.logInfo);
|
||||
this.showDb.init(this);
|
||||
|
||||
const includeSchema = {};
|
||||
for (const [schemaName, tableMap] of this.schemaMap.map)
|
||||
includeSchema[schemaName] = Array.from(tableMap.keys());
|
||||
|
@ -34,18 +40,24 @@ module.exports = class MyLogger {
|
|||
'tablemap',
|
||||
'writerows',
|
||||
'updaterows',
|
||||
'deleterows'
|
||||
'deleterows',
|
||||
],
|
||||
includeSchema,
|
||||
serverId: conf.serverId
|
||||
serverId: conf.serverId,
|
||||
};
|
||||
|
||||
if (conf.testMode)
|
||||
console.log('Test mode enabled, just logging queries to console.');
|
||||
|
||||
console.log('Starting process.');
|
||||
try {
|
||||
|
||||
await this.init();
|
||||
console.log('Process started.');
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
async stop() {
|
||||
|
@ -55,18 +67,23 @@ module.exports = class MyLogger {
|
|||
}
|
||||
|
||||
async init() {
|
||||
const {conf} = this;
|
||||
const { conf } = this;
|
||||
this.debug('MyLogger', 'Initializing.');
|
||||
this.onErrorListener = (err) => this.onError(err);
|
||||
|
||||
// DB connection
|
||||
const db = this.db = await mysql.createConnection(conf.dstDb);
|
||||
|
||||
const db = (this.db = await mysql.createConnection(conf.dstDb));
|
||||
db.on('error', this.onErrorListener);
|
||||
|
||||
await this.modelLoader.loadSchema();
|
||||
await this.showDb.loadSchema();
|
||||
|
||||
for (const logInfo of this.logMap.values()) {
|
||||
const table = logInfo.table;
|
||||
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`;
|
||||
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(
|
||||
table.name
|
||||
)}`;
|
||||
logInfo.addStmt = await db.prepare(
|
||||
`INSERT INTO ${sqlTable}
|
||||
SET originFk = ?,
|
||||
|
@ -101,7 +118,7 @@ module.exports = class MyLogger {
|
|||
|
||||
// Zongji
|
||||
|
||||
this.onBinlogListener = evt => this.onBinlog(evt);
|
||||
this.onBinlogListener = (evt) => this.onBinlog(evt);
|
||||
|
||||
const [res] = await db.query(
|
||||
'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?',
|
||||
|
@ -116,9 +133,13 @@ module.exports = class MyLogger {
|
|||
await this.zongjiStart();
|
||||
|
||||
this.flushInterval = setInterval(
|
||||
() => this.flushQueue(), conf.flushInterval * 1000);
|
||||
() => this.flushQueue(),
|
||||
conf.flushInterval * 1000
|
||||
);
|
||||
this.pingInterval = setInterval(
|
||||
() => this.connectionPing(), conf.pingInterval * 1000);
|
||||
() => this.connectionPing(),
|
||||
conf.pingInterval * 1000
|
||||
);
|
||||
|
||||
// Summary
|
||||
|
||||
|
@ -132,6 +153,8 @@ module.exports = class MyLogger {
|
|||
this.running = false;
|
||||
this.debug('MyLogger', 'Ending.');
|
||||
|
||||
this.db.off('error', this.onErrorListener);
|
||||
|
||||
clearInterval(this.flushInterval);
|
||||
clearInterval(this.pingInterval);
|
||||
clearInterval(this.flushTimeout);
|
||||
|
@ -172,12 +195,13 @@ module.exports = class MyLogger {
|
|||
const zongjiOpts = this.zongjiOpts;
|
||||
|
||||
if (this.binlogName) {
|
||||
this.debug('Zongji',
|
||||
this.debug(
|
||||
'Zongji',
|
||||
`Starting: ${this.binlogName}, position: ${this.binlogPosition}`
|
||||
);
|
||||
Object.assign(zongjiOpts, {
|
||||
filename: this.binlogName,
|
||||
position: this.binlogPosition
|
||||
position: this.binlogPosition,
|
||||
});
|
||||
} else {
|
||||
this.debug('Zongji', 'Starting at end.');
|
||||
|
@ -191,41 +215,47 @@ module.exports = class MyLogger {
|
|||
zongji.off('error', onError);
|
||||
resolve();
|
||||
};
|
||||
const onError = err => {
|
||||
const onError = (err) => {
|
||||
zongji.off('ready', onReady);
|
||||
zongji.off('binlog', this.onBinlogListener);
|
||||
reject(err);
|
||||
}
|
||||
};
|
||||
|
||||
zongji.once('ready', onReady);
|
||||
zongji.once('error', onError);
|
||||
zongji.start(zongjiOpts);
|
||||
});
|
||||
zongji.on('error', this.onErrorListener);
|
||||
this.zongji = zongji;
|
||||
this.debug('Zongji', 'Started.');
|
||||
}
|
||||
|
||||
async zongjiStop() {
|
||||
if (!this.zongji) return;
|
||||
this.debug('Zongji',
|
||||
this.debug(
|
||||
'Zongji',
|
||||
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
|
||||
);
|
||||
const zongji = this.zongji;
|
||||
this.zongji = null;
|
||||
|
||||
zongji.off('binlog', this.onBinlogListener);
|
||||
zongji.off('error', this.onErrorListener);
|
||||
|
||||
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
|
||||
zongji.connection.destroy(() => {
|
||||
console.log('zongji.connection.destroy');
|
||||
});
|
||||
await new Promise(resolve => {
|
||||
zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId],
|
||||
err => {
|
||||
await new Promise((resolve) => {
|
||||
zongji.ctrlConnection.query(
|
||||
'KILL ?',
|
||||
[zongji.connection.threadId],
|
||||
(err) => {
|
||||
// if (err && err.code !== 'ER_NO_SUCH_THREAD');
|
||||
// console.error(err);
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
zongji.ctrlConnection.destroy(() => {
|
||||
console.log('zongji.ctrlConnection.destroy');
|
||||
|
@ -238,7 +268,7 @@ module.exports = class MyLogger {
|
|||
try {
|
||||
await this.init();
|
||||
console.log('Process restarted.');
|
||||
} catch(err) {
|
||||
} catch (err) {
|
||||
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
|
||||
}
|
||||
}
|
||||
|
@ -250,12 +280,8 @@ module.exports = class MyLogger {
|
|||
|
||||
try {
|
||||
await this.end(true);
|
||||
} catch(e) {}
|
||||
} catch (e) {}
|
||||
|
||||
// FIXME: Error of mysql2/promise
|
||||
if (err.message === `Can't add new command when connection is in closed state`)
|
||||
await this.tryRestart();
|
||||
else
|
||||
switch (err.code) {
|
||||
case 'PROTOCOL_CONNECTION_LOST':
|
||||
case 'ECONNRESET':
|
||||
|
@ -286,8 +312,7 @@ module.exports = class MyLogger {
|
|||
} else {
|
||||
shouldFlush = true;
|
||||
this.binlogPosition = evt.nextPosition;
|
||||
if (catchEvents.has(eventName))
|
||||
this.onRowEvent(evt, eventName);
|
||||
if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName);
|
||||
}
|
||||
|
||||
if (shouldFlush) this.isFlushed = false;
|
||||
|
@ -296,7 +321,7 @@ module.exports = class MyLogger {
|
|||
this.debug('MyLogger', 'Queue full, stopping Zongji.');
|
||||
await this.zongjiStop();
|
||||
}
|
||||
} catch(err) {
|
||||
} catch (err) {
|
||||
this.handleError(err);
|
||||
}
|
||||
}
|
||||
|
@ -309,7 +334,7 @@ module.exports = class MyLogger {
|
|||
if (!tableInfo) return;
|
||||
|
||||
const action = actions[eventName];
|
||||
const {rowExcludeField, ignoreSystem} = tableInfo;
|
||||
const { rowExcludeField, ignoreSystem } = tableInfo;
|
||||
const changes = [];
|
||||
|
||||
function isExcluded(row) {
|
||||
|
@ -318,19 +343,16 @@ module.exports = class MyLogger {
|
|||
}
|
||||
|
||||
function cast(value, type) {
|
||||
if (value == null || !type)
|
||||
return value;
|
||||
if (value == null || !type) return value;
|
||||
|
||||
const fn = castFn[type];
|
||||
return fn ? fn(value) : value;
|
||||
}
|
||||
|
||||
function equals(a, b) {
|
||||
if (a === b)
|
||||
return true;
|
||||
if (a === b) return true;
|
||||
const type = typeof a;
|
||||
if (a == null || b == null || type !== typeof b)
|
||||
return false;
|
||||
if (a == null || b == null || type !== typeof b) return false;
|
||||
if (type === 'object' && a.constructor === b.constructor) {
|
||||
if (a instanceof Date) {
|
||||
// FIXME: zongji creates invalid dates for NULL DATE
|
||||
|
@ -346,7 +368,7 @@ module.exports = class MyLogger {
|
|||
return false;
|
||||
}
|
||||
|
||||
const {castTypes} = tableInfo;
|
||||
const { castTypes } = tableInfo;
|
||||
|
||||
if (action == 'update') {
|
||||
const cols = tableInfo.columns;
|
||||
|
@ -372,8 +394,7 @@ module.exports = class MyLogger {
|
|||
}
|
||||
}
|
||||
|
||||
if (nColsChanged)
|
||||
changes.push({row: after, oldI, newI});
|
||||
if (nColsChanged) changes.push({ row: after, oldI, newI });
|
||||
}
|
||||
} else {
|
||||
const cols = tableInfo.instanceColumns;
|
||||
|
@ -388,7 +409,7 @@ module.exports = class MyLogger {
|
|||
instance[col] = cast(row[col], type);
|
||||
}
|
||||
|
||||
changes.push({row, instance});
|
||||
changes.push({ row, instance });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -399,7 +420,7 @@ module.exports = class MyLogger {
|
|||
action,
|
||||
evt,
|
||||
changes,
|
||||
binlogName: this.binlogName
|
||||
binlogName: this.binlogName,
|
||||
});
|
||||
|
||||
if (!this.flushTimeout)
|
||||
|
@ -409,7 +430,8 @@ module.exports = class MyLogger {
|
|||
);
|
||||
|
||||
if (this.conf.debug)
|
||||
console.debug('Evt:'.blue,
|
||||
console.debug(
|
||||
'Evt:'.blue,
|
||||
`[${action}]`[actionColor[action]],
|
||||
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
|
||||
);
|
||||
|
@ -418,7 +440,7 @@ module.exports = class MyLogger {
|
|||
async flushQueue() {
|
||||
if (this.isFlushed || this.isFlushing || !this.isOk) return;
|
||||
this.isFlushing = true;
|
||||
const {conf, db, queue} = this;
|
||||
const { conf, db, queue } = this;
|
||||
let op;
|
||||
|
||||
try {
|
||||
|
@ -436,14 +458,16 @@ module.exports = class MyLogger {
|
|||
await db.query('START TRANSACTION');
|
||||
txStarted = true;
|
||||
|
||||
for (op of ops)
|
||||
await this.applyOp(op);
|
||||
for (op of ops) await this.applyOp(op);
|
||||
|
||||
this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`);
|
||||
this.debug(
|
||||
'Queue',
|
||||
`applied: ${ops.length}, remaining: ${queue.length}`
|
||||
);
|
||||
await this.savePosition(op.binlogName, op.evt.nextPosition);
|
||||
|
||||
await db.query('COMMIT');
|
||||
} catch(err) {
|
||||
} catch (err) {
|
||||
queue.unshift(...ops);
|
||||
if (txStarted)
|
||||
try {
|
||||
|
@ -460,7 +484,7 @@ module.exports = class MyLogger {
|
|||
} else {
|
||||
await this.savePosition(this.binlogName, this.binlogPosition);
|
||||
}
|
||||
} catch(err) {
|
||||
} catch (err) {
|
||||
this.handleError(err);
|
||||
} finally {
|
||||
this.flushTimeout = null;
|
||||
|
@ -469,20 +493,9 @@ module.exports = class MyLogger {
|
|||
}
|
||||
|
||||
async applyOp(op) {
|
||||
const {conf} = this;
|
||||
const {
|
||||
tableInfo,
|
||||
action,
|
||||
evt,
|
||||
changes
|
||||
} = op;
|
||||
const {
|
||||
logInfo,
|
||||
isMain,
|
||||
relation,
|
||||
modelName,
|
||||
logFields
|
||||
} = tableInfo;
|
||||
const { conf } = this;
|
||||
const { tableInfo, action, evt, changes } = op;
|
||||
const { logInfo, isMain, relation, modelName, logFields } = tableInfo;
|
||||
|
||||
const isDelete = action == 'delete';
|
||||
const isUpdate = action == 'update';
|
||||
|
@ -493,14 +506,13 @@ module.exports = class MyLogger {
|
|||
let newI, oldI;
|
||||
const row = change.row;
|
||||
|
||||
switch(action) {
|
||||
switch (action) {
|
||||
case 'update':
|
||||
newI = change.newI;
|
||||
oldI = change.oldI;
|
||||
if (logFields) {
|
||||
for (const field of logFields)
|
||||
if (newI[field] === undefined)
|
||||
newI[field] = row[field];
|
||||
if (newI[field] === undefined) newI[field] = row[field];
|
||||
}
|
||||
break;
|
||||
case 'insert':
|
||||
|
@ -515,12 +527,12 @@ module.exports = class MyLogger {
|
|||
const modelValue = change.modelValue ?? null;
|
||||
const oldInstance = oldI ? JSON.stringify(oldI) : null;
|
||||
const originFk = !isMain ? row[relation] : modelId;
|
||||
const originChanged = isUpdate && !isMain
|
||||
&& newI[relation] !== undefined;
|
||||
const originChanged = isUpdate && !isMain && newI[relation] !== undefined;
|
||||
|
||||
let deleteRow;
|
||||
if (conf.debug)
|
||||
console.debug('Log:'.blue,
|
||||
console.debug(
|
||||
'Log:'.blue,
|
||||
`[${action}]`[actionColor[action]],
|
||||
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
|
||||
);
|
||||
|
@ -528,7 +540,9 @@ module.exports = class MyLogger {
|
|||
try {
|
||||
if (isDelete) {
|
||||
[[deleteRow]] = await logInfo.fetchStmt.execute([
|
||||
modelName, modelId, originFk
|
||||
modelName,
|
||||
modelId,
|
||||
originFk,
|
||||
]);
|
||||
if (!conf.testMode && deleteRow)
|
||||
await logInfo.updateStmt.execute([
|
||||
|
@ -536,8 +550,7 @@ module.exports = class MyLogger {
|
|||
created,
|
||||
oldInstance,
|
||||
modelValue,
|
||||
summaryId,
|
||||
deleteRow.id
|
||||
deleteRow.id,
|
||||
]);
|
||||
}
|
||||
if (!conf.testMode && (!isDelete || !deleteRow)) {
|
||||
|
@ -553,19 +566,16 @@ module.exports = class MyLogger {
|
|||
newI ? JSON.stringify(newI) : null,
|
||||
modelId,
|
||||
modelValue,
|
||||
summaryId
|
||||
]);
|
||||
}
|
||||
|
||||
if (originChanged)
|
||||
await log(oldI[relation]);
|
||||
if (originChanged) await log(oldI[relation]);
|
||||
await log(originFk);
|
||||
}
|
||||
} catch (err) {
|
||||
if (err.code == 'ER_NO_REFERENCED_ROW_2') {
|
||||
this.debug('Log', `Ignored because of constraint failed.`);
|
||||
} else
|
||||
throw err;
|
||||
} else throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -576,10 +586,14 @@ module.exports = class MyLogger {
|
|||
const replaceQuery =
|
||||
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
|
||||
if (!this.conf.testMode)
|
||||
await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]);
|
||||
await this.db.query(replaceQuery, [
|
||||
this.conf.code,
|
||||
binlogName,
|
||||
binlogPosition,
|
||||
]);
|
||||
|
||||
this.isFlushed = this.binlogName == binlogName
|
||||
&& this.binlogPosition == binlogPosition;
|
||||
this.isFlushed =
|
||||
this.binlogName == binlogName && this.binlogPosition == binlogPosition;
|
||||
}
|
||||
|
||||
async connectionPing() {
|
||||
|
@ -590,45 +604,40 @@ module.exports = class MyLogger {
|
|||
if (this.zongji) {
|
||||
// FIXME: Should Zongji.connection be pinged?
|
||||
await new Promise((resolve, reject) => {
|
||||
this.zongji.ctrlConnection.ping(err => {
|
||||
this.zongji.ctrlConnection.ping((err) => {
|
||||
if (err) return reject(err);
|
||||
resolve();
|
||||
});
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
await this.db.ping();
|
||||
} catch(err) {
|
||||
} catch (err) {
|
||||
this.handleError(err);
|
||||
}
|
||||
}
|
||||
|
||||
debug(namespace, message) {
|
||||
if (this.conf.debug)
|
||||
console.debug(`${namespace}:`.blue, message.yellow);
|
||||
if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const catchEvents = new Set([
|
||||
'writerows',
|
||||
'updaterows',
|
||||
'deleterows'
|
||||
]);
|
||||
const catchEvents = new Set(['writerows', 'updaterows', 'deleterows']);
|
||||
|
||||
const actions = {
|
||||
writerows: 'insert',
|
||||
updaterows: 'update',
|
||||
deleterows: 'delete'
|
||||
deleterows: 'delete',
|
||||
};
|
||||
|
||||
const actionColor = {
|
||||
insert: 'green',
|
||||
update: 'yellow',
|
||||
delete: 'red'
|
||||
delete: 'red',
|
||||
};
|
||||
|
||||
const castFn = {
|
||||
boolean: function(value) {
|
||||
boolean: function (value) {
|
||||
return !!value;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue