diff --git a/Jenkinsfile b/Jenkinsfile index a0f43d3..9c7dc46 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -6,7 +6,7 @@ pipeline { disableConcurrentBuilds() } environment { - PROJECT_NAME = 'mycdc' + PROJECT_NAME = 'mylogger' STACK_NAME = "${env.PROJECT_NAME}-${env.BRANCH_NAME}" } stages { @@ -17,7 +17,7 @@ pipeline { env.VERSION = packageJson.version } configFileProvider([ - configFile(fileId: "mycdc.groovy", + configFile(fileId: "mylogger.groovy", variable: 'GROOVY_FILE') ]) { load env.GROOVY_FILE diff --git a/config.local.yml b/config.local.yml index a805586..21ff7a3 100644 --- a/config.local.yml +++ b/config.local.yml @@ -1,14 +1,17 @@ debug: true testMode: false -db: +srcDb: + host: localhost + port: 3306 + user: root + password: root + database: vn +dstDb: host: localhost port: 3306 user: root password: root database: vn -showField: - - name - description logs: ticket: logTable: clientLog diff --git a/config.yml b/config.yml index 7c632a9..8ce3ed9 100644 --- a/config.yml +++ b/config.yml @@ -3,12 +3,20 @@ debug: false testMode: false pingInterval: 60 flushInterval: 10 -db: +queueFlushDelay: 100 +maxBulkLog: 100 +srcDb: host: localhost port: 3306 user: zongji password: password database: util +dstDb: + host: localhost + port: 3306 + user: root + password: password + database: util showFields: - name - description @@ -24,7 +32,6 @@ logs: - image - supplyResponseFk types: - id: number isPrinted: boolean - name: itemTag relation: itemFk diff --git a/mylogger.js b/mylogger.js index bded49a..64b8eab 100644 --- a/mylogger.js +++ b/mylogger.js @@ -5,11 +5,11 @@ const path = require('path'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); -const allEvents = [ +const catchEvents = new Set([ 'writerows', 'updaterows', 'deleterows' -]; +]); const actions = { writerows: 'insert', @@ -20,10 +20,12 @@ const actions = { module.exports = class MyLogger { constructor() { this.running = false; - this.filename = null; - this.position = null; + this.binlogName = null; + this.binlogPosition = null; this.schemaMap = new Map(); this.logMap = new Map(); + this.isFlushed = true; + this.queue = []; } async start() { @@ -35,7 +37,7 @@ module.exports = class MyLogger { Object.assign(conf, localConfig); } - const defaultSchema = conf.db.database; + const defaultSchema = conf.srcDb.database; function parseTable(tableString) { let name, schema; const split = tableString.split('.'); @@ -145,7 +147,7 @@ module.exports = class MyLogger { // DB connection - const db = this.db = await mysql.createConnection(conf.db); + const db = this.db = await mysql.createConnection(conf.dstDb); db.on('error', this.onErrorListener); for (const logInfo of this.logMap.values()) { @@ -192,7 +194,7 @@ module.exports = class MyLogger { ); for (const {col, type, def} of dbCols) { - if (!tableInfo.exclude.has(col)) + if (!tableInfo.exclude.has(col) && col != 'editorFk') tableInfo.columns.set(col, {type, def}); const castType = conf.castTypes[type]; @@ -259,14 +261,6 @@ module.exports = class MyLogger { ] ); - console.debug( - table, - schema, - mainTable.name, - mainTable.schema, - mainTableInfo.idName - ); - if (!relations.length) throw new Error(`No relation to main table found for table: ${schema}.${table}`); if (relations.length > 1) @@ -279,7 +273,7 @@ module.exports = class MyLogger { // Zongji - const zongji = new ZongJi(conf.db); + const zongji = new ZongJi(conf.srcDb); this.zongji = zongji; this.onBinlogListener = evt => this.onBinlog(evt); @@ -291,11 +285,11 @@ module.exports = class MyLogger { ); if (res.length) { const [row] = res; - this.filename = row.logName; - this.position = row.position; + this.binlogName = row.logName; + this.binlogPosition = row.position; Object.assign(this.opts, { - filename: this.filename, - position: this.position + filename: row.logName, + position: row.position }); } else this.opts.startAtEnd = true; @@ -342,6 +336,9 @@ module.exports = class MyLogger { clearInterval(this.flushInterval); clearInterval(this.pingInterval); + clearInterval(this.flushTimeout); + await this.flushQueue(); + zongji.off('binlog', this.onBinlogListener); zongji.off('error', this.onErrorListener); this.zongji = null; @@ -409,29 +406,36 @@ module.exports = class MyLogger { } } - onBinlog(evt) { + async onBinlog(evt) { //evt.dump(); - const eventName = evt.getEventName(); - let position = evt.nextPosition; + try { + let shouldFlush; + const eventName = evt.getEventName(); - switch (eventName) { - case 'rotate': - this.filename = evt.binlogName; - position = evt.position; - console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`); - break; - case 'writerows': - case 'deleterows': - case 'updaterows': - this.onRowEvent(evt, eventName); - break; + if (eventName == 'rotate') { + if (evt.binlogName !== this.binlogName) { + shouldFlush = true; + this.binlogName = evt.binlogName; + this.binlogPosition = evt.position; + console.log( + `[${eventName}] filename: ${this.binlogName}`, + `position: ${this.binlogPosition}` + ); + } + } else { + shouldFlush = true; + this.binlogPosition = evt.nextPosition; + if (catchEvents.has(eventName)) + this.onRowEvent(evt, eventName); + } + + if (shouldFlush) this.isFlushed = false; + } catch(err) { + this.handleError(err); } - - this.position = position; - this.flushed = false; } - async onRowEvent(evt, eventName) { + onRowEvent(evt, eventName) { const table = evt.tableMap[evt.tableId]; const tableName = table.tableName; const tableMap = this.schemaMap.get(table.parentSchema); @@ -450,7 +454,6 @@ module.exports = class MyLogger { return !!value; default: return value; - } } @@ -464,7 +467,6 @@ module.exports = class MyLogger { for (const col in before) { if (columns.has(col) - && after[col] !== undefined && !equals(after[col], before[col])) { if (before[col] !== null) oldI[col] = castValue(col, before[col]); @@ -480,12 +482,10 @@ module.exports = class MyLogger { for (const row of evt.rows) { const instance = {}; - for (const col of cols) { if (row[col] !== null) instance[col] = castValue(col, row[col]); } - changes.push({row, instance}); } } @@ -497,6 +497,85 @@ module.exports = class MyLogger { `${tableName}(${changes}) [${eventName}]`); } + this.queue.push({ + tableInfo, + action, + evt, + changes, + tableName, + binlogName: this.binlogName + }); + if (!this.flushTimeout) + this.flushTimeout = setTimeout( + () => this.flushQueue(), + this.conf.queueFlushDelay + ); + } + + async flushQueue() { + if (this.isFlushed || this.isFlushing) return; + this.isFlushing = true; + const {conf, db} = this; + + try { + if (this.queue.length) { + do { + let appliedOps; + try { + await db.query('START TRANSACTION'); + let op; + appliedOps = []; + + for (let i = 0; i < conf.maxBulkLog && this.queue.length; i++) { + op = this.queue.shift(); + appliedOps.push(op); + await this.applyOp(op); + } + + await this.savePosition(op.binlogName, op.evt.nextPosition) + await db.query('COMMIT'); + } catch(err) { + this.queue = appliedOps.concat(this.queue); + await db.query('ROLLBACK'); + throw err; + } + } while (this.queue.length); + } else { + await this.savePosition(this.binlogName, this.binlogPosition); + } + } catch(err) { + this.handleError(err); + } finally { + this.flushTimeout = null; + this.isFlushing = false; + } + } + + async savePosition(binlogName, binlogPosition) { + this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); + + const replaceQuery = + 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; + if (!this.conf.testMode) + await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]); + + this.isFlushed = this.binlogName == binlogName + && this.binlogPosition == binlogPosition; + } + + handleError(err) { + console.error('Super error:', err); + } + + async applyOp(op) { + const { + tableInfo, + action, + evt, + changes, + tableName + } = op; + const logInfo = tableInfo.log; const isDelete = action == 'delete'; @@ -558,26 +637,6 @@ module.exports = class MyLogger { } } - async flushQueue() { - if (this.flushed) return; - const position = this.nextPosition; - - if (position) { - const filename = this.nextFilename; - this.debug('Flush', `filename: ${filename}, position: ${position}`); - - const replaceQuery = - 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; - if (!this.conf.testMode) - await this.db.query(replaceQuery, [this.conf.code, filename, position]); - - this.flushed = true; - } - - this.nextFilename = this.filename; - this.nextPosition = this.position; - } - async connectionPing() { this.debug('Ping', 'Sending ping to database.');