refs #5563 Fist showValue implementation
gitea/mylogger/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2023-06-05 14:49:57 +02:00
parent 6e12a59497
commit 0b2a81e744
5 changed files with 234 additions and 76 deletions

View File

@ -1,12 +1,14 @@
const path = require('path'); const path = require('path');
const {loadConfig, toUpperCamelCase} = require('./util'); const {loadConfig, toUpperCamelCase} = require('./util');
const MultiMap = require('./multi-map');
module.exports = class ModelLoader { module.exports = class ModelLoader {
init(conf) { init(conf) {
const configDir = path.join(__dirname, '..'); const configDir = path.join(__dirname, '..');
const logsConf = this.logsConf = loadConfig(configDir, 'logs'); const logsConf = this.logsConf = loadConfig(configDir, 'logs');
const schemaMap = new Map(); const schemaMap = new MultiMap();
const logMap = new Map(); const logMap = new Map();
const showTables = new MultiMap();
for (const logName in logsConf.logs) { for (const logName in logsConf.logs) {
const logConf = logsConf.logs[logName]; const logConf = logsConf.logs[logName];
@ -40,19 +42,13 @@ module.exports = class ModelLoader {
tableConf = {name: tableConf}; tableConf = {name: tableConf};
const table = parseTable(tableConf.name, logInfo.schema); const table = parseTable(tableConf.name, logInfo.schema);
let tableMap = schemaMap.get(table.schema); let tableInfo = schemaMap.get(table.schema, table.name);
if (!tableMap) {
tableMap = new Map();
schemaMap.set(table.schema, tableMap);
}
let tableInfo = tableMap.get(table.name);
if (!tableInfo) { if (!tableInfo) {
tableInfo = { tableInfo = {
conf: tableConf, conf: tableConf,
log: logInfo log: logInfo
}; };
tableMap.set(table.name, tableInfo); schemaMap.set(table.schema, table.name, tableInfo);
} }
let modelName = tableConf.modelName; let modelName = tableConf.modelName;
@ -81,14 +77,13 @@ module.exports = class ModelLoader {
return tableInfo; return tableInfo;
} }
return {schemaMap, logMap}; return {schemaMap, logMap, showTables};
} }
async loadSchema(schemaMap, db) { async loadSchema(db, schemaMap, showTables) {
const {logsConf} = this; const {logsConf} = this;
for (const [schema, tableMap] of schemaMap) for (const [schema, table, tableInfo] of schemaMap) {
for (const [table, tableInfo] of tableMap) {
const tableConf = tableInfo.conf; const tableConf = tableInfo.conf;
// Fetch columns & types // Fetch columns & types
@ -156,14 +151,10 @@ module.exports = class ModelLoader {
// Fetch relation to main table // Fetch relation to main table
for (const [schema, tableMap] of schemaMap) for (const [schema, table, tableInfo] of schemaMap) {
for (const [table, tableInfo] of tableMap) {
if (!tableInfo.conf.relation && !tableInfo.isMain) { if (!tableInfo.conf.relation && !tableInfo.isMain) {
const mainTable = tableInfo.log.mainTable; const mainTable = tableInfo.log.mainTable;
const mainTableInfo = schemaMap const mainInfo = schemaMap.get(mainTable.schema, mainTable.name);
.get(mainTable.schema)
.get(mainTable.name);
const [mainRelations] = await db.query( const [mainRelations] = await db.query(
`SELECT COLUMN_NAME relation `SELECT COLUMN_NAME relation
@ -178,7 +169,7 @@ module.exports = class ModelLoader {
schema, schema,
mainTable.name, mainTable.name,
mainTable.schema, mainTable.schema,
mainTableInfo.idName mainInfo.idName
] ]
); );
@ -192,69 +183,108 @@ module.exports = class ModelLoader {
} }
} }
// Fetch relations and show values of related tables // Fetch relations with other tables
// TODO: #5563 Not used yet // TODO: #5563 Fetch relations and show values in fronted
const relatedList = []; showTables.clear();
const relatedMap = new Map();
for (const [schema, tableMap] of schemaMap) for (const [schema, table, tableInfo] of schemaMap) {
for (const [table, tableInfo] of tableMap) {
const [relations] = await db.query( const [relations] = await db.query(
`SELECT `SELECT
COLUMN_NAME \`col\`, COLUMN_NAME \`col\`,
REFERENCED_TABLE_SCHEMA \`schema\`, REFERENCED_TABLE_SCHEMA \`schema\`,
REFERENCED_TABLE_NAME \`table\`, REFERENCED_TABLE_NAME \`table\`
REFERENCED_COLUMN_NAME \`column\`
FROM information_schema.KEY_COLUMN_USAGE FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = ? WHERE TABLE_NAME = ?
AND TABLE_SCHEMA = ? AND TABLE_SCHEMA = ?
AND COLUMN_NAME IN (?)
AND REFERENCED_TABLE_NAME IS NOT NULL`, AND REFERENCED_TABLE_NAME IS NOT NULL`,
[table, schema] [
table,
schema,
Array.from(tableInfo.columns.keys())
]
); );
tableInfo.relations = new Map(); tableInfo.relations = new Map();
for (const {col, schema, table, column} of relations) { for (const {col, schema, table} of relations) {
tableInfo.relations.set(col, {schema, table, column}); if (col == tableInfo.relation) continue;
relatedList.push([table, schema]); tableInfo.relations.set(col, {schema, table});
showTables.setIfEmpty(schema, table, {});
let tables = relatedMap.get(schema);
if (!tables) relatedMap.set(schema, tables = new Set());
if (!tables.has(table)) {
tables.add(table);
relatedList.push([table, schema]);
}
} }
} }
const relatedList = Array.from(showTables.keys());
// Fetch primary key of related tables
const [res] = await db.query(
`SELECT
TABLE_SCHEMA \`schema\`,
TABLE_NAME \`table\`,
COLUMN_NAME \`idName\`,
COUNT(*) nPks
FROM information_schema.\`COLUMNS\`
WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?)
AND COLUMN_KEY = 'PRI'
GROUP BY TABLE_NAME, TABLE_SCHEMA
HAVING nPks = 1`,
[relatedList]
);
for (const {schema, table, idName} of res)
showTables.get(schema, table).idName = idName;
// Fetch show field of related tables
const showFields = logsConf.showFields; const showFields = logsConf.showFields;
const [result] = await db.query( const [result] = await db.query(
`SELECT `SELECT
TABLE_NAME \`table\`,
TABLE_SCHEMA \`schema\`, TABLE_SCHEMA \`schema\`,
TABLE_NAME \`table\`,
COLUMN_NAME \`col\` COLUMN_NAME \`col\`
FROM information_schema.\`COLUMNS\` FROM information_schema.\`COLUMNS\`
WHERE (TABLE_NAME, TABLE_SCHEMA) IN (?) WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?)
AND COLUMN_NAME IN (?)`, AND COLUMN_NAME IN (?)
AND COLUMN_KEY <> 'PRI'`,
[relatedList, showFields] [relatedList, showFields]
); );
const showTables = new Map(); for (const {schema, table, col} of result) {
const tableInfo = showTables.get(schema, table);
for (const {table, schema, col} of result) {
let tables = showTables.get(schema);
if (!tables) showTables.set(schema, tables = new Map())
const showField = tables.get(table);
let save; let save;
if (showField) { if (tableInfo.showField) {
const newIndex = showFields.indexOf(col); const newIndex = showFields.indexOf(col);
const oldIndex = showFields.indexOf(showField); const oldIndex = showFields.indexOf(tableInfo.showField);
save = newIndex < oldIndex; save = newIndex < oldIndex;
} else } else
save = true; save = true;
if (save) tableInfo.showField = col;
if (save) tables.set(table, col); }
// Clean tables and relations without required information
for (const [schema, table] of relatedList) {
const tableInfo = showTables.get(schema, table);
const {idName, showField} = tableInfo;
if (!idName || !showField || idName == showField) {
showTables.delete(schema, table);
continue;
}
const sqlShowField = db.escapeId(showField);
const sqlIdName = db.escapeId(idName);
const sqlTable = `${db.escapeId(schema)}.${db.escapeId(table)}`;
tableInfo.selectStmt =
`SELECT ${sqlIdName} \`id\`, ${sqlShowField} \`val\`
FROM ${sqlTable}
WHERE ${sqlIdName} IN (?)`;
}
for (const tableInfo of schemaMap.values())
for (const [col, relation] of tableInfo.relations) {
if (!showTables.has(relation.schema, relation.table))
tableInfo.relations.delete(col);
} }
} }
} }

58
lib/multi-map.js Normal file
View File

@ -0,0 +1,58 @@
module.exports = class MultiMap {
constructor() {
this.map = new Map();
}
set(key, subKey, value) {
let subKeys = this.map.get(key);
if (!subKeys) this.map.set(key, subKeys = new Map());
subKeys.set(subKey, value);
}
setIfEmpty(key, subKey, value) {
if (!this.has(key, subKey))
this.set(key, subKey, value);
}
get(key, subKey) {
return this.map.get(key)?.get(subKey);
}
has(key, subKey) {
const subMap = this.map.get(key);
return subMap && subMap.has(subKey);
}
delete(key, subKey) {
const subMap = this.map.get(key);
if (subMap) subMap.delete(subKey);
}
clear() {
for (const subMap of this.map.values())
subMap.clear();
this.map.clear();
}
*keys() {
for (const [key, subMap] of this.map)
for (const subKey of subMap.keys())
yield [key, subKey];
}
*values() {
for (const subMap of this.map.values())
for (const value of subMap.values())
yield value;
}
*entries() {
for (const [key, subMap] of this.map)
for (const [subKey, value] of subMap)
yield [key, subKey, value];
}
[Symbol.iterator]() {
return this.entries();
};
}

View File

@ -4,6 +4,7 @@ const ZongJi = require('./zongji');
const mysql = require('mysql2/promise'); const mysql = require('mysql2/promise');
const {loadConfig} = require('./lib/util'); const {loadConfig} = require('./lib/util');
const ModelLoader = require('./lib/model-loader'); const ModelLoader = require('./lib/model-loader');
const MultiMap = require('./lib/multi-map');
module.exports = class MyLogger { module.exports = class MyLogger {
constructor() { constructor() {
@ -18,12 +19,10 @@ module.exports = class MyLogger {
async start() { async start() {
const conf = this.conf = loadConfig(__dirname, 'config'); const conf = this.conf = loadConfig(__dirname, 'config');
Object.assign(this, this.modelLoader.init(conf));
const {logMap, schemaMap} = this.modelLoader.init(conf);
Object.assign(this, {logMap, schemaMap});
const includeSchema = {}; const includeSchema = {};
for (const [schemaName, tableMap] of this.schemaMap) for (const [schemaName, tableMap] of this.schemaMap.map)
includeSchema[schemaName] = Array.from(tableMap.keys()); includeSchema[schemaName] = Array.from(tableMap.keys());
this.zongjiOpts = { this.zongjiOpts = {
@ -64,7 +63,7 @@ module.exports = class MyLogger {
for (const logInfo of this.logMap.values()) { for (const logInfo of this.logMap.values()) {
const table = logInfo.table; const table = logInfo.table;
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}` const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`;
logInfo.addStmt = await db.prepare( logInfo.addStmt = await db.prepare(
`INSERT INTO ${sqlTable} `INSERT INTO ${sqlTable}
SET originFk = ?, SET originFk = ?,
@ -95,7 +94,10 @@ module.exports = class MyLogger {
); );
} }
await this.modelLoader.loadSchema(this.schemaMap, db); await this.modelLoader.loadSchema(db,
this.schemaMap,
this.showTables
);
// Zongji // Zongji
@ -306,8 +308,7 @@ module.exports = class MyLogger {
onRowEvent(evt, eventName) { onRowEvent(evt, eventName) {
const table = evt.tableMap[evt.tableId]; const table = evt.tableMap[evt.tableId];
const tableName = table.tableName; const tableName = table.tableName;
const tableInfo = this.schemaMap const tableInfo = this.schemaMap.get(table.parentSchema, tableName);
.get(table.parentSchema)?.get(tableName);
if (!tableInfo) return; if (!tableInfo) return;
const action = actions[eventName]; const action = actions[eventName];
@ -412,17 +413,20 @@ module.exports = class MyLogger {
const ops = []; const ops = [];
let txStarted; let txStarted;
try { try {
for (let i = 0; i < conf.maxBulkLog && queue.length; i++)
ops.push(queue.shift());
await this.getShowValues(ops);
await db.query('START TRANSACTION'); await db.query('START TRANSACTION');
txStarted = true; txStarted = true;
for (let i = 0; i < conf.maxBulkLog && queue.length; i++) { for (op of ops)
op = queue.shift();
ops.push(op);
await this.applyOp(op); await this.applyOp(op);
}
this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`); this.debug('Queue', `applied: ${ops.length}, remaining: ${queue.length}`);
await this.savePosition(op.binlogName, op.evt.nextPosition) await this.savePosition(op.binlogName, op.evt.nextPosition);
await db.query('COMMIT'); await db.query('COMMIT');
} catch(err) { } catch(err) {
queue.unshift(...ops); queue.unshift(...ops);
@ -449,6 +453,72 @@ module.exports = class MyLogger {
} }
} }
async getShowValues(ops) {
const {db, showTables} = this;
const showValues = new MultiMap();
// Fetch relations id
for (const op of ops) {
if (op.hasShowValues) continue;
const {relations} = op.tableInfo;
for (const change of op.changes) {
if (op.action == 'update') {
getRelationsId(relations, change.newI);
getRelationsId(relations, change.oldI);
} else
getRelationsId(relations, change.instance);
}
}
function getRelationsId(relations, row) {
for (const col in row) {
const relation = relations.get(col);
if (!relation) continue;
const {schema, table} = relation;
let ids = showValues.get(schema, table);
if (!ids) showValues.set(schema, table, ids = new Map());
if (!ids.has(row[col])) ids.set(row[col], null);
}
}
// Fetch show values
for (const [schema, table, ids] of showValues) {
const tableInfo = showTables.get(schema, table);
const [res] = await db.query(
tableInfo.selectStmt,
[Array.from(ids.keys())]
);
for (const row of res)
ids.set(row.id, row.val);
}
// Fill rows with show values
for (const op of ops) {
const {relations} = op.tableInfo;
for (const change of op.changes) {
if (op.action == 'update') {
setShowValues(relations, change.newI);
setShowValues(relations, change.oldI);
} else
setShowValues(relations, change.instance);
}
op.hasShowValues = true;
}
function setShowValues(relations, row) {
for (const col in row) {
const relation = relations.get(col);
if (!relation) continue;
const {schema, table} = relation;
const showValue = showValues.get(schema, table).get(row[col]);
if (showValue) row[`${col}$`] = showValue;
}
}
}
async applyOp(op) { async applyOp(op) {
const {conf} = this; const {conf} = this;
const { const {
@ -461,7 +531,7 @@ module.exports = class MyLogger {
const logInfo = tableInfo.log; const logInfo = tableInfo.log;
const isDelete = action == 'delete'; const isDelete = action == 'delete';
const isUpdate = action == 'update'; const isUpdate = action == 'update';
const isMain = tableInfo.isMain; const isSecondary = !tableInfo.isMain;
const relation = tableInfo.relation; const relation = tableInfo.relation;
for (const change of changes) { for (const change of changes) {
@ -484,12 +554,12 @@ module.exports = class MyLogger {
const created = new Date(evt.timestamp); const created = new Date(evt.timestamp);
const modelName = tableInfo.modelName; const modelName = tableInfo.modelName;
const modelId = row[tableInfo.idName]; const modelId = row[tableInfo.idName];
const modelValue = tableInfo.showField && !isMain const modelValue = tableInfo.showField && isSecondary
? row[tableInfo.showField] || null ? row[tableInfo.showField] || null
: null; : null;
const oldInstance = oldI ? JSON.stringify(oldI) : null; const oldInstance = oldI ? JSON.stringify(oldI) : null;
const originFk = !isMain ? row[relation] : modelId; const originFk = isSecondary ? row[relation] : modelId;
const originChanged = isUpdate && !isMain const originChanged = isUpdate && isSecondary
&& newI[relation] !== undefined; && newI[relation] !== undefined;
let deleteRow; let deleteRow;

4
package-lock.json generated
View File

@ -1,12 +1,12 @@
{ {
"name": "mylogger", "name": "mylogger",
"version": "0.1.21", "version": "0.1.22",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "mylogger", "name": "mylogger",
"version": "0.1.21", "version": "0.1.22",
"license": "GPL-3.0", "license": "GPL-3.0",
"dependencies": { "dependencies": {
"colors": "^1.4.0", "colors": "^1.4.0",

View File

@ -1,6 +1,6 @@
{ {
"name": "mylogger", "name": "mylogger",
"version": "0.1.21", "version": "0.1.22",
"author": "Verdnatura Levante SL", "author": "Verdnatura Levante SL",
"description": "MySQL and MariaDB logger using binary log", "description": "MySQL and MariaDB logger using binary log",
"license": "GPL-3.0", "license": "GPL-3.0",