634 lines
16 KiB
JavaScript
634 lines
16 KiB
JavaScript
require("require-yaml");
|
|
require("colors");
|
|
const ZongJi = require("./zongji");
|
|
const mysql = require("mysql2/promise");
|
|
const { loadConfig } = require("./lib/util");
|
|
const ModelLoader = require("./lib/model-loader");
|
|
const ShowDb = require("./lib/show-db");
|
|
const Salix = require("./lib/salix");
|
|
|
|
module.exports = class MyLogger {
|
|
constructor() {
|
|
this.running = false;
|
|
this.isOk = null;
|
|
this.binlogName = null;
|
|
this.binlogPosition = null;
|
|
this.isFlushed = true;
|
|
this.queue = [];
|
|
this.modelLoader = new ModelLoader();
|
|
this.showDb = new ShowDb();
|
|
}
|
|
|
|
async start() {
|
|
const conf = (this.conf = loadConfig(__dirname, "config"));
|
|
const salix = new Salix();
|
|
await salix.init(this);
|
|
// await salix.salixLogInfo();
|
|
this.modelLoader.init(this, salix.logInfo);
|
|
this.showDb.init(this);
|
|
|
|
const includeSchema = {};
|
|
for (const [schemaName, tableMap] of this.schemaMap.map)
|
|
includeSchema[schemaName] = Array.from(tableMap.keys());
|
|
|
|
this.zongjiOpts = {
|
|
includeEvents: [
|
|
"rotate",
|
|
"tablemap",
|
|
"writerows",
|
|
"updaterows",
|
|
"deleterows",
|
|
],
|
|
includeSchema,
|
|
serverId: conf.serverId,
|
|
};
|
|
|
|
if (conf.testMode)
|
|
console.log("Test mode enabled, just logging queries to console.");
|
|
|
|
console.log("Starting process.");
|
|
await this.init();
|
|
console.log("Process started.");
|
|
}
|
|
|
|
async stop() {
|
|
console.log("Stopping process.");
|
|
await this.end();
|
|
console.log("Process stopped.");
|
|
}
|
|
|
|
async init() {
|
|
const { conf } = this;
|
|
this.debug("MyLogger", "Initializing.");
|
|
this.onErrorListener = (err) => this.onError(err);
|
|
|
|
// DB connection
|
|
|
|
const db = (this.db = await mysql.createConnection(conf.dstDb));
|
|
db.on("error", this.onErrorListener);
|
|
|
|
await this.modelLoader.loadSchema();
|
|
await this.showDb.loadSchema();
|
|
|
|
for (const logInfo of this.logMap.values()) {
|
|
const table = logInfo.table;
|
|
const sqlTable = `${db.escapeId(table.schema)}.${db.escapeId(
|
|
table.name
|
|
)}`;
|
|
logInfo.addStmt = await db.prepare(
|
|
`INSERT INTO ${sqlTable}
|
|
SET originFk = ?,
|
|
userFk = ?,
|
|
action = ?,
|
|
creationDate = ?,
|
|
changedModel = ?,
|
|
oldInstance = ?,
|
|
newInstance = ?,
|
|
changedModelId = ?,
|
|
changedModelValue = ?`
|
|
);
|
|
logInfo.fetchStmt = await db.prepare(
|
|
`SELECT id FROM ${sqlTable}
|
|
WHERE changedModel = ?
|
|
AND changedModelId = ?
|
|
AND action = 'delete'
|
|
AND (originFk IS NULL OR originFk = ?)
|
|
LIMIT 1`
|
|
);
|
|
logInfo.updateStmt = await db.prepare(
|
|
`UPDATE ${sqlTable}
|
|
SET originFk = ?,
|
|
creationDate = ?,
|
|
oldInstance = ?,
|
|
changedModelValue = ?
|
|
WHERE id = ?`
|
|
);
|
|
}
|
|
|
|
// Zongji
|
|
|
|
this.onBinlogListener = (evt) => this.onBinlog(evt);
|
|
|
|
const [res] = await db.query(
|
|
"SELECT `logName`, `position` FROM `util`.`binlogQueue` WHERE code = ?",
|
|
[conf.code]
|
|
);
|
|
if (res.length) {
|
|
const [row] = res;
|
|
this.binlogName = row.logName;
|
|
this.binlogPosition = row.position;
|
|
}
|
|
|
|
await this.zongjiStart();
|
|
|
|
this.flushInterval = setInterval(
|
|
() => this.flushQueue(),
|
|
conf.flushInterval * 1000
|
|
);
|
|
this.pingInterval = setInterval(
|
|
() => this.connectionPing(),
|
|
conf.pingInterval * 1000
|
|
);
|
|
|
|
// Summary
|
|
|
|
this.running = true;
|
|
this.isOk = true;
|
|
this.debug("MyLogger", "Initialized.");
|
|
}
|
|
|
|
async end(silent) {
|
|
if (!this.running) return;
|
|
this.running = false;
|
|
this.debug("MyLogger", "Ending.");
|
|
|
|
this.db.off("error", this.onErrorListener);
|
|
|
|
clearInterval(this.flushInterval);
|
|
clearInterval(this.pingInterval);
|
|
clearInterval(this.flushTimeout);
|
|
|
|
function logError(err) {
|
|
if (!silent) console.error(err);
|
|
}
|
|
|
|
try {
|
|
await this.flushQueue();
|
|
} catch (err) {
|
|
logError(err);
|
|
}
|
|
|
|
// Zongji
|
|
|
|
await this.zongjiStop();
|
|
this.zongji = null;
|
|
|
|
// DB connection
|
|
|
|
// FIXME: mysql2/promise bug, db.end() ends process
|
|
this.db.on("error", () => {});
|
|
try {
|
|
this.db.end();
|
|
} catch (err) {
|
|
logError(err);
|
|
}
|
|
|
|
// Summary
|
|
|
|
this.debug("MyLogger", "Ended.");
|
|
}
|
|
|
|
async zongjiStart() {
|
|
await this.zongjiStop();
|
|
const zongji = new ZongJi(this.conf.srcDb);
|
|
const zongjiOpts = this.zongjiOpts;
|
|
|
|
if (this.binlogName) {
|
|
this.debug(
|
|
"Zongji",
|
|
`Starting: ${this.binlogName}, position: ${this.binlogPosition}`
|
|
);
|
|
Object.assign(zongjiOpts, {
|
|
filename: this.binlogName,
|
|
position: this.binlogPosition,
|
|
});
|
|
} else {
|
|
this.debug("Zongji", "Starting at end.");
|
|
zongjiOpts.startAtEnd = true;
|
|
}
|
|
|
|
zongji.on("binlog", this.onBinlogListener);
|
|
|
|
await new Promise((resolve, reject) => {
|
|
const onReady = () => {
|
|
zongji.off("error", onError);
|
|
resolve();
|
|
};
|
|
const onError = (err) => {
|
|
zongji.off("ready", onReady);
|
|
zongji.off("binlog", this.onBinlogListener);
|
|
reject(err);
|
|
};
|
|
|
|
zongji.once("ready", onReady);
|
|
zongji.once("error", onError);
|
|
zongji.start(zongjiOpts);
|
|
});
|
|
zongji.on("error", this.onErrorListener);
|
|
this.zongji = zongji;
|
|
this.debug("Zongji", "Started.");
|
|
}
|
|
|
|
async zongjiStop() {
|
|
if (!this.zongji) return;
|
|
this.debug(
|
|
"Zongji",
|
|
`Stopping: ${this.binlogName}, position: ${this.binlogPosition}`
|
|
);
|
|
const zongji = this.zongji;
|
|
this.zongji = null;
|
|
|
|
zongji.off("binlog", this.onBinlogListener);
|
|
zongji.off("error", this.onErrorListener);
|
|
|
|
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
|
|
zongji.connection.destroy(() => {
|
|
console.log("zongji.connection.destroy");
|
|
});
|
|
await new Promise((resolve) => {
|
|
zongji.ctrlConnection.query(
|
|
"KILL ?",
|
|
[zongji.connection.threadId],
|
|
(err) => {
|
|
// if (err && err.code !== 'ER_NO_SUCH_THREAD');
|
|
// console.error(err);
|
|
resolve();
|
|
}
|
|
);
|
|
});
|
|
zongji.ctrlConnection.destroy(() => {
|
|
console.log("zongji.ctrlConnection.destroy");
|
|
});
|
|
zongji.emit("stopped");
|
|
this.debug("Zongji", "Stopped.");
|
|
}
|
|
|
|
async tryRestart() {
|
|
try {
|
|
await this.init();
|
|
console.log("Process restarted.");
|
|
} catch (err) {
|
|
setTimeout(() => this.tryRestart(), this.conf.restartTimeout * 1000);
|
|
}
|
|
}
|
|
|
|
async onError(err) {
|
|
if (!this.isOk) return;
|
|
this.isOk = false;
|
|
console.log(`Error: ${err.code}: ${err.message}`);
|
|
|
|
try {
|
|
await this.end(true);
|
|
} catch (e) {}
|
|
|
|
switch (err.code) {
|
|
case "PROTOCOL_CONNECTION_LOST":
|
|
case "ECONNRESET":
|
|
console.log("Trying to restart process.");
|
|
await this.tryRestart();
|
|
break;
|
|
default:
|
|
process.exit();
|
|
}
|
|
}
|
|
|
|
handleError(err) {
|
|
console.error(err);
|
|
}
|
|
|
|
async onBinlog(evt) {
|
|
//evt.dump();
|
|
try {
|
|
let shouldFlush;
|
|
const eventName = evt.getEventName();
|
|
|
|
if (eventName == "rotate") {
|
|
if (evt.binlogName !== this.binlogName) {
|
|
shouldFlush = true;
|
|
this.binlogName = evt.binlogName;
|
|
this.binlogPosition = evt.position;
|
|
console.log(
|
|
`[${eventName}] filename: ${this.binlogName}`,
|
|
`position: ${this.binlogPosition}`
|
|
);
|
|
}
|
|
} else {
|
|
shouldFlush = true;
|
|
this.binlogPosition = evt.nextPosition;
|
|
if (catchEvents.has(eventName)) this.onRowEvent(evt, eventName);
|
|
}
|
|
|
|
if (shouldFlush) this.isFlushed = false;
|
|
|
|
if (this.queue.length > this.conf.maxQueueEvents) {
|
|
this.debug("MyLogger", "Queue full, stopping Zongji.");
|
|
await this.zongjiStop();
|
|
}
|
|
} catch (err) {
|
|
this.handleError(err);
|
|
}
|
|
}
|
|
|
|
onRowEvent(evt, eventName) {
|
|
const table = evt.tableMap[evt.tableId];
|
|
const tableName = table.tableName;
|
|
const tableInfo = this.schemaMap.get(table.parentSchema, tableName);
|
|
if (!tableInfo) return;
|
|
|
|
const action = actions[eventName];
|
|
const { rowExcludeField } = tableInfo;
|
|
const changes = [];
|
|
|
|
function isExcluded(row) {
|
|
return rowExcludeField && row[rowExcludeField];
|
|
}
|
|
|
|
function cast(value, type) {
|
|
if (value == null || !type) return value;
|
|
|
|
const fn = castFn[type];
|
|
return fn ? fn(value) : value;
|
|
}
|
|
|
|
function equals(a, b) {
|
|
if (a === b) return true;
|
|
const type = typeof a;
|
|
if (a == null || b == null || type !== typeof b) return false;
|
|
if (type === "object" && a.constructor === b.constructor) {
|
|
if (a instanceof Date) {
|
|
// FIXME: zongji creates invalid dates for NULL DATE
|
|
// Error is somewhere here: zongji/lib/rows_event.js:129
|
|
let aTime = a.getTime();
|
|
if (isNaN(aTime)) aTime = null;
|
|
let bTime = b.getTime();
|
|
if (isNaN(bTime)) bTime = null;
|
|
|
|
return aTime === bTime;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
const { castTypes } = tableInfo;
|
|
|
|
if (action == "update") {
|
|
const cols = tableInfo.columns;
|
|
|
|
for (const row of evt.rows) {
|
|
const after = row.after;
|
|
if (isExcluded(after)) continue;
|
|
|
|
const before = row.before;
|
|
const oldI = {};
|
|
const newI = {};
|
|
let nColsChanged = 0;
|
|
|
|
for (const col in before) {
|
|
if (!cols.has(col)) continue;
|
|
const type = castTypes.get(col);
|
|
const oldValue = cast(before[col], type);
|
|
const newValue = cast(after[col], type);
|
|
if (!equals(oldValue, newValue)) {
|
|
oldI[col] = oldValue;
|
|
newI[col] = newValue;
|
|
nColsChanged++;
|
|
}
|
|
}
|
|
|
|
if (nColsChanged) changes.push({ row: after, oldI, newI });
|
|
}
|
|
} else {
|
|
const cols = tableInfo.instanceColumns;
|
|
|
|
for (const row of evt.rows) {
|
|
if (isExcluded(row)) continue;
|
|
|
|
const instance = {};
|
|
for (const col of cols) {
|
|
if (row[col] == null) continue;
|
|
const type = castTypes.get(col);
|
|
instance[col] = cast(row[col], type);
|
|
}
|
|
|
|
changes.push({ row, instance });
|
|
}
|
|
}
|
|
|
|
if (!changes.length) return;
|
|
|
|
this.queue.push({
|
|
tableInfo,
|
|
action,
|
|
evt,
|
|
changes,
|
|
binlogName: this.binlogName,
|
|
});
|
|
|
|
if (!this.flushTimeout)
|
|
this.flushTimeout = setTimeout(
|
|
() => this.flushQueue(),
|
|
this.conf.queueFlushDelay
|
|
);
|
|
|
|
if (this.conf.debug)
|
|
console.debug(
|
|
"Evt:".blue,
|
|
`[${action}]`[actionColor[action]],
|
|
`${tableName}: ${changes.length} changes, queue: ${this.queue.length} elements`
|
|
);
|
|
}
|
|
|
|
async flushQueue() {
|
|
if (this.isFlushed || this.isFlushing || !this.isOk) return;
|
|
this.isFlushing = true;
|
|
const { conf, db, queue } = this;
|
|
let op;
|
|
|
|
try {
|
|
if (queue.length) {
|
|
do {
|
|
const ops = [];
|
|
let txStarted;
|
|
|
|
try {
|
|
for (let i = 0; i < conf.maxBulkLog && queue.length; i++)
|
|
ops.push(queue.shift());
|
|
|
|
await this.showDb.getValues(db, ops);
|
|
|
|
await db.query("START TRANSACTION");
|
|
txStarted = true;
|
|
|
|
for (op of ops) await this.applyOp(op);
|
|
|
|
this.debug(
|
|
"Queue",
|
|
`applied: ${ops.length}, remaining: ${queue.length}`
|
|
);
|
|
await this.savePosition(op.binlogName, op.evt.nextPosition);
|
|
|
|
await db.query("COMMIT");
|
|
} catch (err) {
|
|
queue.unshift(...ops);
|
|
if (txStarted)
|
|
try {
|
|
await db.query("ROLLBACK");
|
|
} catch (err) {}
|
|
throw err;
|
|
}
|
|
} while (queue.length);
|
|
|
|
if (!this.zongji) {
|
|
this.debug("MyLogger", "Queue flushed, restarting Zongji.");
|
|
await this.zongjiStart();
|
|
}
|
|
} else {
|
|
await this.savePosition(this.binlogName, this.binlogPosition);
|
|
}
|
|
} catch (err) {
|
|
this.handleError(err);
|
|
} finally {
|
|
this.flushTimeout = null;
|
|
this.isFlushing = false;
|
|
}
|
|
}
|
|
|
|
async applyOp(op) {
|
|
const { conf } = this;
|
|
const { tableInfo, action, evt, changes } = op;
|
|
const { logInfo, isMain, relation, modelName, logFields } = tableInfo;
|
|
|
|
const isDelete = action == "delete";
|
|
const isUpdate = action == "update";
|
|
const created = new Date(evt.timestamp);
|
|
|
|
for (const change of changes) {
|
|
let newI, oldI;
|
|
const row = change.row;
|
|
|
|
switch (action) {
|
|
case "update":
|
|
newI = change.newI;
|
|
oldI = change.oldI;
|
|
if (logFields) {
|
|
for (const field of logFields)
|
|
if (newI[field] === undefined) newI[field] = row[field];
|
|
}
|
|
break;
|
|
case "insert":
|
|
newI = change.instance;
|
|
break;
|
|
case "delete":
|
|
oldI = change.instance;
|
|
break;
|
|
}
|
|
|
|
const modelId = row[tableInfo.idName];
|
|
const modelValue = change.modelValue ?? null;
|
|
const oldInstance = oldI ? JSON.stringify(oldI) : null;
|
|
const originFk = !isMain ? row[relation] : modelId;
|
|
const originChanged = isUpdate && !isMain && newI[relation] !== undefined;
|
|
|
|
let deleteRow;
|
|
if (conf.debug)
|
|
console.debug(
|
|
"Log:".blue,
|
|
`[${action}]`[actionColor[action]],
|
|
`${logInfo.name}: ${originFk}, ${modelName}: ${modelId}`
|
|
);
|
|
|
|
try {
|
|
if (isDelete) {
|
|
[[deleteRow]] = await logInfo.fetchStmt.execute([
|
|
modelName,
|
|
modelId,
|
|
originFk,
|
|
]);
|
|
if (!conf.testMode && deleteRow)
|
|
await logInfo.updateStmt.execute([
|
|
originFk,
|
|
created,
|
|
oldInstance,
|
|
modelValue,
|
|
deleteRow.id,
|
|
]);
|
|
}
|
|
if (!conf.testMode && (!isDelete || !deleteRow)) {
|
|
async function log(originFk) {
|
|
if (originFk == null) return;
|
|
await logInfo.addStmt.execute([
|
|
originFk,
|
|
row[tableInfo.userField] ?? null,
|
|
action,
|
|
created,
|
|
modelName,
|
|
oldInstance,
|
|
newI ? JSON.stringify(newI) : null,
|
|
modelId,
|
|
modelValue,
|
|
]);
|
|
}
|
|
|
|
if (originChanged) await log(oldI[relation]);
|
|
await log(originFk);
|
|
}
|
|
} catch (err) {
|
|
if (err.code == "ER_NO_REFERENCED_ROW_2") {
|
|
this.debug("Log", `Ignored because of constraint failed.`);
|
|
} else throw err;
|
|
}
|
|
}
|
|
}
|
|
|
|
async savePosition(binlogName, binlogPosition) {
|
|
this.debug("Flush", `filename: ${binlogName}, position: ${binlogPosition}`);
|
|
|
|
const replaceQuery =
|
|
"REPLACE INTO `util`.`binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?";
|
|
if (!this.conf.testMode)
|
|
await this.db.query(replaceQuery, [
|
|
this.conf.code,
|
|
binlogName,
|
|
binlogPosition,
|
|
]);
|
|
|
|
this.isFlushed =
|
|
this.binlogName == binlogName && this.binlogPosition == binlogPosition;
|
|
}
|
|
|
|
async connectionPing() {
|
|
if (!this.isOk) return;
|
|
try {
|
|
this.debug("Ping", "Sending ping to database.");
|
|
|
|
if (this.zongji) {
|
|
// FIXME: Should Zongji.connection be pinged?
|
|
await new Promise((resolve, reject) => {
|
|
this.zongji.ctrlConnection.ping((err) => {
|
|
if (err) return reject(err);
|
|
resolve();
|
|
});
|
|
});
|
|
}
|
|
|
|
await this.db.ping();
|
|
} catch (err) {
|
|
this.handleError(err);
|
|
}
|
|
}
|
|
|
|
debug(namespace, message) {
|
|
if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow);
|
|
}
|
|
};
|
|
|
|
const catchEvents = new Set(["writerows", "updaterows", "deleterows"]);
|
|
|
|
const actions = {
|
|
writerows: "insert",
|
|
updaterows: "update",
|
|
deleterows: "delete",
|
|
};
|
|
|
|
const actionColor = {
|
|
insert: "green",
|
|
update: "yellow",
|
|
delete: "red",
|
|
};
|
|
|
|
const castFn = {
|
|
boolean: function (value) {
|
|
return !!value;
|
|
},
|
|
};
|