refs #5541 Fixes, refactor, RC
gitea/mylogger/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2023-04-11 10:15:06 +02:00
parent 5f754df2ce
commit 17a0f40fcd
3 changed files with 80 additions and 46 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
node_modules
zongji
config/*.local.yml
config.*.yml

View File

@ -1,11 +1,12 @@
code: mylogger
debug: false
testMode: false
pingInterval: 60
flushInterval: 10
pingInterval: 3600
flushInterval: 30
restartTimeout: 30
queueFlushDelay: 100
queueFlushDelay: 200
maxBulkLog: 100
upperCaseTable: true
srcDb:
host: localhost
port: 3306

View File

@ -5,18 +5,6 @@ const path = require('path');
const ZongJi = require('./zongji');
const mysql = require('mysql2/promise');
const catchEvents = new Set([
'writerows',
'updaterows',
'deleterows'
]);
const actions = {
writerows: 'insert',
updaterows: 'update',
deleterows: 'delete'
};
module.exports = class MyLogger {
constructor() {
this.running = false;
@ -72,9 +60,14 @@ module.exports = class MyLogger {
tableMap.set(table.name, tableInfo);
}
const modelName = conf.upperCaseTable
? toUpperCamelCase(table.name)
: table.name;
Object.assign(tableInfo, {
conf: tableConf,
exclude: new Set(tableConf.exclude)
exclude: new Set(tableConf.exclude),
modelName
});
return tableInfo;
@ -239,7 +232,7 @@ module.exports = class MyLogger {
for (const [schema, tableMap] of this.schemaMap)
for (const [table, tableInfo] of tableMap) {
// Fetch relation
// Fetch relation to main table
if (!tableInfo.conf.relation && !tableInfo.isMain) {
const mainTable = tableInfo.log.mainTable;
@ -247,7 +240,7 @@ module.exports = class MyLogger {
.get(mainTable.schema)
.get(mainTable.name);
const [relations] = await db.query(
const [mainRelations] = await db.query(
`SELECT COLUMN_NAME relation
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = ?
@ -264,16 +257,40 @@ module.exports = class MyLogger {
]
);
if (!relations.length)
if (!mainRelations.length)
throw new Error(`No relation to main table found for table: ${schema}.${table}`);
if (relations.length > 1)
if (mainRelations.length > 1)
throw new Error(`Found more multiple relations to main table: ${schema}.${table}`);
for (const {relation} of relations)
for (const {relation} of mainRelations)
tableInfo.relation = relation;
}
// Fetch relations
// TODO: Use relations to fetch names of related entities
const [relations] = await db.query(
`SELECT
COLUMN_NAME \`col\`,
REFERENCED_TABLE_SCHEMA \`schema\`,
REFERENCED_TABLE_NAME \`table\`,
REFERENCED_COLUMN_NAME \`column\`
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = ?
AND TABLE_SCHEMA = ?
AND REFERENCED_TABLE_NAME IS NOT NULL`,
[
table,
schema
]
);
tableInfo.relations = new Map();
for (const {col, schema, table, column} of relations)
tableInfo.relations.set(col, {schema, table, column});
}
// Zongji
const zongji = new ZongJi(conf.srcDb);
@ -421,6 +438,10 @@ module.exports = class MyLogger {
}
}
handleError(err) {
console.error(err);
}
async onBinlog(evt) {
//evt.dump();
try {
@ -508,8 +529,7 @@ module.exports = class MyLogger {
if (!changes.length) return;
if (this.debug) {
console.debug('Log:'.blue,
`${tableName}(${changes}) [${eventName}]`);
console.debug('Log:'.blue, `[${action}]`.yellow, tableName);
}
this.queue.push({
@ -517,7 +537,6 @@ module.exports = class MyLogger {
action,
evt,
changes,
tableName,
binlogName: this.binlogName
});
if (!this.flushTimeout)
@ -531,6 +550,7 @@ module.exports = class MyLogger {
if (this.isFlushed || this.isFlushing || !this.isOk) return;
this.isFlushing = true;
const {conf, db} = this;
let op;
try {
if (this.queue.length) {
@ -538,7 +558,6 @@ module.exports = class MyLogger {
let appliedOps;
try {
await db.query('START TRANSACTION');
let op;
appliedOps = [];
for (let i = 0; i < conf.maxBulkLog && this.queue.length; i++) {
@ -566,29 +585,12 @@ module.exports = class MyLogger {
}
}
async savePosition(binlogName, binlogPosition) {
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
const replaceQuery =
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]);
this.isFlushed = this.binlogName == binlogName
&& this.binlogPosition == binlogPosition;
}
handleError(err) {
console.error(err);
}
async applyOp(op) {
const {
tableInfo,
action,
evt,
changes,
tableName
changes
} = op;
const logInfo = tableInfo.log;
@ -611,6 +613,7 @@ module.exports = class MyLogger {
break;
}
const modelName = tableInfo.modelName;
const modelId = row[tableInfo.idName];
const modelValue = tableInfo.showField
? row[tableInfo.showField] || null
@ -625,7 +628,7 @@ module.exports = class MyLogger {
if (isDelete) {
[[deleteRow]] = await logInfo.fetchStmt.execute([
tableName, modelId
modelName, modelId
]);
if (deleteRow)
await logInfo.updateStmt.execute([
@ -642,7 +645,7 @@ module.exports = class MyLogger {
row.editorFk || null,
action,
created,
tableName,
modelName,
oldInstance,
newI ? JSON.stringify(newI) : null,
modelId,
@ -652,6 +655,18 @@ module.exports = class MyLogger {
}
}
async savePosition(binlogName, binlogPosition) {
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
const replaceQuery =
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]);
this.isFlushed = this.binlogName == binlogName
&& this.binlogPosition == binlogPosition;
}
async connectionPing() {
if (!this.isOk) return;
try {
@ -676,6 +691,24 @@ module.exports = class MyLogger {
}
}
const catchEvents = new Set([
'writerows',
'updaterows',
'deleterows'
]);
const actions = {
writerows: 'insert',
updaterows: 'update',
deleterows: 'delete'
};
function toUpperCamelCase(str) {
str = str.replace(/[-_ ][a-z]/g,
match => match.charAt(1).toUpperCase());
return str.charAt(0).toUpperCase() + str.substr(1);
}
function equals(a, b) {
if (a === b)
return true;