refs #5563 maxQueueLengh, debug improved, test mode fixes
gitea/mylogger/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2023-05-21 14:16:01 +02:00
parent 16391f146d
commit a849049d00
4 changed files with 110 additions and 69 deletions

View File

@ -6,6 +6,7 @@ flushInterval: 30
restartTimeout: 30
queueFlushDelay: 200
maxBulkLog: 25
maxQueueEvents: 10000
upperCaseTable: true
serverId: 1
srcDb:

View File

@ -86,6 +86,7 @@ module.exports = class MyLogger {
const logConf = conf.logs[logName];
const schema = logConf.schema || conf.srcDb.database;
const logInfo = {
name: logName,
conf: logConf,
schema,
table: parseTable(logConf.logTable, schema),
@ -112,7 +113,7 @@ module.exports = class MyLogger {
for (const [schemaName, tableMap] of this.schemaMap)
includeSchema[schemaName] = Array.from(tableMap.keys());
this.opts = {
this.zongjiOpts = {
includeEvents: [
'rotate',
'tablemap',
@ -306,14 +307,9 @@ module.exports = class MyLogger {
tableInfo.relations.set(col, {schema, table, column});
}
// Zongji
const zongji = new ZongJi(conf.srcDb);
this.zongji = zongji;
this.onBinlogListener = evt => this.onBinlog(evt);
zongji.on('binlog', this.onBinlogListener);
const [res] = await db.query(
'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?',
@ -323,33 +319,9 @@ module.exports = class MyLogger {
const [row] = res;
this.binlogName = row.logName;
this.binlogPosition = row.position;
Object.assign(this.opts, {
filename: row.logName,
position: row.position
});
} else
this.opts.startAtEnd = true;
}
this.debug('Zongji', 'Starting.');
await new Promise((resolve, reject) => {
const onReady = () => {
zongji.off('error', onError);
resolve();
};
const onError = err => {
this.zongji = null;
zongji.off('ready', onReady);
zongji.off('binlog', this.onBinlogListener);
reject(err);
}
zongji.once('ready', onReady);
zongji.once('error', onError);
zongji.start(this.opts);
});
this.debug('Zongji', 'Started.');
this.zongji.on('error', this.onErrorListener);
await this.zongjiStart();
this.flushInterval = setInterval(
() => this.flushQueue(), conf.flushInterval * 1000);
@ -386,12 +358,75 @@ module.exports = class MyLogger {
// Zongji
const zongji = this.zongji;
zongji.off('binlog', this.onBinlogListener);
zongji.off('error', this.onErrorListener);
await this.zongjiStop();
this.zongji = null;
this.debug('Zongji', 'Stopping.');
// DB connection
// FIXME: mysql2/promise bug, db.end() ends process
this.db.on('error', () => {});
try {
this.db.end();
} catch (err) {
logError(err);
}
// Summary
this.debug('MyLogger', 'Ended.');
}
async zongjiStart() {
await this.zongjiStop();
const zongji = new ZongJi(this.conf.srcDb);
const zongjiOpts = this.zongjiOpts;
if (this.binlogName) {
this.debug('Zongji',
`Starting: ${this.binlogName}, position: ${this.binlogPosition}`
);
Object.assign(zongjiOpts, {
filename: this.binlogName,
position: this.binlogPosition
});
} else {
this.debug('Zongji', 'Starting at end.');
zongjiOpts.startAtEnd = true;
}
zongji.on('binlog', this.onBinlogListener);
await new Promise((resolve, reject) => {
const onReady = () => {
zongji.off('error', onError);
resolve();
};
const onError = err => {
zongji.off('ready', onReady);
zongji.off('binlog', this.onBinlogListener);
reject(err);
}
zongji.once('ready', onReady);
zongji.once('error', onError);
zongji.start(zongjiOpts);
});
zongji.on('error', this.onErrorListener);
this.zongji = zongji;
this.debug('Zongji', 'Started.');
}
async zongjiStop() {
if (!this.zongji) return;
this.debug('Zongji',
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
);
const zongji = this.zongji;
this.zongji = null;
zongji.off('binlog', this.onBinlogListener);
zongji.off('error', this.onErrorListener);
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
zongji.connection.destroy(() => {
console.log('zongji.connection.destroy');
@ -409,20 +444,6 @@ module.exports = class MyLogger {
});
zongji.emit('stopped');
this.debug('Zongji', 'Stopped.');
// DB connection
// FIXME: mysql2/promise bug, db.end() ends process
this.db.on('error', () => {});
try {
this.db.end();
} catch (err) {
logError(err);
}
// Summary
this.debug('MyLogger', 'Ended.');
}
async tryRestart() {
@ -482,6 +503,11 @@ module.exports = class MyLogger {
}
if (shouldFlush) this.isFlushed = false;
if (this.queue.length > this.conf.maxQueueEvents) {
this.debug('MyLogger', 'Queue full, stopping Zongji.');
await this.zongjiStop();
}
} catch(err) {
this.handleError(err);
}
@ -544,10 +570,6 @@ module.exports = class MyLogger {
if (!changes.length) return;
if (this.conf.debug)
console.debug('Evt:'.blue,
`[${action}]`[actionColor[action]], `${tableName}: ${changes.length} changes`);
this.queue.push({
tableInfo,
action,
@ -555,11 +577,18 @@ module.exports = class MyLogger {
changes,
binlogName: this.binlogName
});
if (!this.flushTimeout)
this.flushTimeout = setTimeout(
() => this.flushQueue(),
this.conf.queueFlushDelay
);
if (this.conf.debug)
console.debug('Evt:'.blue,
`[${action}]`[actionColor[action]],
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
);
}
async flushQueue() {
@ -595,6 +624,11 @@ module.exports = class MyLogger {
throw err;
}
} while (queue.length);
if (!this.zongji) {
this.debug('MyLogger', 'Queue flushed, restarting Zongji.');
await this.zongjiStart();
}
} else {
await this.savePosition(this.binlogName, this.binlogPosition);
}
@ -607,6 +641,7 @@ module.exports = class MyLogger {
}
async applyOp(op) {
const {conf} = this;
const {
tableInfo,
action,
@ -637,7 +672,7 @@ module.exports = class MyLogger {
const created = new Date(evt.timestamp);
const modelName = tableInfo.modelName;
const modelId = row[tableInfo.idName];
const modelValue = tableInfo.showField
const modelValue = tableInfo.showField && !tableInfo.isMain
? row[tableInfo.showField] || null
: null;
const oldInstance = oldI ? JSON.stringify(oldI) : null;
@ -646,16 +681,18 @@ module.exports = class MyLogger {
: modelId;
let deleteRow;
if (this.conf.debug)
if (conf.debug)
console.debug('Log:'.blue,
`[${action}]`[actionColor[action]], `${modelName}: ${modelId}`);
`[${action}]`[actionColor[action]],
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
);
try {
if (isDelete) {
[[deleteRow]] = await logInfo.fetchStmt.execute([
modelName, modelId, originFk
]);
if (deleteRow)
if (!conf.testMode && deleteRow)
await logInfo.updateStmt.execute([
originFk,
created,
@ -664,7 +701,7 @@ module.exports = class MyLogger {
deleteRow.id
]);
}
if (!isDelete || !deleteRow) {
if (!conf.testMode && (!isDelete || !deleteRow)) {
await logInfo.addStmt.execute([
originFk,
row[tableInfo.userField] || null,
@ -703,13 +740,16 @@ module.exports = class MyLogger {
try {
this.debug('Ping', 'Sending ping to database.');
// FIXME: Should Zongji.connection be pinged?
await new Promise((resolve, reject) => {
this.zongji.ctrlConnection.ping(err => {
if (err) return reject(err);
resolve();
});
})
if (this.zongji) {
// FIXME: Should Zongji.connection be pinged?
await new Promise((resolve, reject) => {
this.zongji.ctrlConnection.ping(err => {
if (err) return reject(err);
resolve();
});
})
}
await this.db.ping();
} catch(err) {
this.handleError(err);

4
package-lock.json generated
View File

@ -1,12 +1,12 @@
{
"name": "mylogger",
"version": "0.1.15",
"version": "0.1.16",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "mylogger",
"version": "0.1.15",
"version": "0.1.16",
"license": "GPL-3.0",
"dependencies": {
"colors": "^1.4.0",

View File

@ -1,6 +1,6 @@
{
"name": "mylogger",
"version": "0.1.15",
"version": "0.1.16",
"author": "Verdnatura Levante SL",
"description": "MySQL and MariaDB logger using binary log",
"license": "GPL-3.0",