refs #5563 Code refactor
gitea/mylogger/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2023-06-06 10:55:56 +02:00
parent 0b2a81e744
commit 3fbf48b845
6 changed files with 284 additions and 205 deletions

View File

@ -7,6 +7,9 @@ restartTimeout: 30
queueFlushDelay: 200
maxBulkLog: 25
maxQueueEvents: 10000
showCache:
maxLoops: 100
life: 300
serverId: 1
srcDb:
host: localhost

View File

@ -3,16 +3,24 @@ const {loadConfig, toUpperCamelCase} = require('./util');
const MultiMap = require('./multi-map');
module.exports = class ModelLoader {
init(conf) {
init(logger) {
const configDir = path.join(__dirname, '..');
const logsConf = this.logsConf = loadConfig(configDir, 'logs');
const conf = loadConfig(configDir, 'logs');
const schemaMap = new MultiMap();
const logMap = new Map();
const showTables = new MultiMap();
Object.assign(this, {
logger,
conf
});
Object.assign(logger, {
schemaMap,
logMap
});
for (const logName in logsConf.logs) {
const logConf = logsConf.logs[logName];
const schema = logConf.schema || conf.srcDb.database;
for (const logName in conf.logs) {
const logConf = conf.logs[logName];
const schema = logConf.schema || logger.conf.srcDb.database;
const logInfo = {
name: logName,
conf: logConf,
@ -53,7 +61,7 @@ module.exports = class ModelLoader {
let modelName = tableConf.modelName;
if (!modelName) {
modelName = logsConf.upperCaseTable
modelName = conf.upperCaseTable
? toUpperCamelCase(table.name)
: table.name;
}
@ -71,17 +79,16 @@ module.exports = class ModelLoader {
showField,
relation,
idName,
userField: tableConf.userField || logsConf.userField
userField: tableConf.userField || conf.userField
});
return tableInfo;
}
return {schemaMap, logMap, showTables};
}
async loadSchema(db, schemaMap, showTables) {
const {logsConf} = this;
async loadSchema() {
const {db, schemaMap} = this.logger;
const {conf} = this;
for (const [schema, table, tableInfo] of schemaMap) {
const tableConf = tableInfo.conf;
@ -111,7 +118,7 @@ module.exports = class ModelLoader {
if (!tableInfo.exclude.has(col) && col != tableInfo.userField)
tableInfo.columns.set(col, {type, def});
const castType = logsConf.castTypes[type];
const castType = conf.castTypes[type];
if (castType && !tableInfo.castTypes.has(col))
tableInfo.castTypes.set(col, castType);
}
@ -139,14 +146,21 @@ module.exports = class ModelLoader {
// Get show field
if (!tableConf.showField) {
for (const showField of logsConf.showFields) {
if (tableInfo.columns.has(showField)) {
tableInfo.showField = showField;
break;
if (!tableInfo.isMain) {
const {showField} = tableInfo;
if (!showField) {
for (const field of conf.showFields) {
if (tableInfo.columns.has(field)) {
tableInfo.showField = field;
break;
}
}
} else {
const match = showField.match(/(^.*)\$$/);
if (match) tableInfo.showRelation = match[1];
}
}
} else
tableInfo.showField = null;
}
// Fetch relation to main table
@ -182,110 +196,6 @@ module.exports = class ModelLoader {
tableInfo.relation = relation;
}
}
// Fetch relations with other tables
// TODO: #5563 Fetch relations and show values in fronted
showTables.clear();
for (const [schema, table, tableInfo] of schemaMap) {
const [relations] = await db.query(
`SELECT
COLUMN_NAME \`col\`,
REFERENCED_TABLE_SCHEMA \`schema\`,
REFERENCED_TABLE_NAME \`table\`
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = ?
AND TABLE_SCHEMA = ?
AND COLUMN_NAME IN (?)
AND REFERENCED_TABLE_NAME IS NOT NULL`,
[
table,
schema,
Array.from(tableInfo.columns.keys())
]
);
tableInfo.relations = new Map();
for (const {col, schema, table} of relations) {
if (col == tableInfo.relation) continue;
tableInfo.relations.set(col, {schema, table});
showTables.setIfEmpty(schema, table, {});
}
}
const relatedList = Array.from(showTables.keys());
// Fetch primary key of related tables
const [res] = await db.query(
`SELECT
TABLE_SCHEMA \`schema\`,
TABLE_NAME \`table\`,
COLUMN_NAME \`idName\`,
COUNT(*) nPks
FROM information_schema.\`COLUMNS\`
WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?)
AND COLUMN_KEY = 'PRI'
GROUP BY TABLE_NAME, TABLE_SCHEMA
HAVING nPks = 1`,
[relatedList]
);
for (const {schema, table, idName} of res)
showTables.get(schema, table).idName = idName;
// Fetch show field of related tables
const showFields = logsConf.showFields;
const [result] = await db.query(
`SELECT
TABLE_SCHEMA \`schema\`,
TABLE_NAME \`table\`,
COLUMN_NAME \`col\`
FROM information_schema.\`COLUMNS\`
WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?)
AND COLUMN_NAME IN (?)
AND COLUMN_KEY <> 'PRI'`,
[relatedList, showFields]
);
for (const {schema, table, col} of result) {
const tableInfo = showTables.get(schema, table);
let save;
if (tableInfo.showField) {
const newIndex = showFields.indexOf(col);
const oldIndex = showFields.indexOf(tableInfo.showField);
save = newIndex < oldIndex;
} else
save = true;
if (save) tableInfo.showField = col;
}
// Clean tables and relations without required information
for (const [schema, table] of relatedList) {
const tableInfo = showTables.get(schema, table);
const {idName, showField} = tableInfo;
if (!idName || !showField || idName == showField) {
showTables.delete(schema, table);
continue;
}
const sqlShowField = db.escapeId(showField);
const sqlIdName = db.escapeId(idName);
const sqlTable = `${db.escapeId(schema)}.${db.escapeId(table)}`;
tableInfo.selectStmt =
`SELECT ${sqlIdName} \`id\`, ${sqlShowField} \`val\`
FROM ${sqlTable}
WHERE ${sqlIdName} IN (?)`;
}
for (const tableInfo of schemaMap.values())
for (const [col, relation] of tableInfo.relations) {
if (!showTables.has(relation.schema, relation.table))
tableInfo.relations.delete(col);
}
}
}

232
lib/show-db.js Normal file
View File

@ -0,0 +1,232 @@
const MultiMap = require("./multi-map");
/**
* TODO: #5563 Fetch relations and show values in fronted
*/
module.exports = class ShowDb {
init(logger) {
Object.assign(this, {
logger,
conf: logger.conf.showCache,
tables: new MultiMap(),
valueDb: new MultiMap()
});
}
checkDb() {
const {conf, valueDb} = this;
const dbOutdated = this.loops % conf.maxLoops == 0
|| this.lastFlush > Date.now() + conf.life * 1000
if (dbOutdated) {
valueDb.clear();
this.loops = 0;
this.lastFlush = Date.now();
}
this.loops++;
}
async loadSchema() {
const {logger, tables} = this;
const {db, schemaMap} = logger;
tables.clear();
// Fetch relations with other tables
for (const [schema, table, tableInfo] of schemaMap) {
const [relations] = await db.query(
`SELECT
COLUMN_NAME \`col\`,
REFERENCED_TABLE_SCHEMA \`schema\`,
REFERENCED_TABLE_NAME \`table\`
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = ?
AND TABLE_SCHEMA = ?
AND COLUMN_NAME IN (?)
AND REFERENCED_TABLE_NAME IS NOT NULL`,
[
table,
schema,
Array.from(tableInfo.columns.keys())
]
);
tableInfo.relations = new Map();
for (const {col, schema, table} of relations) {
if (col == tableInfo.relation) continue;
tableInfo.relations.set(col, {schema, table});
tables.setIfEmpty(schema, table, {});
}
}
const relatedList = Array.from(tables.keys());
// Fetch primary key of related tables
const [res] = await db.query(
`SELECT
TABLE_SCHEMA \`schema\`,
TABLE_NAME \`table\`,
COLUMN_NAME \`idName\`,
COUNT(*) nPks
FROM information_schema.\`COLUMNS\`
WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?)
AND COLUMN_KEY = 'PRI'
GROUP BY TABLE_NAME, TABLE_SCHEMA
HAVING nPks = 1`,
[relatedList]
);
for (const {schema, table, idName} of res)
tables.get(schema, table).idName = idName;
// Fetch show field of related tables
const showFields = logger.modelLoader.conf.showFields;
const [result] = await db.query(
`SELECT
TABLE_SCHEMA \`schema\`,
TABLE_NAME \`table\`,
COLUMN_NAME \`col\`
FROM information_schema.\`COLUMNS\`
WHERE (TABLE_SCHEMA, TABLE_NAME) IN (?)
AND COLUMN_NAME IN (?)
AND COLUMN_KEY <> 'PRI'`,
[relatedList, showFields]
);
for (const {schema, table, col} of result) {
const tableInfo = tables.get(schema, table);
let save;
if (tableInfo.showField) {
const newIndex = showFields.indexOf(col);
const oldIndex = showFields.indexOf(tableInfo.showField);
save = newIndex < oldIndex;
} else
save = true;
if (save) tableInfo.showField = col;
}
// Clean tables and relations without required information
for (const [schema, table] of relatedList) {
const tableInfo = tables.get(schema, table);
const {idName, showField} = tableInfo;
if (!idName || !showField || idName == showField) {
tables.delete(schema, table);
continue;
}
const sqlShowField = db.escapeId(showField);
const sqlIdName = db.escapeId(idName);
const sqlTable = `${db.escapeId(schema)}.${db.escapeId(table)}`;
tableInfo.selectStmt =
`SELECT ${sqlIdName} \`id\`, ${sqlShowField} \`val\`
FROM ${sqlTable}
WHERE ${sqlIdName} IN (?)`;
}
for (const tableInfo of schemaMap.values())
for (const [col, relation] of tableInfo.relations) {
if (!tables.has(relation.schema, relation.table))
tableInfo.relations.delete(col);
}
}
async getValues(db, ops) {
const {tables, valueDb} = this;
const showIds = new MultiMap();
this.checkDb();
// Fetch relations ids
for (const op of ops) {
const {
relations,
showRelation
} = op.tableInfo;
for (const change of op.changes) {
let rows;
if (op.action == 'update')
rows = [change.newI, change.oldI];
else
rows = [change.instance];
if (showRelation)
rows.push({[showRelation]: change.row[showRelation]});
for (const row of rows)
for (const col in row) {
const relation = relations.get(col);
if (!relation) continue;
const {schema, table} = relation;
const id = row[col];
let ids = valueDb.get(schema, table);
if (ids && ids.has(id)) continue;
ids = showIds.get(schema, table);
if (!ids) showIds.set(schema, table, ids = new Set());
ids.add(id);
}
}
}
// Query show values to database
for (const [schema, table, ids] of showIds) {
const tableInfo = tables.get(schema, table);
const [res] = await db.query(
tableInfo.selectStmt,
[Array.from(ids.keys())]
);
let cacheIds = valueDb.get(schema, table);
if (!cacheIds) valueDb.set(schema, table, cacheIds = new Map());
for (const row of res)
cacheIds.set(row.id, row.val);
}
// Fill rows with show values
for (const op of ops) {
const {
relations,
showRelation,
showField
} = op.tableInfo;
for (const change of op.changes) {
let rows;
if (op.action == 'update')
rows = [change.newI, change.oldI];
else
rows = [change.instance];
for (const row of rows)
for (const col in row) {
const relation = relations.get(col);
if (!relation) continue;
const showValue = getValue(relation, row, col);
if (showValue) row[col +'$'] = showValue;
}
const {row} = change;
if (showRelation) {
const relation = relations.get(showRelation);
change.modelValue = getValue(relation, row, showRelation);
} else if (showField)
change.modelValue = row[showField];
}
}
function getValue(relation, row, col) {
const {schema, table} = relation;
const ids = valueDb.get(schema, table);
return ids && ids.get(row[col])
}
}
}

View File

@ -4,7 +4,7 @@ const ZongJi = require('./zongji');
const mysql = require('mysql2/promise');
const {loadConfig} = require('./lib/util');
const ModelLoader = require('./lib/model-loader');
const MultiMap = require('./lib/multi-map');
const ShowDb = require('./lib/show-db');
module.exports = class MyLogger {
constructor() {
@ -15,11 +15,14 @@ module.exports = class MyLogger {
this.isFlushed = true;
this.queue = [];
this.modelLoader = new ModelLoader();
this.showDb = new ShowDb();
}
async start() {
const conf = this.conf = loadConfig(__dirname, 'config');
Object.assign(this, this.modelLoader.init(conf));
this.modelLoader.init(this);
this.showDb.init(this);
const includeSchema = {};
for (const [schemaName, tableMap] of this.schemaMap.map)
@ -61,6 +64,9 @@ module.exports = class MyLogger {
const db = this.db = await mysql.createConnection(conf.dstDb);
db.on('error', this.onErrorListener);
await this.modelLoader.loadSchema();
await this.showDb.loadSchema();
for (const logInfo of this.logMap.values()) {
const table = logInfo.table;
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(table.name)}`;
@ -94,11 +100,6 @@ module.exports = class MyLogger {
);
}
await this.modelLoader.loadSchema(db,
this.schemaMap,
this.showTables
);
// Zongji
this.onBinlogListener = evt => this.onBinlog(evt);
@ -412,11 +413,12 @@ module.exports = class MyLogger {
do {
const ops = [];
let txStarted;
try {
for (let i = 0; i < conf.maxBulkLog && queue.length; i++)
ops.push(queue.shift());
await this.getShowValues(ops);
await this.showDb.getValues(db, ops);
await db.query('START TRANSACTION');
txStarted = true;
@ -453,72 +455,6 @@ module.exports = class MyLogger {
}
}
async getShowValues(ops) {
const {db, showTables} = this;
const showValues = new MultiMap();
// Fetch relations id
for (const op of ops) {
if (op.hasShowValues) continue;
const {relations} = op.tableInfo;
for (const change of op.changes) {
if (op.action == 'update') {
getRelationsId(relations, change.newI);
getRelationsId(relations, change.oldI);
} else
getRelationsId(relations, change.instance);
}
}
function getRelationsId(relations, row) {
for (const col in row) {
const relation = relations.get(col);
if (!relation) continue;
const {schema, table} = relation;
let ids = showValues.get(schema, table);
if (!ids) showValues.set(schema, table, ids = new Map());
if (!ids.has(row[col])) ids.set(row[col], null);
}
}
// Fetch show values
for (const [schema, table, ids] of showValues) {
const tableInfo = showTables.get(schema, table);
const [res] = await db.query(
tableInfo.selectStmt,
[Array.from(ids.keys())]
);
for (const row of res)
ids.set(row.id, row.val);
}
// Fill rows with show values
for (const op of ops) {
const {relations} = op.tableInfo;
for (const change of op.changes) {
if (op.action == 'update') {
setShowValues(relations, change.newI);
setShowValues(relations, change.oldI);
} else
setShowValues(relations, change.instance);
}
op.hasShowValues = true;
}
function setShowValues(relations, row) {
for (const col in row) {
const relation = relations.get(col);
if (!relation) continue;
const {schema, table} = relation;
const showValue = showValues.get(schema, table).get(row[col]);
if (showValue) row[`${col}$`] = showValue;
}
}
}
async applyOp(op) {
const {conf} = this;
const {
@ -533,6 +469,8 @@ module.exports = class MyLogger {
const isUpdate = action == 'update';
const isSecondary = !tableInfo.isMain;
const relation = tableInfo.relation;
const modelName = tableInfo.modelName;
const created = new Date(evt.timestamp);
for (const change of changes) {
let newI, oldI;
@ -551,12 +489,8 @@ module.exports = class MyLogger {
break;
}
const created = new Date(evt.timestamp);
const modelName = tableInfo.modelName;
const modelId = row[tableInfo.idName];
const modelValue = tableInfo.showField && isSecondary
? row[tableInfo.showField] || null
: null;
const modelValue = change.modelValue;
const oldInstance = oldI ? JSON.stringify(oldI) : null;
const originFk = isSecondary ? row[relation] : modelId;
const originChanged = isUpdate && isSecondary

4
package-lock.json generated
View File

@ -1,12 +1,12 @@
{
"name": "mylogger",
"version": "0.1.22",
"version": "0.1.23",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "mylogger",
"version": "0.1.22",
"version": "0.1.23",
"license": "GPL-3.0",
"dependencies": {
"colors": "^1.4.0",

View File

@ -1,6 +1,6 @@
{
"name": "mylogger",
"version": "0.1.22",
"version": "0.1.23",
"author": "Verdnatura Levante SL",
"description": "MySQL and MariaDB logger using binary log",
"license": "GPL-3.0",