diff --git a/config/config.yml b/config/config.yml index 6392cfe..0b0158c 100644 --- a/config/config.yml +++ b/config/config.yml @@ -7,6 +7,9 @@ restartTimeout: 30 queueFlushDelay: 200 maxBulkLog: 25 maxQueueEvents: 10000 +showCache: + maxLoops: 100 + life: 300 serverId: 1 srcDb: host: localhost diff --git a/lib/model-loader.js b/lib/model-loader.js index 9e8bd10..3c2f4b2 100644 --- a/lib/model-loader.js +++ b/lib/model-loader.js @@ -3,16 +3,24 @@ const {loadConfig, toUpperCamelCase} = require('./util'); const MultiMap = require('./multi-map'); module.exports = class ModelLoader { - init(conf) { + init(logger) { const configDir = path.join(__dirname, '..'); - const logsConf = this.logsConf = loadConfig(configDir, 'logs'); + const conf = loadConfig(configDir, 'logs'); const schemaMap = new MultiMap(); const logMap = new Map(); - const showTables = new MultiMap(); + + Object.assign(this, { + logger, + conf + }); + Object.assign(logger, { + schemaMap, + logMap + }); - for (const logName in logsConf.logs) { - const logConf = logsConf.logs[logName]; - const schema = logConf.schema || conf.srcDb.database; + for (const logName in conf.logs) { + const logConf = conf.logs[logName]; + const schema = logConf.schema || logger.conf.srcDb.database; const logInfo = { name: logName, conf: logConf, @@ -53,7 +61,7 @@ module.exports = class ModelLoader { let modelName = tableConf.modelName; if (!modelName) { - modelName = logsConf.upperCaseTable + modelName = conf.upperCaseTable ? toUpperCamelCase(table.name) : table.name; } @@ -71,17 +79,16 @@ module.exports = class ModelLoader { showField, relation, idName, - userField: tableConf.userField || logsConf.userField + userField: tableConf.userField || conf.userField }); return tableInfo; } - - return {schemaMap, logMap, showTables}; } - async loadSchema(db, schemaMap, showTables) { - const {logsConf} = this; + async loadSchema() { + const {db, schemaMap} = this.logger; + const {conf} = this; for (const [schema, table, tableInfo] of schemaMap) { const tableConf = tableInfo.conf; @@ -111,7 +118,7 @@ module.exports = class ModelLoader { if (!tableInfo.exclude.has(col) && col != tableInfo.userField) tableInfo.columns.set(col, {type, def}); - const castType = logsConf.castTypes[type]; + const castType = conf.castTypes[type]; if (castType && !tableInfo.castTypes.has(col)) tableInfo.castTypes.set(col, castType); } @@ -139,14 +146,21 @@ module.exports = class ModelLoader { // Get show field - if (!tableConf.showField) { - for (const showField of logsConf.showFields) { - if (tableInfo.columns.has(showField)) { - tableInfo.showField = showField; - break; + if (!tableInfo.isMain) { + const {showField} = tableInfo; + if (!showField) { + for (const field of conf.showFields) { + if (tableInfo.columns.has(field)) { + tableInfo.showField = field; + break; + } } + } else { + const match = showField.match(/(^.*)\$$/); + if (match) tableInfo.showRelation = match[1]; } - } + } else + tableInfo.showField = null; } // Fetch relation to main table @@ -182,110 +196,6 @@ module.exports = class ModelLoader { tableInfo.relation = relation; } } - - // Fetch relations with other tables - // TODO: #5563 Fetch relations and show values in fronted - - showTables.clear(); - - for (const [schema, table, tableInfo] of schemaMap) { - const [relations] = await db.query( - `SELECT - COLUMN_NAME \`col\`, - REFERENCED_TABLE_SCHEMA \`schema\`, - REFERENCED_TABLE_NAME \`table\` - FROM information_schema.KEY_COLUMN_USAGE - WHERE TABLE_NAME = ? - AND TABLE_SCHEMA = ? - AND COLUMN_NAME IN (?) - AND REFERENCED_TABLE_NAME IS NOT NULL`, - [ - table, - schema, - Array.from(tableInfo.columns.keys()) - ] - ); - - tableInfo.relations = new Map(); - for (const {col, schema, table} of relations) { - if (col == tableInfo.relation) continue; - tableInfo.relations.set(col, {schema, table}); - showTables.setIfEmpty(schema, table, {}); - } - } - - const relatedList = Array.from(showTables.keys()); - - // Fetch primary key of related tables - - const [res] = await db.query( - `SELECT - TABLE_SCHEMA \`schema\`, - TABLE_NAME \`table\`, - COLUMN_NAME \`idName\`, - COUNT(*) nPks - FROM information_schema.\`COLUMNS\` - WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?) - AND COLUMN_KEY = 'PRI' - GROUP BY TABLE_NAME, TABLE_SCHEMA - HAVING nPks = 1`, - [relatedList] - ); - for (const {schema, table, idName} of res) - showTables.get(schema, table).idName = idName; - - // Fetch show field of related tables - - const showFields = logsConf.showFields; - const [result] = await db.query( - `SELECT - TABLE_SCHEMA \`schema\`, - TABLE_NAME \`table\`, - COLUMN_NAME \`col\` - FROM information_schema.\`COLUMNS\` - WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?) - AND COLUMN_NAME IN (?) - AND COLUMN_KEY <> 'PRI'`, - [relatedList, showFields] - ); - - for (const {schema, table, col} of result) { - const tableInfo = showTables.get(schema, table); - let save; - if (tableInfo.showField) { - const newIndex = showFields.indexOf(col); - const oldIndex = showFields.indexOf(tableInfo.showField); - save = newIndex < oldIndex; - } else - save = true; - if (save) tableInfo.showField = col; - } - - // Clean tables and relations without required information - - for (const [schema, table] of relatedList) { - const tableInfo = showTables.get(schema, table); - const {idName, showField} = tableInfo; - if (!idName || !showField || idName == showField) { - showTables.delete(schema, table); - continue; - } - - const sqlShowField = db.escapeId(showField); - const sqlIdName = db.escapeId(idName); - const sqlTable = `${db.escapeId(schema)}.${db.escapeId(table)}`; - - tableInfo.selectStmt = - `SELECT ${sqlIdName} \`id\`, ${sqlShowField} \`val\` - FROM ${sqlTable} - WHERE ${sqlIdName} IN (?)`; - } - - for (const tableInfo of schemaMap.values()) - for (const [col, relation] of tableInfo.relations) { - if (!showTables.has(relation.schema, relation.table)) - tableInfo.relations.delete(col); - } } } diff --git a/lib/show-db.js b/lib/show-db.js new file mode 100644 index 0000000..b02e120 --- /dev/null +++ b/lib/show-db.js @@ -0,0 +1,232 @@ +const MultiMap = require("./multi-map"); + +/** + * TODO: #5563 Fetch relations and show values in fronted + */ +module.exports = class ShowDb { + init(logger) { + Object.assign(this, { + logger, + conf: logger.conf.showCache, + tables: new MultiMap(), + valueDb: new MultiMap() + }); + } + + checkDb() { + const {conf, valueDb} = this; + const dbOutdated = this.loops % conf.maxLoops == 0 + || this.lastFlush > Date.now() + conf.life * 1000 + + if (dbOutdated) { + valueDb.clear(); + this.loops = 0; + this.lastFlush = Date.now(); + } + this.loops++; + } + + async loadSchema() { + const {logger, tables} = this; + const {db, schemaMap} = logger; + tables.clear(); + + // Fetch relations with other tables + + for (const [schema, table, tableInfo] of schemaMap) { + const [relations] = await db.query( + `SELECT + COLUMN_NAME \`col\`, + REFERENCED_TABLE_SCHEMA \`schema\`, + REFERENCED_TABLE_NAME \`table\` + FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_NAME = ? + AND TABLE_SCHEMA = ? + AND COLUMN_NAME IN (?) + AND REFERENCED_TABLE_NAME IS NOT NULL`, + [ + table, + schema, + Array.from(tableInfo.columns.keys()) + ] + ); + + tableInfo.relations = new Map(); + for (const {col, schema, table} of relations) { + if (col == tableInfo.relation) continue; + tableInfo.relations.set(col, {schema, table}); + tables.setIfEmpty(schema, table, {}); + } + } + + const relatedList = Array.from(tables.keys()); + + // Fetch primary key of related tables + + const [res] = await db.query( + `SELECT + TABLE_SCHEMA \`schema\`, + TABLE_NAME \`table\`, + COLUMN_NAME \`idName\`, + COUNT(*) nPks + FROM information_schema.\`COLUMNS\` + WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?) + AND COLUMN_KEY = 'PRI' + GROUP BY TABLE_NAME, TABLE_SCHEMA + HAVING nPks = 1`, + [relatedList] + ); + for (const {schema, table, idName} of res) + tables.get(schema, table).idName = idName; + + // Fetch show field of related tables + + const showFields = logger.modelLoader.conf.showFields; + const [result] = await db.query( + `SELECT + TABLE_SCHEMA \`schema\`, + TABLE_NAME \`table\`, + COLUMN_NAME \`col\` + FROM information_schema.\`COLUMNS\` + WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?) + AND COLUMN_NAME IN (?) + AND COLUMN_KEY <> 'PRI'`, + [relatedList, showFields] + ); + + for (const {schema, table, col} of result) { + const tableInfo = tables.get(schema, table); + let save; + if (tableInfo.showField) { + const newIndex = showFields.indexOf(col); + const oldIndex = showFields.indexOf(tableInfo.showField); + save = newIndex < oldIndex; + } else + save = true; + if (save) tableInfo.showField = col; + } + + // Clean tables and relations without required information + + for (const [schema, table] of relatedList) { + const tableInfo = tables.get(schema, table); + const {idName, showField} = tableInfo; + if (!idName || !showField || idName == showField) { + tables.delete(schema, table); + continue; + } + + const sqlShowField = db.escapeId(showField); + const sqlIdName = db.escapeId(idName); + const sqlTable = `${db.escapeId(schema)}.${db.escapeId(table)}`; + + tableInfo.selectStmt = + `SELECT ${sqlIdName} \`id\`, ${sqlShowField} \`val\` + FROM ${sqlTable} + WHERE ${sqlIdName} IN (?)`; + } + + for (const tableInfo of schemaMap.values()) + for (const [col, relation] of tableInfo.relations) { + if (!tables.has(relation.schema, relation.table)) + tableInfo.relations.delete(col); + } + } + + async getValues(db, ops) { + const {tables, valueDb} = this; + const showIds = new MultiMap(); + + this.checkDb(); + + // Fetch relations ids + + for (const op of ops) { + const { + relations, + showRelation + } = op.tableInfo; + + for (const change of op.changes) { + let rows; + if (op.action == 'update') + rows = [change.newI, change.oldI]; + else + rows = [change.instance]; + + if (showRelation) + rows.push({[showRelation]: change.row[showRelation]}); + + for (const row of rows) + for (const col in row) { + const relation = relations.get(col); + if (!relation) continue; + const {schema, table} = relation; + const id = row[col]; + + let ids = valueDb.get(schema, table); + if (ids && ids.has(id)) continue; + + ids = showIds.get(schema, table); + if (!ids) showIds.set(schema, table, ids = new Set()); + ids.add(id); + } + } + } + + // Query show values to database + + for (const [schema, table, ids] of showIds) { + const tableInfo = tables.get(schema, table); + const [res] = await db.query( + tableInfo.selectStmt, + [Array.from(ids.keys())] + ); + + let cacheIds = valueDb.get(schema, table); + if (!cacheIds) valueDb.set(schema, table, cacheIds = new Map()); + + for (const row of res) + cacheIds.set(row.id, row.val); + } + + // Fill rows with show values + + for (const op of ops) { + const { + relations, + showRelation, + showField + } = op.tableInfo; + + for (const change of op.changes) { + let rows; + if (op.action == 'update') + rows = [change.newI, change.oldI]; + else + rows = [change.instance]; + + for (const row of rows) + for (const col in row) { + const relation = relations.get(col); + if (!relation) continue; + const showValue = getValue(relation, row, col); + if (showValue) row[col +'$'] = showValue; + } + + const {row} = change; + if (showRelation) { + const relation = relations.get(showRelation); + change.modelValue = getValue(relation, row, showRelation); + } else if (showField) + change.modelValue = row[showField]; + } + } + + function getValue(relation, row, col) { + const {schema, table} = relation; + const ids = valueDb.get(schema, table); + return ids && ids.get(row[col]) + } + } +} diff --git a/mylogger.js b/mylogger.js index f359137..efb5d88 100644 --- a/mylogger.js +++ b/mylogger.js @@ -4,7 +4,7 @@ const ZongJi = require('./zongji'); const mysql = require('mysql2/promise'); const {loadConfig} = require('./lib/util'); const ModelLoader = require('./lib/model-loader'); -const MultiMap = require('./lib/multi-map'); +const ShowDb = require('./lib/show-db'); module.exports = class MyLogger { constructor() { @@ -15,11 +15,14 @@ module.exports = class MyLogger { this.isFlushed = true; this.queue = []; this.modelLoader = new ModelLoader(); + this.showDb = new ShowDb(); } async start() { const conf = this.conf = loadConfig(__dirname, 'config'); - Object.assign(this, this.modelLoader.init(conf)); + + this.modelLoader.init(this); + this.showDb.init(this); const includeSchema = {}; for (const [schemaName, tableMap] of this.schemaMap.map) @@ -61,6 +64,9 @@ module.exports = class MyLogger { const db = this.db = await mysql.createConnection(conf.dstDb); db.on('error', this.onErrorListener); + await this.modelLoader.loadSchema(); + await this.showDb.loadSchema(); + for (const logInfo of this.logMap.values()) { const table = logInfo.table; const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`; @@ -94,11 +100,6 @@ module.exports = class MyLogger { ); } - await this.modelLoader.loadSchema(db, - this.schemaMap, - this.showTables - ); - // Zongji this.onBinlogListener = evt => this.onBinlog(evt); @@ -412,11 +413,12 @@ module.exports = class MyLogger { do { const ops = []; let txStarted; + try { for (let i = 0; i < conf.maxBulkLog && queue.length; i++) ops.push(queue.shift()); - await this.getShowValues(ops); + await this.showDb.getValues(db, ops); await db.query('START TRANSACTION'); txStarted = true; @@ -453,72 +455,6 @@ module.exports = class MyLogger { } } - async getShowValues(ops) { - const {db, showTables} = this; - const showValues = new MultiMap(); - - // Fetch relations id - - for (const op of ops) { - if (op.hasShowValues) continue; - const {relations} = op.tableInfo; - for (const change of op.changes) { - if (op.action == 'update') { - getRelationsId(relations, change.newI); - getRelationsId(relations, change.oldI); - } else - getRelationsId(relations, change.instance); - } - } - - function getRelationsId(relations, row) { - for (const col in row) { - const relation = relations.get(col); - if (!relation) continue; - const {schema, table} = relation; - let ids = showValues.get(schema, table); - if (!ids) showValues.set(schema, table, ids = new Map()); - if (!ids.has(row[col])) ids.set(row[col], null); - } - } - - // Fetch show values - - for (const [schema, table, ids] of showValues) { - const tableInfo = showTables.get(schema, table); - const [res] = await db.query( - tableInfo.selectStmt, - [Array.from(ids.keys())] - ); - for (const row of res) - ids.set(row.id, row.val); - } - - // Fill rows with show values - - for (const op of ops) { - const {relations} = op.tableInfo; - for (const change of op.changes) { - if (op.action == 'update') { - setShowValues(relations, change.newI); - setShowValues(relations, change.oldI); - } else - setShowValues(relations, change.instance); - } - op.hasShowValues = true; - } - - function setShowValues(relations, row) { - for (const col in row) { - const relation = relations.get(col); - if (!relation) continue; - const {schema, table} = relation; - const showValue = showValues.get(schema, table).get(row[col]); - if (showValue) row[`${col}$`] = showValue; - } - } - } - async applyOp(op) { const {conf} = this; const { @@ -533,6 +469,8 @@ module.exports = class MyLogger { const isUpdate = action == 'update'; const isSecondary = !tableInfo.isMain; const relation = tableInfo.relation; + const modelName = tableInfo.modelName; + const created = new Date(evt.timestamp); for (const change of changes) { let newI, oldI; @@ -551,12 +489,8 @@ module.exports = class MyLogger { break; } - const created = new Date(evt.timestamp); - const modelName = tableInfo.modelName; const modelId = row[tableInfo.idName]; - const modelValue = tableInfo.showField && isSecondary - ? row[tableInfo.showField] || null - : null; + const modelValue = change.modelValue; const oldInstance = oldI ? JSON.stringify(oldI) : null; const originFk = isSecondary ? row[relation] : modelId; const originChanged = isUpdate && isSecondary diff --git a/package-lock.json b/package-lock.json index 4765ca3..2602d86 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "mylogger", - "version": "0.1.22", + "version": "0.1.23", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "mylogger", - "version": "0.1.22", + "version": "0.1.23", "license": "GPL-3.0", "dependencies": { "colors": "^1.4.0", diff --git a/package.json b/package.json index 6455595..dc55172 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mylogger", - "version": "0.1.22", + "version": "0.1.23", "author": "Verdnatura Levante SL", "description": "MySQL and MariaDB logger using binary log", "license": "GPL-3.0",