132 lines
3.2 KiB
JavaScript
132 lines
3.2 KiB
JavaScript
|
const ZongJi = require('./zongji');
|
||
|
const mysql = require('mysql2/promise');
|
||
|
const config = require('./config.json');
|
||
|
|
||
|
// ---------------------------------------------------------------------------
// Module-level state shared by main(), flushQueue() and the binlog handler.
// ---------------------------------------------------------------------------

// Row-event names a table listens to when its config entry does not list
// its own `events`.
const allEvents = new Set(['writerows', 'updaterows', 'deleterows']);

// schema name -> Map(table name -> { events, columns, fk }); filled by main().
const schemaMap = new Map();

// Key values collected from binlog rows since the last flush.
const fks = new Set();

// Binlog reader, built from the same `config.db` settings main() uses for `db`.
const zongji = new ZongJi(config.db);

// Query connection; assigned in main().
let db;

// Binlog file name and offset written by flushQueue() via `config.queries.setPos`.
let filename;
let nextPosition;
|
||
|
|
||
|
/**
 * Boots the binlog listener.
 *
 * Steps, in order:
 *  1. Opens the query connection (`db`) from `config.db`.
 *  2. Translates `config.includeSchema` into (a) the `includeSchema` option
 *     handed to ZongJi (schema name -> list of table names) and (b) the
 *     per-table filter stored in `schemaMap`.
 *  3. Runs `config.queries.getPos`; if it returns a row, its `logName` and
 *     `position` are passed to `zongji.start` as `filename` / `position`,
 *     otherwise `startAtEnd` is set.
 *  4. Starts ZongJi, schedules `flushQueue` every `config.interval`
 *     milliseconds and installs a SIGINT handler that stops ZongJi, closes
 *     the connection and exits.
 *
 * A table entry in the config may be any value; only a non-null object is
 * inspected for overrides (`events` array, `columns` array, `fk`).
 *
 * @returns {Promise<void>}
 */
async function main() {
  db = await mysql.createConnection(config.db);

  const includeSchema = {};
  for (const [schemaName, schema] of Object.entries(config.includeSchema || {})) {
    const tableMap = new Map();

    for (const [tableName, table] of Object.entries(schema || {})) {
      // Defaults: every row event, every column, key column named `id`.
      const tableInfo = {
        events: allEvents,
        columns: true,
        fk: 'id'
      };

      // `typeof null` is also 'object', so null must be excluded explicitly
      // before reading override properties.
      if (table !== null && typeof table === 'object') {
        if (Array.isArray(table.events)) {
          tableInfo.events = new Set(table.events);
        }
        if (Array.isArray(table.columns)) {
          tableInfo.columns = new Set(table.columns);
        }
        if (table.fk) {
          tableInfo.fk = table.fk;
        }
      }

      tableMap.set(tableName, tableInfo);
    }

    includeSchema[schemaName] = [...tableMap.keys()];
    schemaMap.set(schemaName, tableMap);
  }

  const opts = {
    includeEvents: config.includeEvents,
    includeSchema
  };

  const [res] = await db.query(config.queries.getPos);
  if (res.length) {
    const [row] = res;
    filename = row.logName;
    // Seed the module-level position instead of assigning to an undeclared
    // `position` variable (an implicit global, and an error in strict mode).
    nextPosition = row.position;
    Object.assign(opts, {filename, position: nextPosition});
  } else {
    opts.startAtEnd = true;
  }

  zongji.start(opts);
  setInterval(flushQueue, config.interval);
  console.log('Listenig binary log events.');

  process.on('SIGINT', async function() {
    console.log('Got SIGINT.');
    zongji.stop();
    await db.end();
    process.exit();
  });
}
|
||
|
|
||
|
/**
 * Writes the collected key values and the matching binlog position to the
 * database using `config.queries.queue` and `config.queries.setPos`.
 *
 * Does nothing beyond logging when no keys are pending.
 *
 * The pending keys and the position are snapshotted and the shared `fks` set
 * is cleared *before* the first await. Binlog events that arrive while the
 * queries are in flight therefore land in the next flush instead of being
 * wiped by a late `fks.clear()`, and the stored position always corresponds
 * to the keys that were actually queued.
 *
 * @returns {Promise<void>}
 * @throws Re-throws any error from `db.query`; the snapshotted keys are put
 *   back into `fks` first so a later flush retries them.
 */
