feat #6533 use salix class

This commit is contained in:
Javier Segarra 2024-04-24 14:16:04 +02:00
parent ac21ae5b92
commit e836dc08ec
2 changed files with 169 additions and 170 deletions

View File

@ -6,7 +6,7 @@ const MultiMap = require('./multi-map');
* Loads model configuration. * Loads model configuration.
*/ */
module.exports = class ModelLoader { module.exports = class ModelLoader {
init(logger) { init(logger, logs = []) {
const configDir = path.join(__dirname, '..'); const configDir = path.join(__dirname, '..');
const conf = loadConfig(configDir, 'logs'); const conf = loadConfig(configDir, 'logs');
const schemaMap = new MultiMap(); const schemaMap = new MultiMap();

View File

@ -1,10 +1,11 @@
require('require-yaml'); require("require-yaml");
require('colors'); require("colors");
const ZongJi = require('./zongji'); const ZongJi = require("./zongji");
const mysql = require('mysql2/promise'); const mysql = require("mysql2/promise");
const {loadConfig} = require('./lib/util'); const { loadConfig } = require("./lib/util");
const ModelLoader = require('./lib/model-loader'); const ModelLoader = require("./lib/model-loader");
const ShowDb = require('./lib/show-db'); const ShowDb = require("./lib/show-db");
const Salix = require("./lib/salix");
module.exports = class MyLogger { module.exports = class MyLogger {
constructor() { constructor() {
@ -19,9 +20,11 @@ module.exports = class MyLogger {
} }
async start() { async start() {
const conf = this.conf = loadConfig(__dirname, 'config'); const conf = (this.conf = loadConfig(__dirname, "config"));
const salix = new Salix();
this.modelLoader.init(this); await salix.init(this);
// await salix.salixLogInfo();
this.modelLoader.init(this, salix.logInfo);
this.showDb.init(this); this.showDb.init(this);
const includeSchema = {}; const includeSchema = {};
@ -30,46 +33,48 @@ module.exports = class MyLogger {
this.zongjiOpts = { this.zongjiOpts = {
includeEvents: [ includeEvents: [
'rotate', "rotate",
'tablemap', "tablemap",
'writerows', "writerows",
'updaterows', "updaterows",
'deleterows' "deleterows",
], ],
includeSchema, includeSchema,
serverId: conf.serverId serverId: conf.serverId,
}; };
if (conf.testMode) if (conf.testMode)
console.log('Test mode enabled, just logging queries to console.'); console.log("Test mode enabled, just logging queries to console.");
console.log('Starting process.'); console.log("Starting process.");
await this.init(); await this.init();
console.log('Process started.'); console.log("Process started.");
} }
async stop() { async stop() {
console.log('Stopping process.'); console.log("Stopping process.");
await this.end(); await this.end();
console.log('Process stopped.'); console.log("Process stopped.");
} }
async init() { async init() {
const {conf} = this; const { conf } = this;
this.debug('MyLogger', 'Initializing.'); this.debug("MyLogger", "Initializing.");
this.onErrorListener = err => this.onError(err); this.onErrorListener = (err) => this.onError(err);
// DB connection // DB connection
const db = this.db = await mysql.createConnection(conf.dstDb); const db = (this.db = await mysql.createConnection(conf.dstDb));
db.on('error', this.onErrorListener); db.on("error", this.onErrorListener);
await this.modelLoader.loadSchema(); await this.modelLoader.loadSchema();
await this.showDb.loadSchema(); await this.showDb.loadSchema();
for (const logInfo of this.logMap.values()) { for (const logInfo of this.logMap.values()) {
const table = logInfo.table; const table = logInfo.table;
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`; const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(
table.name
)}`;
logInfo.addStmt = await db.prepare( logInfo.addStmt = await db.prepare(
`INSERT INTO ${sqlTable} `INSERT INTO ${sqlTable}
SET originFk = ?, SET originFk = ?,
@ -102,10 +107,10 @@ module.exports = class MyLogger {
// Zongji // Zongji
this.onBinlogListener = evt => this.onBinlog(evt); this.onBinlogListener = (evt) => this.onBinlog(evt);
const [res] = await db.query( const [res] = await db.query(
'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?', "SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?",
[conf.code] [conf.code]
); );
if (res.length) { if (res.length) {
@ -117,23 +122,27 @@ module.exports = class MyLogger {
await this.zongjiStart(); await this.zongjiStart();
this.flushInterval = setInterval( this.flushInterval = setInterval(
() => this.flushQueue(), conf.flushInterval * 1000); () => this.flushQueue(),
conf.flushInterval * 1000
);
this.pingInterval = setInterval( this.pingInterval = setInterval(
() => this.connectionPing(), conf.pingInterval * 1000); () => this.connectionPing(),
conf.pingInterval * 1000
);
// Summary // Summary
this.running = true; this.running = true;
this.isOk = true; this.isOk = true;
this.debug('MyLogger', 'Initialized.'); this.debug("MyLogger", "Initialized.");
} }
async end(silent) { async end(silent) {
if (!this.running) return; if (!this.running) return;
this.running = false; this.running = false;
this.debug('MyLogger', 'Ending.'); this.debug("MyLogger", "Ending.");
this.db.off('error', this.onErrorListener); this.db.off("error", this.onErrorListener);
clearInterval(this.flushInterval); clearInterval(this.flushInterval);
clearInterval(this.pingInterval); clearInterval(this.pingInterval);
@ -157,7 +166,7 @@ module.exports = class MyLogger {
// DB connection // DB connection
// FIXME: mysql2/promise bug, db.end() ends process // FIXME: mysql2/promise bug, db.end() ends process
this.db.on('error', () => {}); this.db.on("error", () => {});
try { try {
this.db.end(); this.db.end();
} catch (err) { } catch (err) {
@ -166,7 +175,7 @@ module.exports = class MyLogger {
// Summary // Summary
this.debug('MyLogger', 'Ended.'); this.debug("MyLogger", "Ended.");
} }
async zongjiStart() { async zongjiStart() {
@ -175,75 +184,80 @@ module.exports = class MyLogger {
const zongjiOpts = this.zongjiOpts; const zongjiOpts = this.zongjiOpts;
if (this.binlogName) { if (this.binlogName) {
this.debug('Zongji', this.debug(
"Zongji",
`Starting: ${this.binlogName}, position: ${this.binlogPosition}` `Starting: ${this.binlogName}, position: ${this.binlogPosition}`
); );
Object.assign(zongjiOpts, { Object.assign(zongjiOpts, {
filename: this.binlogName, filename: this.binlogName,
position: this.binlogPosition position: this.binlogPosition,
}); });
} else { } else {
this.debug('Zongji', 'Starting at end.'); this.debug("Zongji", "Starting at end.");
zongjiOpts.startAtEnd = true; zongjiOpts.startAtEnd = true;
} }
zongji.on('binlog', this.onBinlogListener); zongji.on("binlog", this.onBinlogListener);
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
const onReady = () => { const onReady = () => {
zongji.off('error', onError); zongji.off("error", onError);
resolve(); resolve();
}; };
const onError = err => { const onError = (err) => {
zongji.off('ready', onReady); zongji.off("ready", onReady);
zongji.off('binlog', this.onBinlogListener); zongji.off("binlog", this.onBinlogListener);
reject(err); reject(err);
} };
zongji.once('ready', onReady); zongji.once("ready", onReady);
zongji.once('error', onError); zongji.once("error", onError);
zongji.start(zongjiOpts); zongji.start(zongjiOpts);
}); });
zongji.on('error', this.onErrorListener); zongji.on("error", this.onErrorListener);
this.zongji = zongji; this.zongji = zongji;
this.debug('Zongji', 'Started.'); this.debug("Zongji", "Started.");
} }
async zongjiStop() { async zongjiStop() {
if (!this.zongji) return; if (!this.zongji) return;
this.debug('Zongji', this.debug(
"Zongji",
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}` `Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
); );
const zongji = this.zongji; const zongji = this.zongji;
this.zongji = null; this.zongji = null;
zongji.off('binlog', this.onBinlogListener); zongji.off("binlog", this.onBinlogListener);
zongji.off('error', this.onErrorListener); zongji.off("error", this.onErrorListener);
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
zongji.connection.destroy(() => { zongji.connection.destroy(() => {
console.log('zongji.connection.destroy'); console.log("zongji.connection.destroy");
}); });
await new Promise(resolve => { await new Promise((resolve) => {
zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId], zongji.ctrlConnection.query(
err => { "KILL ?",
// if (err && err.code !== 'ER_NO_SUCH_THREAD'); [zongji.connection.threadId],
// console.error(err); (err) => {
resolve(); // if (err && err.code !== 'ER_NO_SUCH_THREAD');
}); // console.error(err);
resolve();
}
);
}); });
zongji.ctrlConnection.destroy(() => { zongji.ctrlConnection.destroy(() => {
console.log('zongji.ctrlConnection.destroy'); console.log("zongji.ctrlConnection.destroy");
}); });
zongji.emit('stopped'); zongji.emit("stopped");
this.debug('Zongji', 'Stopped.'); this.debug("Zongji", "Stopped.");
} }
async tryRestart() { async tryRestart() {
try { try {
await this.init(); await this.init();
console.log('Process restarted.'); console.log("Process restarted.");
} catch(err) { } catch (err) {
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000); setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
} }
} }
@ -255,16 +269,16 @@ module.exports = class MyLogger {
try { try {
await this.end(true); await this.end(true);
} catch(e) {} } catch (e) {}
switch (err.code) { switch (err.code) {
case 'PROTOCOL_CONNECTION_LOST': case "PROTOCOL_CONNECTION_LOST":
case 'ECONNRESET': case "ECONNRESET":
console.log('Trying to restart process.'); console.log("Trying to restart process.");
await this.tryRestart(); await this.tryRestart();
break; break;
default: default:
process.exit(); process.exit();
} }
} }
@ -278,7 +292,7 @@ module.exports = class MyLogger {
let shouldFlush; let shouldFlush;
const eventName = evt.getEventName(); const eventName = evt.getEventName();
if (eventName == 'rotate') { if (eventName == "rotate") {
if (evt.binlogName !== this.binlogName) { if (evt.binlogName !== this.binlogName) {
shouldFlush = true; shouldFlush = true;
this.binlogName = evt.binlogName; this.binlogName = evt.binlogName;
@ -291,17 +305,16 @@ module.exports = class MyLogger {
} else { } else {
shouldFlush = true; shouldFlush = true;
this.binlogPosition = evt.nextPosition; this.binlogPosition = evt.nextPosition;
if (catchEvents.has(eventName)) if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName);
this.onRowEvent(evt, eventName);
} }
if (shouldFlush) this.isFlushed = false; if (shouldFlush) this.isFlushed = false;
if (this.queue.length > this.conf.maxQueueEvents) { if (this.queue.length > this.conf.maxQueueEvents) {
this.debug('MyLogger', 'Queue full, stopping Zongji.'); this.debug("MyLogger", "Queue full, stopping Zongji.");
await this.zongjiStop(); await this.zongjiStop();
} }
} catch(err) { } catch (err) {
this.handleError(err); this.handleError(err);
} }
} }
@ -313,7 +326,7 @@ module.exports = class MyLogger {
if (!tableInfo) return; if (!tableInfo) return;
const action = actions[eventName]; const action = actions[eventName];
const {rowExcludeField} = tableInfo; const { rowExcludeField } = tableInfo;
const changes = []; const changes = [];
function isExcluded(row) { function isExcluded(row) {
@ -321,20 +334,17 @@ module.exports = class MyLogger {
} }
function cast(value, type) { function cast(value, type) {
if (value == null || !type) if (value == null || !type) return value;
return value;
const fn = castFn[type]; const fn = castFn[type];
return fn ? fn(value) : value; return fn ? fn(value) : value;
} }
function equals(a, b) { function equals(a, b) {
if (a === b) if (a === b) return true;
return true;
const type = typeof a; const type = typeof a;
if (a == null || b == null || type !== typeof b) if (a == null || b == null || type !== typeof b) return false;
return false; if (type === "object" && a.constructor === b.constructor) {
if (type === 'object' && a.constructor === b.constructor) {
if (a instanceof Date) { if (a instanceof Date) {
// FIXME: zongji creates invalid dates for NULL DATE // FIXME: zongji creates invalid dates for NULL DATE
// Error is somewhere here: zongji/lib/rows_event.js:129 // Error is somewhere here: zongji/lib/rows_event.js:129
@ -349,9 +359,9 @@ module.exports = class MyLogger {
return false; return false;
} }
const {castTypes} = tableInfo; const { castTypes } = tableInfo;
if (action == 'update') { if (action == "update") {
const cols = tableInfo.columns; const cols = tableInfo.columns;
for (const row of evt.rows) { for (const row of evt.rows) {
@ -375,8 +385,7 @@ module.exports = class MyLogger {
} }
} }
if (nColsChanged) if (nColsChanged) changes.push({ row: after, oldI, newI });
changes.push({row: after, oldI, newI});
} }
} else { } else {
const cols = tableInfo.instanceColumns; const cols = tableInfo.instanceColumns;
@ -391,7 +400,7 @@ module.exports = class MyLogger {
instance[col] = cast(row[col], type); instance[col] = cast(row[col], type);
} }
changes.push({row, instance}); changes.push({ row, instance });
} }
} }
@ -402,7 +411,7 @@ module.exports = class MyLogger {
action, action,
evt, evt,
changes, changes,
binlogName: this.binlogName binlogName: this.binlogName,
}); });
if (!this.flushTimeout) if (!this.flushTimeout)
@ -412,7 +421,8 @@ module.exports = class MyLogger {
); );
if (this.conf.debug) if (this.conf.debug)
console.debug('Evt:'.blue, console.debug(
"Evt:".blue,
`[${action}]`[actionColor[action]], `[${action}]`[actionColor[action]],
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
); );
@ -421,7 +431,7 @@ module.exports = class MyLogger {
async flushQueue() { async flushQueue() {
if (this.isFlushed || this.isFlushing || !this.isOk) return; if (this.isFlushed || this.isFlushing || !this.isOk) return;
this.isFlushing = true; this.isFlushing = true;
const {conf, db, queue} = this; const { conf, db, queue } = this;
let op; let op;
try { try {
@ -436,34 +446,36 @@ module.exports = class MyLogger {
await this.showDb.getValues(db, ops); await this.showDb.getValues(db, ops);
await db.query('START TRANSACTION'); await db.query("START TRANSACTION");
txStarted = true; txStarted = true;
for (op of ops) for (op of ops) await this.applyOp(op);
await this.applyOp(op);
this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`); this.debug(
"Queue",
`applied: ${ops.length}, remaining: ${queue.length}`
);
await this.savePosition(op.binlogName, op.evt.nextPosition); await this.savePosition(op.binlogName, op.evt.nextPosition);
await db.query('COMMIT'); await db.query("COMMIT");
} catch(err) { } catch (err) {
queue.unshift(...ops); queue.unshift(...ops);
if (txStarted) if (txStarted)
try { try {
await db.query('ROLLBACK'); await db.query("ROLLBACK");
} catch (err) {} } catch (err) {}
throw err; throw err;
} }
} while (queue.length); } while (queue.length);
if (!this.zongji) { if (!this.zongji) {
this.debug('MyLogger', 'Queue flushed, restarting Zongji.'); this.debug("MyLogger", "Queue flushed, restarting Zongji.");
await this.zongjiStart(); await this.zongjiStart();
} }
} else { } else {
await this.savePosition(this.binlogName, this.binlogPosition); await this.savePosition(this.binlogName, this.binlogPosition);
} }
} catch(err) { } catch (err) {
this.handleError(err); this.handleError(err);
} finally { } finally {
this.flushTimeout = null; this.flushTimeout = null;
@ -472,43 +484,31 @@ module.exports = class MyLogger {
} }
async applyOp(op) { async applyOp(op) {
const {conf} = this; const { conf } = this;
const { const { tableInfo, action, evt, changes } = op;
tableInfo, const { logInfo, isMain, relation, modelName, logFields } = tableInfo;
action,
evt,
changes
} = op;
const {
logInfo,
isMain,
relation,
modelName,
logFields
} = tableInfo;
const isDelete = action == 'delete'; const isDelete = action == "delete";
const isUpdate = action == 'update'; const isUpdate = action == "update";
const created = new Date(evt.timestamp); const created = new Date(evt.timestamp);
for (const change of changes) { for (const change of changes) {
let newI, oldI; let newI, oldI;
const row = change.row; const row = change.row;
switch(action) { switch (action) {
case 'update': case "update":
newI = change.newI; newI = change.newI;
oldI = change.oldI; oldI = change.oldI;
if (logFields) { if (logFields) {
for (const field of logFields) for (const field of logFields)
if (newI[field] === undefined) if (newI[field] === undefined) newI[field] = row[field];
newI[field] = row[field];
} }
break; break;
case 'insert': case "insert":
newI = change.instance; newI = change.instance;
break; break;
case 'delete': case "delete":
oldI = change.instance; oldI = change.instance;
break; break;
} }
@ -517,12 +517,12 @@ module.exports = class MyLogger {
const modelValue = change.modelValue ?? null; const modelValue = change.modelValue ?? null;
const oldInstance = oldI ? JSON.stringify(oldI) : null; const oldInstance = oldI ? JSON.stringify(oldI) : null;
const originFk = !isMain ? row[relation] : modelId; const originFk = !isMain ? row[relation] : modelId;
const originChanged = isUpdate && !isMain const originChanged = isUpdate && !isMain && newI[relation] !== undefined;
&& newI[relation] !== undefined;
let deleteRow; let deleteRow;
if (conf.debug) if (conf.debug)
console.debug('Log:'.blue, console.debug(
"Log:".blue,
`[${action}]`[actionColor[action]], `[${action}]`[actionColor[action]],
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
); );
@ -530,7 +530,9 @@ module.exports = class MyLogger {
try { try {
if (isDelete) { if (isDelete) {
[[deleteRow]] = await logInfo.fetchStmt.execute([ [[deleteRow]] = await logInfo.fetchStmt.execute([
modelName, modelId, originFk modelName,
modelId,
originFk,
]); ]);
if (!conf.testMode && deleteRow) if (!conf.testMode && deleteRow)
await logInfo.updateStmt.execute([ await logInfo.updateStmt.execute([
@ -538,7 +540,7 @@ module.exports = class MyLogger {
created, created,
oldInstance, oldInstance,
modelValue, modelValue,
deleteRow.id deleteRow.id,
]); ]);
} }
if (!conf.testMode && (!isDelete || !deleteRow)) { if (!conf.testMode && (!isDelete || !deleteRow)) {
@ -553,82 +555,79 @@ module.exports = class MyLogger {
oldInstance, oldInstance,
newI ? JSON.stringify(newI) : null, newI ? JSON.stringify(newI) : null,
modelId, modelId,
modelValue modelValue,
]); ]);
} }
if (originChanged) if (originChanged) await log(oldI[relation]);
await log(oldI[relation]);
await log(originFk); await log(originFk);
} }
} catch (err) { } catch (err) {
if (err.code == 'ER_NO_REFERENCED_ROW_2') { if (err.code == "ER_NO_REFERENCED_ROW_2") {
this.debug('Log', `Ignored because of constraint failed.`); this.debug("Log", `Ignored because of constraint failed.`);
} else } else throw err;
throw err;
} }
} }
} }
async savePosition(binlogName, binlogPosition) { async savePosition(binlogName, binlogPosition) {
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); this.debug("Flush", `filename: ${binlogName}, position: ${binlogPosition}`);
const replaceQuery = const replaceQuery =
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; "REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?";
if (!this.conf.testMode) if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]); await this.db.query(replaceQuery, [
this.conf.code,
binlogName,
binlogPosition,
]);
this.isFlushed = this.binlogName == binlogName this.isFlushed =
&& this.binlogPosition == binlogPosition; this.binlogName == binlogName && this.binlogPosition == binlogPosition;
} }
async connectionPing() { async connectionPing() {
if (!this.isOk) return; if (!this.isOk) return;
try { try {
this.debug('Ping', 'Sending ping to database.'); this.debug("Ping", "Sending ping to database.");
if (this.zongji) { if (this.zongji) {
// FIXME: Should Zongji.connection be pinged? // FIXME: Should Zongji.connection be pinged?
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
this.zongji.ctrlConnection.ping(err => { this.zongji.ctrlConnection.ping((err) => {
if (err) return reject(err); if (err) return reject(err);
resolve(); resolve();
}); });
}) });
} }
await this.db.ping(); await this.db.ping();
} catch(err) { } catch (err) {
this.handleError(err); this.handleError(err);
} }
} }
debug(namespace, message) { debug(namespace, message) {
if (this.conf.debug) if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow);
console.debug(`${namespace}:`.blue, message.yellow);
} }
} };
const catchEvents = new Set([ const catchEvents = new Set(["writerows", "updaterows", "deleterows"]);
'writerows',
'updaterows',
'deleterows'
]);
const actions = { const actions = {
writerows: 'insert', writerows: "insert",
updaterows: 'update', updaterows: "update",
deleterows: 'delete' deleterows: "delete",
}; };
const actionColor = { const actionColor = {
insert: 'green', insert: "green",
update: 'yellow', update: "yellow",
delete: 'red' delete: "red",
}; };
const castFn = { const castFn = {
boolean: function(value) { boolean: function (value) {
return !!value; return !!value;
} },
}; };