mycdc/db-async.js

350 lines
8.7 KiB
JavaScript

const ZongJi = require('./zongji');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
require('colors');
const allEvents = new Set([
'writerows',
'updaterows',
'deleterows'
]);
module.exports = class DbAsync {
constructor(config) {
this.config = config;
this.running = false;
this.filename = null;
this.position = null;
this.schemaMap = new Map();
this.fks = new Set();
const includeSchema = {};
for (const schemaName in this.config.includeSchema) {
const schema = this.config.includeSchema[schemaName];
const tables = [];
const tableMap = new Map();
for (const tableName in schema) {
const table = schema[tableName];
tables.push(tableName);
const tableInfo = {
events: allEvents,
columns: true,
fk: 'id'
};
tableMap.set(tableName, tableInfo);
if (typeof table === 'object') {
if (Array.isArray(table.events))
tableInfo.events = new Set(table.events);
if (Array.isArray(table.columns))
tableInfo.columns = new Set(table.columns);
if (table.fk)
tableInfo.fk = table.fk;
}
}
includeSchema[schemaName] = tables;
this.schemaMap.set(schemaName, tableMap);
}
this.opts = {
includeEvents: this.config.includeEvents,
includeSchema
};
}
async start() {
if (this.config.testMode)
console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.');
await this.init();
console.log('Process started.');
}
async stop() {
console.log('Stopping process.');
await this.end();
console.log('Process stopped.');
}
async init() {
this.debug('DbAsync', 'Initializing.');
this.onErrorListener = err => this.onError(err);
// DB connection
this.db = await mysql.createConnection(this.config.db);
this.db.on('error', this.onErrorListener);
// RabbitMQ
this.publisher = await amqp.connect(this.config.amqp);
this.channel = await this.publisher.createChannel();
this.channel.assertQueue(this.config.queue, {
durable: true
});
// Zongji
const zongji = new ZongJi(this.config.db);
this.zongji = zongji;
this.onBinlogListener = evt => this.onBinlog(evt);
zongji.on('binlog', this.onBinlogListener);
const [res] = await this.db.query(
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
[this.config.queue]
);
if (res.length) {
const [row] = res;
this.filename = row.logName;
this.position = row.position;
Object.assign(this.opts, {
filename: this.filename,
position: this.position
});
} else
this.opts.startAtEnd = true;
this.debug('Zongji', 'Starting.');
await new Promise((resolve, reject) => {
const onReady = () => {
zongji.off('error', onError);
resolve();
};
const onError = err => {
this.zongji = null;
zongji.off('ready', onReady);
zongji.off('binlog', this.onBinlogListener);
reject(err);
}
zongji.once('ready', onReady);
zongji.once('error', onError);
zongji.start(this.opts);
});
this.debug('Zongji', 'Started.');
this.zongji.on('error', this.onErrorListener);
this.flushInterval = setInterval(
() => this.flushQueue(), this.config.flushInterval);
this.pingInterval = setInterval(
() => this.connectionPing(), this.config.pingInterval * 1000);
// Summary
this.running = true;
this.debug('DbAsync', 'Initialized.');
}
async end(silent) {
const zongji = this.zongji;
if (!zongji) return;
this.debug('DbAsync', 'Ending.');
// Zongji
clearInterval(this.flushInterval);
clearInterval(this.pingInterval);
zongji.off('binlog', this.onBinlogListener);
zongji.off('error', this.onErrorListener);
this.zongji = null;
this.running = false;
this.debug('Zongji', 'Stopping.');
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
zongji.connection.destroy(() => {
console.log('zongji.connection.destroy');
});
await new Promise(resolve => {
zongji.ctrlConnection.query('KILL ' + zongji.connection.threadId,
err => {
if (err && !silent)
console.error(err);
resolve();
});
});
zongji.ctrlConnection.destroy(() => {
console.log('zongji.ctrlConnection.destroy');
});
zongji.emit('stopped');
this.debug('Zongji', 'Stopped.');
// RabbitMQ
await this.publisher.close();
// DB connection
this.db.off('error', this.onErrorListener);
// FIXME: mysql2/promise bug, db.end() ends process
this.db.on('error', () => {});
try {
await this.db.end();
} catch (err) {
if (!silent)
console.error(err);
}
// Summary
this.debug('DbAsync', 'Ended.');
}
async tryRestart() {
try {
await this.init();
console.log('Process restarted.');
} catch(err) {
setTimeout(() => this.tryRestart(), 30);
}
}
async onError(err) {
console.log(`Error: ${err.code}: ${err.message}`);
try {
await this.end(true);
} catch(e) {}
switch (err.code) {
case 'PROTOCOL_CONNECTION_LOST':
case 'ECONNRESET':
console.log('Trying to restart process.');
await this.tryRestart();
break;
default:
process.exit();
}
}
onBinlog(evt) {
//evt.dump();
const eventName = evt.getEventName();
if (eventName === 'tablemap') return;
if (eventName === 'rotate') {
this.filename = evt.binlogName;
this.position = evt.position;
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`);
return;
}
const table = evt.tableMap[evt.tableId];
const tableMap = this.schemaMap.get(table.parentSchema);
if (!tableMap) return;
const tableInfo = tableMap.get(table.tableName);
if (!tableInfo) return;
if (!tableInfo.events.has(eventName)) return;
let column;
const rows = evt.rows;
const fks = new Set();
if (eventName === 'updaterows') {
if (tableInfo.columns !== true) {
let changes = false;
for (const row of rows) {
const after = row.after;
for (const col in after) {
if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) {
fks.add(after[tableInfo.fk]);
changes = true;
if (!column) column = col;
break;
}
}
}
if (!changes) return;
} else {
for (const row of rows)
fks.add(row.after[tableInfo.fk]);
}
} else {
for (const row of rows)
fks.add(row[tableInfo.fk]);
}
if (fks.size) {
const data = JSON.stringify(Array.from(fks));
this.channel.sendToQueue(this.config.queue,
Buffer.from(data));
this.debug('Queued', data);
}
const row = eventName === 'updaterows'
? rows[0].after
: rows[0];
if (this.config.debug) {
console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`);
console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`);
if (column) {
let before = formatValue(rows[0].before[column]);
let after = formatValue(rows[0].after[column]);
console.debug(` ${column}: ${before} <- ${after}`);
}
}
this.position = evt.nextPosition;
this.flushed = false;
}
async flushQueue() {
if (this.flushed) return;
this.debug('Flush', `filename: ${this.filename}, position: ${this.position}`);
const replaceQuery =
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.config.testMode)
await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]);
this.flushed = true;
}
async connectionPing() {
this.debug('Ping', 'Sending ping to database.');
// FIXME: Should Zongji.connection be pinged?
await new Promise((resolve, reject) => {
this.zongji.ctrlConnection.ping(err => {
if (err) return reject(err);
resolve();
});
})
await this.db.ping();
}
debug(namespace, message) {
if (this.config.debug)
console.debug(`${namespace}:`.blue, message.yellow);
}
}
function equals(a, b) {
if (a === b)
return true;
const type = typeof a;
if (a == null || b == null || type !== typeof b)
return false;
if (type === 'object' && a.constructor === b.constructor) {
if (a instanceof Date)
return a.getTime() === b.getTime();
}
return false;
}
function formatValue(value) {
if (value instanceof Date)
return value.toJSON();
return value;
}