Flushing fixes
gitea/mylogger/pipeline/head There was a failure building this commit Details

This commit is contained in:
Juan Ferrer 2023-04-10 11:03:26 +02:00
parent e57ccf6e41
commit ce14583a71
4 changed files with 139 additions and 70 deletions

4
Jenkinsfile vendored
View File

@ -6,7 +6,7 @@ pipeline {
disableConcurrentBuilds() disableConcurrentBuilds()
} }
environment { environment {
PROJECT_NAME = 'mycdc' PROJECT_NAME = 'mylogger'
STACK_NAME = "${env.PROJECT_NAME}-${env.BRANCH_NAME}" STACK_NAME = "${env.PROJECT_NAME}-${env.BRANCH_NAME}"
} }
stages { stages {
@ -17,7 +17,7 @@ pipeline {
env.VERSION = packageJson.version env.VERSION = packageJson.version
} }
configFileProvider([ configFileProvider([
configFile(fileId: "mycdc.groovy", configFile(fileId: "mylogger.groovy",
variable: 'GROOVY_FILE') variable: 'GROOVY_FILE')
]) { ]) {
load env.GROOVY_FILE load env.GROOVY_FILE

View File

@ -1,14 +1,17 @@
debug: true debug: true
testMode: false testMode: false
db: srcDb:
host: localhost
port: 3306
user: root
password: root
database: vn
dstDb:
host: localhost host: localhost
port: 3306 port: 3306
user: root user: root
password: root password: root
database: vn database: vn
showField:
- name
description
logs: logs:
ticket: ticket:
logTable: clientLog logTable: clientLog

View File

@ -3,12 +3,20 @@ debug: false
testMode: false testMode: false
pingInterval: 60 pingInterval: 60
flushInterval: 10 flushInterval: 10
db: queueFlushDelay: 100
maxBulkLog: 100
srcDb:
host: localhost host: localhost
port: 3306 port: 3306
user: zongji user: zongji
password: password password: password
database: util database: util
dstDb:
host: localhost
port: 3306
user: root
password: password
database: util
showFields: showFields:
- name - name
- description - description
@ -24,7 +32,6 @@ logs:
- image - image
- supplyResponseFk - supplyResponseFk
types: types:
id: number
isPrinted: boolean isPrinted: boolean
- name: itemTag - name: itemTag
relation: itemFk relation: itemFk

View File

@ -5,11 +5,11 @@ const path = require('path');
const ZongJi = require('./zongji'); const ZongJi = require('./zongji');
const mysql = require('mysql2/promise'); const mysql = require('mysql2/promise');
const allEvents = [ const catchEvents = new Set([
'writerows', 'writerows',
'updaterows', 'updaterows',
'deleterows' 'deleterows'
]; ]);
const actions = { const actions = {
writerows: 'insert', writerows: 'insert',
@ -20,10 +20,12 @@ const actions = {
module.exports = class MyLogger { module.exports = class MyLogger {
constructor() { constructor() {
this.running = false; this.running = false;
this.filename = null; this.binlogName = null;
this.position = null; this.binlogPosition = null;
this.schemaMap = new Map(); this.schemaMap = new Map();
this.logMap = new Map(); this.logMap = new Map();
this.isFlushed = true;
this.queue = [];
} }
async start() { async start() {
@ -35,7 +37,7 @@ module.exports = class MyLogger {
Object.assign(conf, localConfig); Object.assign(conf, localConfig);
} }
const defaultSchema = conf.db.database; const defaultSchema = conf.srcDb.database;
function parseTable(tableString) { function parseTable(tableString) {
let name, schema; let name, schema;
const split = tableString.split('.'); const split = tableString.split('.');
@ -145,7 +147,7 @@ module.exports = class MyLogger {
// DB connection // DB connection
const db = this.db = await mysql.createConnection(conf.db); const db = this.db = await mysql.createConnection(conf.dstDb);
db.on('error', this.onErrorListener); db.on('error', this.onErrorListener);
for (const logInfo of this.logMap.values()) { for (const logInfo of this.logMap.values()) {
@ -192,7 +194,7 @@ module.exports = class MyLogger {
); );
for (const {col, type, def} of dbCols) { for (const {col, type, def} of dbCols) {
if (!tableInfo.exclude.has(col)) if (!tableInfo.exclude.has(col) && col != 'editorFk')
tableInfo.columns.set(col, {type, def}); tableInfo.columns.set(col, {type, def});
const castType = conf.castTypes[type]; const castType = conf.castTypes[type];
@ -259,14 +261,6 @@ module.exports = class MyLogger {
] ]
); );
console.debug(
table,
schema,
mainTable.name,
mainTable.schema,
mainTableInfo.idName
);
if (!relations.length) if (!relations.length)
throw new Error(`No relation to main table found for table: ${schema}.${table}`); throw new Error(`No relation to main table found for table: ${schema}.${table}`);
if (relations.length > 1) if (relations.length > 1)
@ -279,7 +273,7 @@ module.exports = class MyLogger {
// Zongji // Zongji
const zongji = new ZongJi(conf.db); const zongji = new ZongJi(conf.srcDb);
this.zongji = zongji; this.zongji = zongji;
this.onBinlogListener = evt => this.onBinlog(evt); this.onBinlogListener = evt => this.onBinlog(evt);
@ -291,11 +285,11 @@ module.exports = class MyLogger {
); );
if (res.length) { if (res.length) {
const [row] = res; const [row] = res;
this.filename = row.logName; this.binlogName = row.logName;
this.position = row.position; this.binlogPosition = row.position;
Object.assign(this.opts, { Object.assign(this.opts, {
filename: this.filename, filename: row.logName,
position: this.position position: row.position
}); });
} else } else
this.opts.startAtEnd = true; this.opts.startAtEnd = true;
@ -342,6 +336,9 @@ module.exports = class MyLogger {
clearInterval(this.flushInterval); clearInterval(this.flushInterval);
clearInterval(this.pingInterval); clearInterval(this.pingInterval);
clearInterval(this.flushTimeout);
await this.flushQueue();
zongji.off('binlog', this.onBinlogListener); zongji.off('binlog', this.onBinlogListener);
zongji.off('error', this.onErrorListener); zongji.off('error', this.onErrorListener);
this.zongji = null; this.zongji = null;
@ -409,29 +406,36 @@ module.exports = class MyLogger {
} }
} }
onBinlog(evt) { async onBinlog(evt) {
//evt.dump(); //evt.dump();
try {
let shouldFlush;
const eventName = evt.getEventName(); const eventName = evt.getEventName();
let position = evt.nextPosition;
switch (eventName) { if (eventName == 'rotate') {
case 'rotate': if (evt.binlogName !== this.binlogName) {
this.filename = evt.binlogName; shouldFlush = true;
position = evt.position; this.binlogName = evt.binlogName;
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`); this.binlogPosition = evt.position;
break; console.log(
case 'writerows': `[${eventName}] filename: ${this.binlogName}`,
case 'deleterows': `position: ${this.binlogPosition}`
case 'updaterows': );
}
} else {
shouldFlush = true;
this.binlogPosition = evt.nextPosition;
if (catchEvents.has(eventName))
this.onRowEvent(evt, eventName); this.onRowEvent(evt, eventName);
break;
} }
this.position = position; if (shouldFlush) this.isFlushed = false;
this.flushed = false; } catch(err) {
this.handleError(err);
}
} }
async onRowEvent(evt, eventName) { onRowEvent(evt, eventName) {
const table = evt.tableMap[evt.tableId]; const table = evt.tableMap[evt.tableId];
const tableName = table.tableName; const tableName = table.tableName;
const tableMap = this.schemaMap.get(table.parentSchema); const tableMap = this.schemaMap.get(table.parentSchema);
@ -450,7 +454,6 @@ module.exports = class MyLogger {
return !!value; return !!value;
default: default:
return value; return value;
} }
} }
@ -464,7 +467,6 @@ module.exports = class MyLogger {
for (const col in before) { for (const col in before) {
if (columns.has(col) if (columns.has(col)
&& after[col] !== undefined
&& !equals(after[col], before[col])) { && !equals(after[col], before[col])) {
if (before[col] !== null) if (before[col] !== null)
oldI[col] = castValue(col, before[col]); oldI[col] = castValue(col, before[col]);
@ -480,12 +482,10 @@ module.exports = class MyLogger {
for (const row of evt.rows) { for (const row of evt.rows) {
const instance = {}; const instance = {};
for (const col of cols) { for (const col of cols) {
if (row[col] !== null) if (row[col] !== null)
instance[col] = castValue(col, row[col]); instance[col] = castValue(col, row[col]);
} }
changes.push({row, instance}); changes.push({row, instance});
} }
} }
@ -497,6 +497,85 @@ module.exports = class MyLogger {
`${tableName}(${changes}) [${eventName}]`); `${tableName}(${changes}) [${eventName}]`);
} }
this.queue.push({
tableInfo,
action,
evt,
changes,
tableName,
binlogName: this.binlogName
});
if (!this.flushTimeout)
this.flushTimeout = setTimeout(
() => this.flushQueue(),
this.conf.queueFlushDelay
);
}
async flushQueue() {
if (this.isFlushed || this.isFlushing) return;
this.isFlushing = true;
const {conf, db} = this;
try {
if (this.queue.length) {
do {
let appliedOps;
try {
await db.query('START TRANSACTION');
let op;
appliedOps = [];
for (let i = 0; i < conf.maxBulkLog && this.queue.length; i++) {
op = this.queue.shift();
appliedOps.push(op);
await this.applyOp(op);
}
await this.savePosition(op.binlogName, op.evt.nextPosition)
await db.query('COMMIT');
} catch(err) {
this.queue = appliedOps.concat(this.queue);
await db.query('ROLLBACK');
throw err;
}
} while (this.queue.length);
} else {
await this.savePosition(this.binlogName, this.binlogPosition);
}
} catch(err) {
this.handleError(err);
} finally {
this.flushTimeout = null;
this.isFlushing = false;
}
}
async savePosition(binlogName, binlogPosition) {
this.debug('Flush', `filename: ${binlogName}, position: ${binlogPosition}`);
const replaceQuery =
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, binlogName, binlogPosition]);
this.isFlushed = this.binlogName == binlogName
&& this.binlogPosition == binlogPosition;
}
handleError(err) {
console.error('Super error:', err);
}
async applyOp(op) {
const {
tableInfo,
action,
evt,
changes,
tableName
} = op;
const logInfo = tableInfo.log; const logInfo = tableInfo.log;
const isDelete = action == 'delete'; const isDelete = action == 'delete';
@ -558,26 +637,6 @@ module.exports = class MyLogger {
} }
} }
async flushQueue() {
if (this.flushed) return;
const position = this.nextPosition;
if (position) {
const filename = this.nextFilename;
this.debug('Flush', `filename: ${filename}, position: ${position}`);
const replaceQuery =
'REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, filename, position]);
this.flushed = true;
}
this.nextFilename = this.filename;
this.nextPosition = this.position;
}
async connectionPing() { async connectionPing() {
this.debug('Ping', 'Sending ping to database.'); this.debug('Ping', 'Sending ping to database.');