Compare commits
8 Commits
master
...
6533_myLog
Author | SHA1 | Date |
---|---|---|
Javier Segarra | b53b03b81c | |
Javier Segarra | bd48afa684 | |
Javier Segarra | 206b279ced | |
Javier Segarra | 61d01b56c5 | |
Javier Segarra | a376c888d9 | |
Javier Segarra | be89b664aa | |
Javier Segarra | e836dc08ec | |
Javier Segarra | ac21ae5b92 |
|
@ -23,7 +23,6 @@ logs:
|
||||||
- itemTag
|
- itemTag
|
||||||
- name: item
|
- name: item
|
||||||
showField: name
|
showField: name
|
||||||
showId: id
|
|
||||||
logFields:
|
logFields:
|
||||||
- size
|
- size
|
||||||
exclude:
|
exclude:
|
||||||
|
|
|
@ -6,7 +6,7 @@ const MultiMap = require('./multi-map');
|
||||||
* Loads model configuration.
|
* Loads model configuration.
|
||||||
*/
|
*/
|
||||||
module.exports = class ModelLoader {
|
module.exports = class ModelLoader {
|
||||||
init(logger) {
|
init(logger, logs = []) {
|
||||||
const configDir = path.join(__dirname, '..');
|
const configDir = path.join(__dirname, '..');
|
||||||
const conf = loadConfig(configDir, 'logs');
|
const conf = loadConfig(configDir, 'logs');
|
||||||
const schemaMap = new MultiMap();
|
const schemaMap = new MultiMap();
|
||||||
|
@ -20,6 +20,11 @@ module.exports = class ModelLoader {
|
||||||
schemaMap,
|
schemaMap,
|
||||||
logMap
|
logMap
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if(!conf.logs ) conf.logs = {}
|
||||||
|
Object.keys(conf.logs??{}).length > 0 && console.warn('‼️ - Existen modelos NO exportados')
|
||||||
|
Object.assign(conf.logs, logs)
|
||||||
|
|
||||||
|
|
||||||
for (const logName in conf.logs) {
|
for (const logName in conf.logs) {
|
||||||
const logConf = conf.logs[logName];
|
const logConf = conf.logs[logName];
|
||||||
|
|
|
@ -0,0 +1,187 @@
|
||||||
|
const fs = require("fs");
|
||||||
|
const path = require("path");
|
||||||
|
const SALIX_VERSION_HEADER = "Salix-version";
|
||||||
|
const CONFIG_FILENAME = "/config/salix.local.yml";
|
||||||
|
const i18n = {
|
||||||
|
versionChanged: "La versión ha cambiado",
|
||||||
|
deprecatedVersion: "La configuracion guardada está desactualizada",
|
||||||
|
noDeprecatedVersion: "La configuración guardada no requiere actualización",
|
||||||
|
erroFs: {
|
||||||
|
noExists: "El archivo no existe",
|
||||||
|
noWrite: "Error al escribir el archivo YAML:",
|
||||||
|
noRead: "Error al leer el archivo YAML:",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
module.exports = class Salix {
|
||||||
|
constructor() {
|
||||||
|
this.conf = null;
|
||||||
|
this._logInfo = [];
|
||||||
|
this._salixVersion = null;
|
||||||
|
this._salixConfig = null;
|
||||||
|
}
|
||||||
|
emit() {}
|
||||||
|
subscribe(externalListenerFunction) {
|
||||||
|
this.emit = externalListenerFunction;
|
||||||
|
}
|
||||||
|
async init(logger) {
|
||||||
|
this.conf = logger.conf;
|
||||||
|
const salixFileConfig = await this.loadConfig();
|
||||||
|
try {
|
||||||
|
// No exite fichero almacenado. Solicito config y version
|
||||||
|
if (!salixFileConfig) await this.salixLogInfo();
|
||||||
|
else {
|
||||||
|
this._salixConfig = salixFileConfig;
|
||||||
|
//Obtengo la version
|
||||||
|
await this.getSalixVersion(false);
|
||||||
|
this.salixVersion && this.handleVersion(salixFileConfig);
|
||||||
|
this._logInfo = salixFileConfig.log;
|
||||||
|
}
|
||||||
|
} catch (error) {}
|
||||||
|
|
||||||
|
setInterval(
|
||||||
|
() => this.getSalixVersion(),
|
||||||
|
this.conf.salix.renewInterval * 1000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
get filePath() {
|
||||||
|
// Leer el contenido del archivo YAML
|
||||||
|
const configDir = path.join(__dirname, "..");
|
||||||
|
return `${configDir}${CONFIG_FILENAME}`;
|
||||||
|
}
|
||||||
|
async loadConfig() {
|
||||||
|
try {
|
||||||
|
if (fs.existsSync(this.filePath)) {
|
||||||
|
const contenidoYAML = require(this.filePath);
|
||||||
|
return Object.assign({}, contenidoYAML);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === "ENOENT") {
|
||||||
|
console.error(i18n.erroFs.noExists);
|
||||||
|
} else {
|
||||||
|
console.error(i18n.erroFs.noRead, error);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async saveConfig(configValue) {
|
||||||
|
const defaultConfig = {
|
||||||
|
salixVersion: this.salixVersion,
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
const contenidoYAML = fs.writeFileSync(
|
||||||
|
this.filePath,
|
||||||
|
require("js-yaml").dump(
|
||||||
|
Object.assign(defaultConfig, { log: configValue })
|
||||||
|
),
|
||||||
|
"utf-8"
|
||||||
|
);
|
||||||
|
return Object.assign({}, contenidoYAML);
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === "ENOENT") {
|
||||||
|
console.error(i18n.erroFs.noExists);
|
||||||
|
} else {
|
||||||
|
console.error(i18n.erroFs.noWrite, error);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
parseVersion(value) {
|
||||||
|
return value.split(".").map(Number);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleVersion(salixFileConfig) {
|
||||||
|
const isDeprecated = this.compareVersion(
|
||||||
|
this.salixVersion,
|
||||||
|
salixFileConfig.salixVersion
|
||||||
|
);
|
||||||
|
isDeprecated && (await this.salixLogInfo());
|
||||||
|
}
|
||||||
|
compareVersion(v1, v2) {
|
||||||
|
let deprecatedConfig = false;
|
||||||
|
const version1 = this.parseVersion(v1);
|
||||||
|
const version2 = this.parseVersion(v2);
|
||||||
|
for (let i = 0; i < 3; i++) {
|
||||||
|
if (version1[i] > version2[i]) {
|
||||||
|
console.log(
|
||||||
|
// `La versión ${v1} es mayor que la versión ${v2}`
|
||||||
|
i18n.deprecatedVersion
|
||||||
|
);
|
||||||
|
deprecatedConfig = true;
|
||||||
|
break;
|
||||||
|
} else if (version1[i] < version2[i]) {
|
||||||
|
console.log(
|
||||||
|
// `La versión ${v1} es menor que la versión ${v2}`
|
||||||
|
i18n.noDeprecatedVersion
|
||||||
|
);
|
||||||
|
deprecatedConfig = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return deprecatedConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fetch(path) {
|
||||||
|
try {
|
||||||
|
let salixCall = await fetch(`${this.conf.salix.url}/${path}`, {
|
||||||
|
method: "GET",
|
||||||
|
});
|
||||||
|
const response = JSON.parse(await salixCall.text());
|
||||||
|
if (salixCall.status !== 200) {
|
||||||
|
console.error(response.error);
|
||||||
|
throw new Error(response.error.message);
|
||||||
|
}
|
||||||
|
const newVersion = salixCall.headers.get(SALIX_VERSION_HEADER);
|
||||||
|
if(!this.salixVersion) this.salixVersion = newVersion;
|
||||||
|
if (this.salixVersion!==newVersion) {
|
||||||
|
const isDeprecated = this.compareVersion(
|
||||||
|
newVersion,
|
||||||
|
this.salixVersion
|
||||||
|
);
|
||||||
|
if (isDeprecated) {
|
||||||
|
this.salixVersion = newVersion;
|
||||||
|
console.info(i18n.versionChanged);
|
||||||
|
await this.salixLogInfo();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
} catch ({ message }) {
|
||||||
|
console.error(message);
|
||||||
|
if (!this._salixConfig) throw new Error(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSalixVersion() {
|
||||||
|
this.log("CHECK VERSION");
|
||||||
|
|
||||||
|
await this.fetch(this.conf.salix.api.status);
|
||||||
|
|
||||||
|
this.log("VERSION CHECKED");
|
||||||
|
}
|
||||||
|
|
||||||
|
async salixLogInfo() {
|
||||||
|
this.log("REQUEST LOGINFO");
|
||||||
|
let salixLogConfig = await this.fetch(this.conf.salix.api.logInfo);
|
||||||
|
|
||||||
|
this._logInfo = salixLogConfig;
|
||||||
|
this.saveConfig(salixLogConfig);
|
||||||
|
this.log("LOGINFO REQUESTED");
|
||||||
|
}
|
||||||
|
|
||||||
|
log(param) {
|
||||||
|
console.log(`${Salix.name} - ${param}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
get logInfo() {
|
||||||
|
return this._logInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
get salixVersion() {
|
||||||
|
return this._salixVersion;
|
||||||
|
}
|
||||||
|
set salixVersion(newVersion) {
|
||||||
|
if(this._salixVersion && (this._salixVersion!== newVersion)) this.emit(newVersion);
|
||||||
|
|
||||||
|
this._salixVersion = newVersion;
|
||||||
|
}
|
||||||
|
};
|
234
mylogger.js
234
mylogger.js
|
@ -2,9 +2,10 @@ require('require-yaml');
|
||||||
require('colors');
|
require('colors');
|
||||||
const ZongJi = require('./zongji');
|
const ZongJi = require('./zongji');
|
||||||
const mysql = require('mysql2/promise');
|
const mysql = require('mysql2/promise');
|
||||||
const {loadConfig} = require('./lib/util');
|
const { loadConfig } = require('./lib/util');
|
||||||
const ModelLoader = require('./lib/model-loader');
|
const ModelLoader = require('./lib/model-loader');
|
||||||
const ShowDb = require('./lib/show-db');
|
const ShowDb = require('./lib/show-db');
|
||||||
|
const Salix = require('./lib/salix');
|
||||||
|
|
||||||
module.exports = class MyLogger {
|
module.exports = class MyLogger {
|
||||||
constructor() {
|
constructor() {
|
||||||
|
@ -19,25 +20,30 @@ module.exports = class MyLogger {
|
||||||
}
|
}
|
||||||
|
|
||||||
async start() {
|
async start() {
|
||||||
const conf = this.conf = loadConfig(__dirname, 'config');
|
const conf = (this.conf = loadConfig(__dirname, 'config'));
|
||||||
|
const salix = new Salix();
|
||||||
this.modelLoader.init(this);
|
salix.subscribe(( ) => {
|
||||||
|
this.stop();
|
||||||
|
this.start();
|
||||||
|
});
|
||||||
|
await salix.init(this);
|
||||||
|
// await salix.salixLogInfo();
|
||||||
|
this.modelLoader.init(this, salix.logInfo);
|
||||||
this.showDb.init(this);
|
this.showDb.init(this);
|
||||||
|
|
||||||
const includeSchema = {};
|
const includeSchema = {};
|
||||||
for (const [schemaName, tableMap] of this.schemaMap.map)
|
for (const [schemaName, tableMap] of this.schemaMap.map)
|
||||||
includeSchema[schemaName] = Array.from(tableMap.keys());
|
includeSchema[schemaName] = Array.from(tableMap.keys());
|
||||||
|
|
||||||
this.zongjiOpts = {
|
this.zongjiOpts = {
|
||||||
includeEvents: [
|
includeEvents: [
|
||||||
'rotate',
|
'rotate',
|
||||||
'tablemap',
|
'tablemap',
|
||||||
'writerows',
|
'writerows',
|
||||||
'updaterows',
|
'updaterows',
|
||||||
'deleterows'
|
'deleterows',
|
||||||
],
|
],
|
||||||
includeSchema,
|
includeSchema,
|
||||||
serverId: conf.serverId
|
serverId: conf.serverId,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (conf.testMode)
|
if (conf.testMode)
|
||||||
|
@ -55,18 +61,23 @@ module.exports = class MyLogger {
|
||||||
}
|
}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
const {conf} = this;
|
const { conf } = this;
|
||||||
this.debug('MyLogger', 'Initializing.');
|
this.debug('MyLogger', 'Initializing.');
|
||||||
|
this.onErrorListener = (err) => this.onError(err);
|
||||||
|
|
||||||
// DB connection
|
// DB connection
|
||||||
const db = this.db = await mysql.createConnection(conf.dstDb);
|
|
||||||
|
const db = (this.db = await mysql.createConnection(conf.dstDb));
|
||||||
|
db.on('error', this.onErrorListener);
|
||||||
|
|
||||||
await this.modelLoader.loadSchema();
|
await this.modelLoader.loadSchema();
|
||||||
await this.showDb.loadSchema();
|
await this.showDb.loadSchema();
|
||||||
|
|
||||||
for (const logInfo of this.logMap.values()) {
|
for (const logInfo of this.logMap.values()) {
|
||||||
const table = logInfo.table;
|
const table = logInfo.table;
|
||||||
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`;
|
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(
|
||||||
|
table.name
|
||||||
|
)}`;
|
||||||
logInfo.addStmt = await db.prepare(
|
logInfo.addStmt = await db.prepare(
|
||||||
`INSERT INTO ${sqlTable}
|
`INSERT INTO ${sqlTable}
|
||||||
SET originFk = ?,
|
SET originFk = ?,
|
||||||
|
@ -77,8 +88,7 @@ module.exports = class MyLogger {
|
||||||
oldInstance = ?,
|
oldInstance = ?,
|
||||||
newInstance = ?,
|
newInstance = ?,
|
||||||
changedModelId = ?,
|
changedModelId = ?,
|
||||||
changedModelValue = ?,
|
changedModelValue = ?`
|
||||||
summaryId = ?`
|
|
||||||
);
|
);
|
||||||
logInfo.fetchStmt = await db.prepare(
|
logInfo.fetchStmt = await db.prepare(
|
||||||
`SELECT id FROM ${sqlTable}
|
`SELECT id FROM ${sqlTable}
|
||||||
|
@ -93,15 +103,14 @@ module.exports = class MyLogger {
|
||||||
SET originFk = ?,
|
SET originFk = ?,
|
||||||
creationDate = ?,
|
creationDate = ?,
|
||||||
oldInstance = ?,
|
oldInstance = ?,
|
||||||
changedModelValue = ?,
|
changedModelValue = ?
|
||||||
summaryId = ?
|
|
||||||
WHERE id = ?`
|
WHERE id = ?`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Zongji
|
// Zongji
|
||||||
|
|
||||||
this.onBinlogListener = evt => this.onBinlog(evt);
|
this.onBinlogListener = (evt) => this.onBinlog(evt);
|
||||||
|
|
||||||
const [res] = await db.query(
|
const [res] = await db.query(
|
||||||
'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?',
|
'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?',
|
||||||
|
@ -116,9 +125,13 @@ module.exports = class MyLogger {
|
||||||
await this.zongjiStart();
|
await this.zongjiStart();
|
||||||
|
|
||||||
this.flushInterval = setInterval(
|
this.flushInterval = setInterval(
|
||||||
() => this.flushQueue(), conf.flushInterval * 1000);
|
() => this.flushQueue(),
|
||||||
|
conf.flushInterval * 1000
|
||||||
|
);
|
||||||
this.pingInterval = setInterval(
|
this.pingInterval = setInterval(
|
||||||
() => this.connectionPing(), conf.pingInterval * 1000);
|
() => this.connectionPing(),
|
||||||
|
conf.pingInterval * 1000
|
||||||
|
);
|
||||||
|
|
||||||
// Summary
|
// Summary
|
||||||
|
|
||||||
|
@ -132,6 +145,8 @@ module.exports = class MyLogger {
|
||||||
this.running = false;
|
this.running = false;
|
||||||
this.debug('MyLogger', 'Ending.');
|
this.debug('MyLogger', 'Ending.');
|
||||||
|
|
||||||
|
this.db.off('error', this.onErrorListener);
|
||||||
|
|
||||||
clearInterval(this.flushInterval);
|
clearInterval(this.flushInterval);
|
||||||
clearInterval(this.pingInterval);
|
clearInterval(this.pingInterval);
|
||||||
clearInterval(this.flushTimeout);
|
clearInterval(this.flushTimeout);
|
||||||
|
@ -172,12 +187,13 @@ module.exports = class MyLogger {
|
||||||
const zongjiOpts = this.zongjiOpts;
|
const zongjiOpts = this.zongjiOpts;
|
||||||
|
|
||||||
if (this.binlogName) {
|
if (this.binlogName) {
|
||||||
this.debug('Zongji',
|
this.debug(
|
||||||
|
'Zongji',
|
||||||
`Starting: ${this.binlogName}, position: ${this.binlogPosition}`
|
`Starting: ${this.binlogName}, position: ${this.binlogPosition}`
|
||||||
);
|
);
|
||||||
Object.assign(zongjiOpts, {
|
Object.assign(zongjiOpts, {
|
||||||
filename: this.binlogName,
|
filename: this.binlogName,
|
||||||
position: this.binlogPosition
|
position: this.binlogPosition,
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
this.debug('Zongji', 'Starting at end.');
|
this.debug('Zongji', 'Starting at end.');
|
||||||
|
@ -191,41 +207,47 @@ module.exports = class MyLogger {
|
||||||
zongji.off('error', onError);
|
zongji.off('error', onError);
|
||||||
resolve();
|
resolve();
|
||||||
};
|
};
|
||||||
const onError = err => {
|
const onError = (err) => {
|
||||||
zongji.off('ready', onReady);
|
zongji.off('ready', onReady);
|
||||||
zongji.off('binlog', this.onBinlogListener);
|
zongji.off('binlog', this.onBinlogListener);
|
||||||
reject(err);
|
reject(err);
|
||||||
}
|
};
|
||||||
|
|
||||||
zongji.once('ready', onReady);
|
zongji.once('ready', onReady);
|
||||||
zongji.once('error', onError);
|
zongji.once('error', onError);
|
||||||
zongji.start(zongjiOpts);
|
zongji.start(zongjiOpts);
|
||||||
});
|
});
|
||||||
|
zongji.on('error', this.onErrorListener);
|
||||||
this.zongji = zongji;
|
this.zongji = zongji;
|
||||||
this.debug('Zongji', 'Started.');
|
this.debug('Zongji', 'Started.');
|
||||||
}
|
}
|
||||||
|
|
||||||
async zongjiStop() {
|
async zongjiStop() {
|
||||||
if (!this.zongji) return;
|
if (!this.zongji) return;
|
||||||
this.debug('Zongji',
|
this.debug(
|
||||||
|
'Zongji',
|
||||||
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
|
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
|
||||||
);
|
);
|
||||||
const zongji = this.zongji;
|
const zongji = this.zongji;
|
||||||
this.zongji = null;
|
this.zongji = null;
|
||||||
|
|
||||||
zongji.off('binlog', this.onBinlogListener);
|
zongji.off('binlog', this.onBinlogListener);
|
||||||
|
zongji.off('error', this.onErrorListener);
|
||||||
|
|
||||||
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
|
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
|
||||||
zongji.connection.destroy(() => {
|
zongji.connection.destroy(() => {
|
||||||
console.log('zongji.connection.destroy');
|
console.log('zongji.connection.destroy');
|
||||||
});
|
});
|
||||||
await new Promise(resolve => {
|
await new Promise((resolve) => {
|
||||||
zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId],
|
zongji.ctrlConnection.query(
|
||||||
err => {
|
'KILL ?',
|
||||||
// if (err && err.code !== 'ER_NO_SUCH_THREAD');
|
[zongji.connection.threadId],
|
||||||
// console.error(err);
|
(err) => {
|
||||||
resolve();
|
// if (err && err.code !== 'ER_NO_SUCH_THREAD');
|
||||||
});
|
// console.error(err);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
zongji.ctrlConnection.destroy(() => {
|
zongji.ctrlConnection.destroy(() => {
|
||||||
console.log('zongji.ctrlConnection.destroy');
|
console.log('zongji.ctrlConnection.destroy');
|
||||||
|
@ -238,33 +260,33 @@ module.exports = class MyLogger {
|
||||||
try {
|
try {
|
||||||
await this.init();
|
await this.init();
|
||||||
console.log('Process restarted.');
|
console.log('Process restarted.');
|
||||||
} catch(err) {
|
} catch (err) {
|
||||||
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
|
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async handleError(err) {
|
async onError(err) {
|
||||||
if (!this.isOk) return;
|
if (!this.isOk) return;
|
||||||
this.isOk = false;
|
this.isOk = false;
|
||||||
console.log(`Error: ${err.code}: ${err.message}`);
|
console.log(`Error: ${err.code}: ${err.message}`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.end(true);
|
await this.end(true);
|
||||||
} catch(e) {}
|
} catch (e) {}
|
||||||
|
|
||||||
// FIXME: Error of mysql2/promise
|
switch (err.code) {
|
||||||
if (err.message === `Can't add new command when connection is in closed state`)
|
case 'PROTOCOL_CONNECTION_LOST':
|
||||||
await this.tryRestart();
|
case 'ECONNRESET':
|
||||||
else
|
|
||||||
switch (err.code) {
|
|
||||||
case 'PROTOCOL_CONNECTION_LOST':
|
|
||||||
case 'ECONNRESET':
|
|
||||||
console.log('Trying to restart process.');
|
console.log('Trying to restart process.');
|
||||||
await this.tryRestart();
|
await this.tryRestart();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
process.exit();
|
process.exit();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handleError(err) {
|
||||||
|
console.error(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
async onBinlog(evt) {
|
async onBinlog(evt) {
|
||||||
|
@ -286,8 +308,7 @@ module.exports = class MyLogger {
|
||||||
} else {
|
} else {
|
||||||
shouldFlush = true;
|
shouldFlush = true;
|
||||||
this.binlogPosition = evt.nextPosition;
|
this.binlogPosition = evt.nextPosition;
|
||||||
if (catchEvents.has(eventName))
|
if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName);
|
||||||
this.onRowEvent(evt, eventName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldFlush) this.isFlushed = false;
|
if (shouldFlush) this.isFlushed = false;
|
||||||
|
@ -296,7 +317,7 @@ module.exports = class MyLogger {
|
||||||
this.debug('MyLogger', 'Queue full, stopping Zongji.');
|
this.debug('MyLogger', 'Queue full, stopping Zongji.');
|
||||||
await this.zongjiStop();
|
await this.zongjiStop();
|
||||||
}
|
}
|
||||||
} catch(err) {
|
} catch (err) {
|
||||||
this.handleError(err);
|
this.handleError(err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -308,7 +329,7 @@ module.exports = class MyLogger {
|
||||||
if (!tableInfo) return;
|
if (!tableInfo) return;
|
||||||
|
|
||||||
const action = actions[eventName];
|
const action = actions[eventName];
|
||||||
const {rowExcludeField} = tableInfo;
|
const { rowExcludeField } = tableInfo;
|
||||||
const changes = [];
|
const changes = [];
|
||||||
|
|
||||||
function isExcluded(row) {
|
function isExcluded(row) {
|
||||||
|
@ -316,19 +337,16 @@ module.exports = class MyLogger {
|
||||||
}
|
}
|
||||||
|
|
||||||
function cast(value, type) {
|
function cast(value, type) {
|
||||||
if (value == null || !type)
|
if (value == null || !type) return value;
|
||||||
return value;
|
|
||||||
|
|
||||||
const fn = castFn[type];
|
const fn = castFn[type];
|
||||||
return fn ? fn(value) : value;
|
return fn ? fn(value) : value;
|
||||||
}
|
}
|
||||||
|
|
||||||
function equals(a, b) {
|
function equals(a, b) {
|
||||||
if (a === b)
|
if (a === b) return true;
|
||||||
return true;
|
|
||||||
const type = typeof a;
|
const type = typeof a;
|
||||||
if (a == null || b == null || type !== typeof b)
|
if (a == null || b == null || type !== typeof b) return false;
|
||||||
return false;
|
|
||||||
if (type === 'object' && a.constructor === b.constructor) {
|
if (type === 'object' && a.constructor === b.constructor) {
|
||||||
if (a instanceof Date) {
|
if (a instanceof Date) {
|
||||||
// FIXME: zongji creates invalid dates for NULL DATE
|
// FIXME: zongji creates invalid dates for NULL DATE
|
||||||
|
@ -337,14 +355,14 @@ module.exports = class MyLogger {
|
||||||
if (isNaN(aTime)) aTime = null;
|
if (isNaN(aTime)) aTime = null;
|
||||||
let bTime = b.getTime();
|
let bTime = b.getTime();
|
||||||
if (isNaN(bTime)) bTime = null;
|
if (isNaN(bTime)) bTime = null;
|
||||||
|
|
||||||
return aTime === bTime;
|
return aTime === bTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
const {castTypes} = tableInfo;
|
const { castTypes } = tableInfo;
|
||||||
|
|
||||||
if (action == 'update') {
|
if (action == 'update') {
|
||||||
const cols = tableInfo.columns;
|
const cols = tableInfo.columns;
|
||||||
|
@ -370,8 +388,7 @@ module.exports = class MyLogger {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nColsChanged)
|
if (nColsChanged) changes.push({ row: after, oldI, newI });
|
||||||
changes.push({row: after, oldI, newI});
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const cols = tableInfo.instanceColumns;
|
const cols = tableInfo.instanceColumns;
|
||||||
|
@ -386,7 +403,7 @@ module.exports = class MyLogger {
|
||||||
instance[col] = cast(row[col], type);
|
instance[col] = cast(row[col], type);
|
||||||
}
|
}
|
||||||
|
|
||||||
changes.push({row, instance});
|
changes.push({ row, instance });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,7 +414,7 @@ module.exports = class MyLogger {
|
||||||
action,
|
action,
|
||||||
evt,
|
evt,
|
||||||
changes,
|
changes,
|
||||||
binlogName: this.binlogName
|
binlogName: this.binlogName,
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!this.flushTimeout)
|
if (!this.flushTimeout)
|
||||||
|
@ -407,7 +424,8 @@ module.exports = class MyLogger {
|
||||||
);
|
);
|
||||||
|
|
||||||
if (this.conf.debug)
|
if (this.conf.debug)
|
||||||
console.debug('Evt:'.blue,
|
console.debug(
|
||||||
|
'Evt:'.blue,
|
||||||
`[${action}]`[actionColor[action]],
|
`[${action}]`[actionColor[action]],
|
||||||
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
|
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
|
||||||
);
|
);
|
||||||
|
@ -416,7 +434,7 @@ module.exports = class MyLogger {
|
||||||
async flushQueue() {
|
async flushQueue() {
|
||||||
if (this.isFlushed || this.isFlushing || !this.isOk) return;
|
if (this.isFlushed || this.isFlushing || !this.isOk) return;
|
||||||
this.isFlushing = true;
|
this.isFlushing = true;
|
||||||
const {conf, db, queue} = this;
|
const { conf, db, queue } = this;
|
||||||
let op;
|
let op;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -433,15 +451,17 @@ module.exports = class MyLogger {
|
||||||
|
|
||||||
await db.query('START TRANSACTION');
|
await db.query('START TRANSACTION');
|
||||||
txStarted = true;
|
txStarted = true;
|
||||||
|
|
||||||
for (op of ops)
|
|
||||||
await this.applyOp(op);
|
|
||||||
|
|
||||||
this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`);
|
for (op of ops) await this.applyOp(op);
|
||||||
|
|
||||||
|
this.debug(
|
||||||
|
'Queue',
|
||||||
|
`applied: ${ops.length}, remaining: ${queue.length}`
|
||||||
|
);
|
||||||
await this.savePosition(op.binlogName, op.evt.nextPosition);
|
await this.savePosition(op.binlogName, op.evt.nextPosition);
|
||||||
|
|
||||||
await db.query('COMMIT');
|
await db.query('COMMIT');
|
||||||
} catch(err) {
|
} catch (err) {
|
||||||
queue.unshift(...ops);
|
queue.unshift(...ops);
|
||||||
if (txStarted)
|
if (txStarted)
|
||||||
try {
|
try {
|
||||||
|
@ -458,7 +478,7 @@ module.exports = class MyLogger {
|
||||||
} else {
|
} else {
|
||||||
await this.savePosition(this.binlogName, this.binlogPosition);
|
await this.savePosition(this.binlogName, this.binlogPosition);
|
||||||
}
|
}
|
||||||
} catch(err) {
|
} catch (err) {
|
||||||
this.handleError(err);
|
this.handleError(err);
|
||||||
} finally {
|
} finally {
|
||||||
this.flushTimeout = null;
|
this.flushTimeout = null;
|
||||||
|
@ -467,38 +487,25 @@ module.exports = class MyLogger {
|
||||||
}
|
}
|
||||||
|
|
||||||
async applyOp(op) {
|
async applyOp(op) {
|
||||||
const {conf} = this;
|
const { conf } = this;
|
||||||
const {
|
const { tableInfo, action, evt, changes } = op;
|
||||||
tableInfo,
|
const { logInfo, isMain, relation, modelName, logFields } = tableInfo;
|
||||||
action,
|
|
||||||
evt,
|
|
||||||
changes
|
|
||||||
} = op;
|
|
||||||
const {
|
|
||||||
logInfo,
|
|
||||||
isMain,
|
|
||||||
relation,
|
|
||||||
modelName,
|
|
||||||
logFields
|
|
||||||
} = tableInfo;
|
|
||||||
|
|
||||||
const isDelete = action == 'delete';
|
const isDelete = action == 'delete';
|
||||||
const isUpdate = action == 'update';
|
const isUpdate = action == 'update';
|
||||||
const created = new Date(evt.timestamp);
|
const created = new Date(evt.timestamp);
|
||||||
const showId = tableInfo.conf.showId;
|
|
||||||
|
|
||||||
for (const change of changes) {
|
for (const change of changes) {
|
||||||
let newI, oldI;
|
let newI, oldI;
|
||||||
const row = change.row;
|
const row = change.row;
|
||||||
|
|
||||||
switch(action) {
|
switch (action) {
|
||||||
case 'update':
|
case 'update':
|
||||||
newI = change.newI;
|
newI = change.newI;
|
||||||
oldI = change.oldI;
|
oldI = change.oldI;
|
||||||
if (logFields) {
|
if (logFields) {
|
||||||
for (const field of logFields)
|
for (const field of logFields)
|
||||||
if (newI[field] === undefined)
|
if (newI[field] === undefined) newI[field] = row[field];
|
||||||
newI[field] = row[field];
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 'insert':
|
case 'insert':
|
||||||
|
@ -508,17 +515,17 @@ module.exports = class MyLogger {
|
||||||
oldI = change.instance;
|
oldI = change.instance;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
const summaryId = showId ? row[showId] : null;
|
|
||||||
const modelId = row[tableInfo.idName];
|
const modelId = row[tableInfo.idName];
|
||||||
const modelValue = change.modelValue ?? null;
|
const modelValue = change.modelValue ?? null;
|
||||||
const oldInstance = oldI ? JSON.stringify(oldI) : null;
|
const oldInstance = oldI ? JSON.stringify(oldI) : null;
|
||||||
const originFk = !isMain ? row[relation] : modelId;
|
const originFk = !isMain ? row[relation] : modelId;
|
||||||
const originChanged = isUpdate && !isMain
|
const originChanged = isUpdate && !isMain && newI[relation] !== undefined;
|
||||||
&& newI[relation] !== undefined;
|
|
||||||
|
|
||||||
let deleteRow;
|
let deleteRow;
|
||||||
if (conf.debug)
|
if (conf.debug)
|
||||||
console.debug('Log:'.blue,
|
console.debug(
|
||||||
|
'Log:'.blue,
|
||||||
`[${action}]`[actionColor[action]],
|
`[${action}]`[actionColor[action]],
|
||||||
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
|
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
|
||||||
);
|
);
|
||||||
|
@ -526,7 +533,9 @@ module.exports = class MyLogger {
|
||||||
try {
|
try {
|
||||||
if (isDelete) {
|
if (isDelete) {
|
||||||
[[deleteRow]] = await logInfo.fetchStmt.execute([
|
[[deleteRow]] = await logInfo.fetchStmt.execute([
|
||||||
modelName, modelId, originFk
|
modelName,
|
||||||
|
modelId,
|
||||||
|
originFk,
|
||||||
]);
|
]);
|
||||||
if (!conf.testMode && deleteRow)
|
if (!conf.testMode && deleteRow)
|
||||||
await logInfo.updateStmt.execute([
|
await logInfo.updateStmt.execute([
|
||||||
|
@ -534,8 +543,7 @@ module.exports = class MyLogger {
|
||||||
created,
|
created,
|
||||||
oldInstance,
|
oldInstance,
|
||||||
modelValue,
|
modelValue,
|
||||||
summaryId,
|
deleteRow.id,
|
||||||
deleteRow.id
|
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
if (!conf.testMode && (!isDelete || !deleteRow)) {
|
if (!conf.testMode && (!isDelete || !deleteRow)) {
|
||||||
|
@ -551,33 +559,34 @@ module.exports = class MyLogger {
|
||||||
newI ? JSON.stringify(newI) : null,
|
newI ? JSON.stringify(newI) : null,
|
||||||
modelId,
|
modelId,
|
||||||
modelValue,
|
modelValue,
|
||||||
summaryId
|
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (originChanged)
|
if (originChanged) await log(oldI[relation]);
|
||||||
await log(oldI[relation]);
|
|
||||||
await log(originFk);
|
await log(originFk);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (err.code == 'ER_NO_REFERENCED_ROW_2') {
|
if (err.code == 'ER_NO_REFERENCED_ROW_2') {
|
||||||
this.debug('Log', `Ignored because of constraint failed.`);
|
this.debug('Log', `Ignored because of constraint failed.`);
|
||||||
} else
|
} else throw err;
|
||||||
throw err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async savePosition(binlogName, binlogPosition) {
|
async savePosition(binlogName, binlogPosition) {
|
||||||
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
|
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
|
||||||
|
|
||||||
const replaceQuery =
|
const replaceQuery =
|
||||||
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
|
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
|
||||||
if (!this.conf.testMode)
|
if (!this.conf.testMode)
|
||||||
await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]);
|
await this.db.query(replaceQuery, [
|
||||||
|
this.conf.code,
|
||||||
|
binlogName,
|
||||||
|
binlogPosition,
|
||||||
|
]);
|
||||||
|
|
||||||
this.isFlushed = this.binlogName == binlogName
|
this.isFlushed =
|
||||||
&& this.binlogPosition == binlogPosition;
|
this.binlogName == binlogName && this.binlogPosition == binlogPosition;
|
||||||
}
|
}
|
||||||
|
|
||||||
async connectionPing() {
|
async connectionPing() {
|
||||||
|
@ -588,45 +597,40 @@ module.exports = class MyLogger {
|
||||||
if (this.zongji) {
|
if (this.zongji) {
|
||||||
// FIXME: Should Zongji.connection be pinged?
|
// FIXME: Should Zongji.connection be pinged?
|
||||||
await new Promise((resolve, reject) => {
|
await new Promise((resolve, reject) => {
|
||||||
this.zongji.ctrlConnection.ping(err => {
|
this.zongji.ctrlConnection.ping((err) => {
|
||||||
if (err) return reject(err);
|
if (err) return reject(err);
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
})
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.db.ping();
|
await this.db.ping();
|
||||||
} catch(err) {
|
} catch (err) {
|
||||||
this.handleError(err);
|
this.handleError(err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug(namespace, message) {
|
debug(namespace, message) {
|
||||||
if (this.conf.debug)
|
if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow);
|
||||||
console.debug(`${namespace}:`.blue, message.yellow);
|
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
const catchEvents = new Set([
|
const catchEvents = new Set(['writerows', 'updaterows', 'deleterows']);
|
||||||
'writerows',
|
|
||||||
'updaterows',
|
|
||||||
'deleterows'
|
|
||||||
]);
|
|
||||||
|
|
||||||
const actions = {
|
const actions = {
|
||||||
writerows: 'insert',
|
writerows: 'insert',
|
||||||
updaterows: 'update',
|
updaterows: 'update',
|
||||||
deleterows: 'delete'
|
deleterows: 'delete',
|
||||||
};
|
};
|
||||||
|
|
||||||
const actionColor = {
|
const actionColor = {
|
||||||
insert: 'green',
|
insert: 'green',
|
||||||
update: 'yellow',
|
update: 'yellow',
|
||||||
delete: 'red'
|
delete: 'red',
|
||||||
};
|
};
|
||||||
|
|
||||||
const castFn = {
|
const castFn = {
|
||||||
boolean: function(value) {
|
boolean: function (value) {
|
||||||
return !!value;
|
return !!value;
|
||||||
}
|
},
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "mylogger",
|
"name": "mylogger",
|
||||||
"version": "1.1.5",
|
"version": "1.1.4",
|
||||||
"author": "Verdnatura Levante SL",
|
"author": "Verdnatura Levante SL",
|
||||||
"description": "MySQL and MariaDB logger using binary log",
|
"description": "MySQL and MariaDB logger using binary log",
|
||||||
"license": "GPL-3.0",
|
"license": "GPL-3.0",
|
||||||
|
|
Loading…
Reference in New Issue