async function flushQueue() {
  console.log('==========================================================');
  console.log('Flush:', `filename=${filename}`, `position=${nextPosition}`);
  console.log(fks);
  if (!fks.size) return;

  // Take ownership of the current batch synchronously.
  const pending = [...fks];
  const flushedFilename = filename;
  const flushedPosition = nextPosition;
  fks.clear();

  try {
    // One single-element row per key, passed as a single bulk parameter.
    const ids = pending.map((fk) => [fk]);
    await db.query(config.queries.queue, [ids]);
    await db.query(config.queries.setPos, [flushedFilename, flushedPosition]);
  } catch (err) {
    // Nothing was confirmed: return the batch so it is not lost.
    for (const fk of pending) fks.add(fk);
    throw err;
  }
}
|
||
|
|
||
|
/**
 * Tells whether a column value differs between the before and after image
 * of an updated row.
 *
 * Two distinct Date or Buffer instances are never `===` even when they hold
 * the same value, so those are compared by content. NOTE(review): this
 * assumes the binlog parser can hand back Date/Buffer instances for some
 * column types — confirm against the ZongJi version in use.
 *
 * @param {*} before - value in the row's before image
 * @param {*} after - value in the row's after image
 * @returns {boolean} true when the values are considered different
 */
function columnChanged(before, after) {
  if (before === after) return false;
  if (before instanceof Date && after instanceof Date) {
    return before.getTime() !== after.getTime();
  }
  if (Buffer.isBuffer(before) && Buffer.isBuffer(after)) {
    return !before.equals(after);
  }
  return true;
}

/**
 * Binlog event handler: collects the key value (`tableInfo.fk`) of every
 * affected row of a watched table into `fks` and remembers the position
 * following the event.
 *
 * - 'tablemap' events are ignored.
 * - Events without a table map entry or without a `rows` array are ignored
 *   instead of throwing on the property access.
 * - Events for schemas/tables/event names not present in `schemaMap` are
 *   ignored.
 * - For 'updaterows' with a column filter, a row counts only when at least
 *   one filtered column differs between `before` and `after`.
 */
zongji.on('binlog', function(evt) {
  const eventName = evt.getEventName();
  if (eventName === 'tablemap') return;

  const table = evt.tableMap && evt.tableMap[evt.tableId];
  if (!table || !Array.isArray(evt.rows)) return;

  const tableMap = schemaMap.get(table.parentSchema);
  if (!tableMap) return;

  const tableInfo = tableMap.get(table.tableName);
  if (!tableInfo) return;

  if (!tableInfo.events.has(eventName)) return;

  const {fk, columns} = tableInfo;

  if (eventName !== 'updaterows') {
    // Write/delete events: each entry of `rows` is the row itself.
    for (const row of evt.rows) {
      fks.add(row[fk]);
    }
  } else if (columns === true) {
    // No column filter: every updated row counts.
    for (const row of evt.rows) {
      fks.add(row.after[fk]);
    }
  } else {
    for (const row of evt.rows) {
      const {before, after} = row;
      const changed = Object.keys(after).some(
        (col) => columns.has(col) && columnChanged(before[col], after[col])
      );
      if (changed) {
        fks.add(after[fk]);
      }
    }
  }

  console.log(`[${eventName}] ${table.tableName}: ${evt.rows.length}`);
  nextPosition = evt.nextPosition;
  filename = zongji.options.filename;
});
|
||
|
|
||
|
// Entry point. main() returns a promise; handle its rejection explicitly so a
// startup failure is reported and the process ends with a non-zero exit code
// rather than leaving a floating, unhandled rejection.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
|