diff --git a/.gitignore b/.gitignore index f14ce54..7e02202 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ node_modules zongji -config/*.local.yml \ No newline at end of file +config.*.yml \ No newline at end of file diff --git a/config.yml b/config.yml index c65cdb9..21e3ca9 100644 --- a/config.yml +++ b/config.yml @@ -1,11 +1,12 @@ code: mylogger debug: false testMode: false -pingInterval: 60 -flushInterval: 10 +pingInterval: 3600 +flushInterval: 30 restartTimeout: 30 -queueFlushDelay: 100 +queueFlushDelay: 200 maxBulkLog: 100 +upperCaseTable: true srcDb: host: localhost port: 3306 diff --git a/mylogger.js b/mylogger.js index 08f6a37..b9e833d 100644 --- a/mylogger.js +++ b/mylogger.js @@ -5,18 +5,6 @@ const path = require('path'); const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); -const catchEvents = new Set([ - 'writerows', - 'updaterows', - 'deleterows' -]); - -const actions = { - writerows: 'insert', - updaterows: 'update', - deleterows: 'delete' -}; - module.exports = class MyLogger { constructor() { this.running = false; @@ -72,9 +60,14 @@ module.exports = class MyLogger { tableMap.set(table.name, tableInfo); } + const modelName = conf.upperCaseTable + ? toUpperCamelCase(table.name) + : table.name; + Object.assign(tableInfo, { conf: tableConf, - exclude: new Set(tableConf.exclude) + exclude: new Set(tableConf.exclude), + modelName }); return tableInfo; @@ -239,7 +232,7 @@ module.exports = class MyLogger { for (const [schema, tableMap] of this.schemaMap) for (const [table, tableInfo] of tableMap) { - // Fetch relation + // Fetch relation to main table if (!tableInfo.conf.relation && !tableInfo.isMain) { const mainTable = tableInfo.log.mainTable; @@ -247,7 +240,7 @@ module.exports = class MyLogger { .get(mainTable.schema) .get(mainTable.name); - const [relations] = await db.query( + const [mainRelations] = await db.query( `SELECT COLUMN_NAME relation FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_NAME = ? @@ -264,16 +257,40 @@ module.exports = class MyLogger { ] ); - if (!relations.length) + if (!mainRelations.length) throw new Error(`No relation to main table found for table: ${schema}.${table}`); - if (relations.length > 1) + if (mainRelations.length > 1) throw new Error(`Found more multiple relations to main table: ${schema}.${table}`); - for (const {relation} of relations) + for (const {relation} of mainRelations) tableInfo.relation = relation; } + + // Fetch relations + // TODO: Use relations to fetch names of related entities + + const [relations] = await db.query( + `SELECT + COLUMN_NAME \`col\`, + REFERENCED_TABLE_SCHEMA \`schema\`, + REFERENCED_TABLE_NAME \`table\`, + REFERENCED_COLUMN_NAME \`column\` + FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_NAME = ? + AND TABLE_SCHEMA = ? + AND REFERENCED_TABLE_NAME IS NOT NULL`, + [ + table, + schema + ] + ); + + tableInfo.relations = new Map(); + for (const {col, schema, table, column} of relations) + tableInfo.relations.set(col, {schema, table, column}); } + // Zongji const zongji = new ZongJi(conf.srcDb); @@ -421,6 +438,10 @@ module.exports = class MyLogger { } } + handleError(err) { + console.error(err); + } + async onBinlog(evt) { //evt.dump(); try { @@ -508,8 +529,7 @@ module.exports = class MyLogger { if (!changes.length) return; if (this.debug) { - console.debug('Log:'.blue, - `${tableName}(${changes}) [${eventName}]`); + console.debug('Log:'.blue, `[${action}]`.yellow, tableName); } this.queue.push({ @@ -517,7 +537,6 @@ module.exports = class MyLogger { action, evt, changes, - tableName, binlogName: this.binlogName }); if (!this.flushTimeout) @@ -531,6 +550,7 @@ module.exports = class MyLogger { if (this.isFlushed || this.isFlushing || !this.isOk) return; this.isFlushing = true; const {conf, db} = this; + let op; try { if (this.queue.length) { @@ -538,7 +558,6 @@ module.exports = class MyLogger { let appliedOps; try { await db.query('START TRANSACTION'); - let op; appliedOps = []; for (let i = 0; i < conf.maxBulkLog && this.queue.length; i++) { @@ -566,29 +585,12 @@ module.exports = class MyLogger { } } - async savePosition(binlogName, binlogPosition) { - this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); - - const replaceQuery = - 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; - if (!this.conf.testMode) - await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]); - - this.isFlushed = this.binlogName == binlogName - && this.binlogPosition == binlogPosition; - } - - handleError(err) { - console.error(err); - } - async applyOp(op) { const { tableInfo, action, evt, - changes, - tableName + changes } = op; const logInfo = tableInfo.log; @@ -611,6 +613,7 @@ module.exports = class MyLogger { break; } + const modelName = tableInfo.modelName; const modelId = row[tableInfo.idName]; const modelValue = tableInfo.showField ? row[tableInfo.showField] || null @@ -625,7 +628,7 @@ module.exports = class MyLogger { if (isDelete) { [[deleteRow]] = await logInfo.fetchStmt.execute([ - tableName, modelId + modelName, modelId ]); if (deleteRow) await logInfo.updateStmt.execute([ @@ -642,7 +645,7 @@ module.exports = class MyLogger { row.editorFk || null, action, created, - tableName, + modelName, oldInstance, newI ? JSON.stringify(newI) : null, modelId, @@ -652,6 +655,18 @@ module.exports = class MyLogger { } } + async savePosition(binlogName, binlogPosition) { + this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`); + + const replaceQuery = + 'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; + if (!this.conf.testMode) + await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]); + + this.isFlushed = this.binlogName == binlogName + && this.binlogPosition == binlogPosition; + } + async connectionPing() { if (!this.isOk) return; try { @@ -676,6 +691,24 @@ module.exports = class MyLogger { } } +const catchEvents = new Set([ + 'writerows', + 'updaterows', + 'deleterows' +]); + +const actions = { + writerows: 'insert', + updaterows: 'update', + deleterows: 'delete' +}; + +function toUpperCamelCase(str) { + str = str.replace(/[-_ ][a-z]/g, + match => match.charAt(1).toUpperCase()); + return str.charAt(0).toUpperCase() + str.substr(1); +} + function equals(a, b) { if (a === b) return true;