From a849049d009a9144e06fbea761c54f8a3ff2523a Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Sun, 21 May 2023 14:16:01 +0200 Subject: [PATCH] refs #5563 maxQueueLengh, debug improved, test mode fixes --- config.yml | 1 + mylogger.js | 172 ++++++++++++++++++++++++++++------------------ package-lock.json | 4 +- package.json | 2 +- 4 files changed, 110 insertions(+), 69 deletions(-) diff --git a/config.yml b/config.yml index 8dcea0a..3f312e7 100644 --- a/config.yml +++ b/config.yml @@ -6,6 +6,7 @@ flushInterval: 30 restartTimeout: 30 queueFlushDelay: 200 maxBulkLog: 25 +maxQueueEvents: 10000 upperCaseTable: true serverId: 1 srcDb: diff --git a/mylogger.js b/mylogger.js index 1129f57..cdc05e8 100644 --- a/mylogger.js +++ b/mylogger.js @@ -86,6 +86,7 @@ module.exports = class MyLogger { const logConf = conf.logs[logName]; const schema = logConf.schema || conf.srcDb.database; const logInfo = { + name: logName, conf: logConf, schema, table: parseTable(logConf.logTable, schema), @@ -112,7 +113,7 @@ module.exports = class MyLogger { for (const [schemaName, tableMap] of this.schemaMap) includeSchema[schemaName] = Array.from(tableMap.keys()); - this.opts = { + this.zongjiOpts = { includeEvents: [ 'rotate', 'tablemap', @@ -306,14 +307,9 @@ module.exports = class MyLogger { tableInfo.relations.set(col, {schema, table, column}); } - // Zongji - const zongji = new ZongJi(conf.srcDb); - this.zongji = zongji; - this.onBinlogListener = evt => this.onBinlog(evt); - zongji.on('binlog', this.onBinlogListener); const [res] = await db.query( 'SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?', @@ -323,33 +319,9 @@ module.exports = class MyLogger { const [row] = res; this.binlogName = row.logName; this.binlogPosition = row.position; - Object.assign(this.opts, { - filename: row.logName, - position: row.position - }); - } else - this.opts.startAtEnd = true; + } - this.debug('Zongji', 'Starting.'); - await new Promise((resolve, reject) => { - const onReady = () => { - zongji.off('error', onError); - resolve(); - }; - const onError = err => { - this.zongji = null; - zongji.off('ready', onReady); - zongji.off('binlog', this.onBinlogListener); - reject(err); - } - - zongji.once('ready', onReady); - zongji.once('error', onError); - zongji.start(this.opts); - }); - this.debug('Zongji', 'Started.'); - - this.zongji.on('error', this.onErrorListener); + await this.zongjiStart(); this.flushInterval = setInterval( () => this.flushQueue(), conf.flushInterval * 1000); @@ -386,12 +358,75 @@ module.exports = class MyLogger { // Zongji - const zongji = this.zongji; - zongji.off('binlog', this.onBinlogListener); - zongji.off('error', this.onErrorListener); + await this.zongjiStop(); this.zongji = null; - this.debug('Zongji', 'Stopping.'); + // DB connection + + // FIXME: mysql2/promise bug, db.end() ends process + this.db.on('error', () => {}); + try { + this.db.end(); + } catch (err) { + logError(err); + } + + // Summary + + this.debug('MyLogger', 'Ended.'); + } + + async zongjiStart() { + await this.zongjiStop(); + const zongji = new ZongJi(this.conf.srcDb); + const zongjiOpts = this.zongjiOpts; + + if (this.binlogName) { + this.debug('Zongji', + `Starting: ${this.binlogName}, position: ${this.binlogPosition}` + ); + Object.assign(zongjiOpts, { + filename: this.binlogName, + position: this.binlogPosition + }); + } else { + this.debug('Zongji', 'Starting at end.'); + zongjiOpts.startAtEnd = true; + } + + zongji.on('binlog', this.onBinlogListener); + + await new Promise((resolve, reject) => { + const onReady = () => { + zongji.off('error', onError); + resolve(); + }; + const onError = err => { + zongji.off('ready', onReady); + zongji.off('binlog', this.onBinlogListener); + reject(err); + } + + zongji.once('ready', onReady); + zongji.once('error', onError); + zongji.start(zongjiOpts); + }); + zongji.on('error', this.onErrorListener); + this.zongji = zongji; + this.debug('Zongji', 'Started.'); + } + + async zongjiStop() { + if (!this.zongji) return; + this.debug('Zongji', + `Stopping: ${this.binlogName}, position: ${this.binlogPosition}` + ); + const zongji = this.zongji; + this.zongji = null; + + zongji.off('binlog', this.onBinlogListener); + zongji.off('error', this.onErrorListener); + // FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection zongji.connection.destroy(() => { console.log('zongji.connection.destroy'); @@ -409,20 +444,6 @@ module.exports = class MyLogger { }); zongji.emit('stopped'); this.debug('Zongji', 'Stopped.'); - - // DB connection - - // FIXME: mysql2/promise bug, db.end() ends process - this.db.on('error', () => {}); - try { - this.db.end(); - } catch (err) { - logError(err); - } - - // Summary - - this.debug('MyLogger', 'Ended.'); } async tryRestart() { @@ -482,6 +503,11 @@ module.exports = class MyLogger { } if (shouldFlush) this.isFlushed = false; + + if (this.queue.length > this.conf.maxQueueEvents) { + this.debug('MyLogger', 'Queue full, stopping Zongji.'); + await this.zongjiStop(); + } } catch(err) { this.handleError(err); } @@ -544,10 +570,6 @@ module.exports = class MyLogger { if (!changes.length) return; - if (this.conf.debug) - console.debug('Evt:'.blue, - `[${action}]`[actionColor[action]], `${tableName}: ${changes.length} changes`); - this.queue.push({ tableInfo, action, @@ -555,11 +577,18 @@ module.exports = class MyLogger { changes, binlogName: this.binlogName }); + if (!this.flushTimeout) this.flushTimeout = setTimeout( () => this.flushQueue(), this.conf.queueFlushDelay ); + + if (this.conf.debug) + console.debug('Evt:'.blue, + `[${action}]`[actionColor[action]], + `${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements` + ); } async flushQueue() { @@ -595,6 +624,11 @@ module.exports = class MyLogger { throw err; } } while (queue.length); + + if (!this.zongji) { + this.debug('MyLogger', 'Queue flushed, restarting Zongji.'); + await this.zongjiStart(); + } } else { await this.savePosition(this.binlogName, this.binlogPosition); } @@ -607,6 +641,7 @@ module.exports = class MyLogger { } async applyOp(op) { + const {conf} = this; const { tableInfo, action, @@ -637,7 +672,7 @@ module.exports = class MyLogger { const created = new Date(evt.timestamp); const modelName = tableInfo.modelName; const modelId = row[tableInfo.idName]; - const modelValue = tableInfo.showField + const modelValue = tableInfo.showField && !tableInfo.isMain ? row[tableInfo.showField] || null : null; const oldInstance = oldI ? JSON.stringify(oldI) : null; @@ -646,16 +681,18 @@ module.exports = class MyLogger { : modelId; let deleteRow; - if (this.conf.debug) + if (conf.debug) console.debug('Log:'.blue, - `[${action}]`[actionColor[action]], `${modelName}: ${modelId}`); + `[${action}]`[actionColor[action]], + `${logInfo.name}: ${originFk}, ${modelName}: ${modelId}` + ); try { if (isDelete) { [[deleteRow]] = await logInfo.fetchStmt.execute([ modelName, modelId, originFk ]); - if (deleteRow) + if (!conf.testMode && deleteRow) await logInfo.updateStmt.execute([ originFk, created, @@ -664,7 +701,7 @@ module.exports = class MyLogger { deleteRow.id ]); } - if (!isDelete || !deleteRow) { + if (!conf.testMode && (!isDelete || !deleteRow)) { await logInfo.addStmt.execute([ originFk, row[tableInfo.userField] || null, @@ -703,13 +740,16 @@ module.exports = class MyLogger { try { this.debug('Ping', 'Sending ping to database.'); - // FIXME: Should Zongji.connection be pinged? - await new Promise((resolve, reject) => { - this.zongji.ctrlConnection.ping(err => { - if (err) return reject(err); - resolve(); - }); - }) + if (this.zongji) { + // FIXME: Should Zongji.connection be pinged? + await new Promise((resolve, reject) => { + this.zongji.ctrlConnection.ping(err => { + if (err) return reject(err); + resolve(); + }); + }) + } + await this.db.ping(); } catch(err) { this.handleError(err); diff --git a/package-lock.json b/package-lock.json index cda9814..6cd3455 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "mylogger", - "version": "0.1.15", + "version": "0.1.16", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "mylogger", - "version": "0.1.15", + "version": "0.1.16", "license": "GPL-3.0", "dependencies": { "colors": "^1.4.0", diff --git a/package.json b/package.json index e7ae23b..70becff 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mylogger", - "version": "0.1.15", + "version": "0.1.16", "author": "Verdnatura Levante SL", "description": "MySQL and MariaDB logger using binary log", "license": "GPL-3.0",