mycdc/mycdc.js

462 lines
12 KiB
JavaScript

require('require-yaml');
require('colors');
const fs = require('fs-extra');
const path = require('path');
const ZongJi = require('./zongji');
const mysql = require('mysql2/promise');
const amqp = require('amqplib');
const allEvents = [
'writerows',
'updaterows',
'deleterows'
];
module.exports = class MyCDC {
constructor() {
this.running = false;
this.filename = null;
this.position = null;
this.schemaMap = new Map();
this.queues = {};
}
async start() {
const defaultConfig = require('./config/producer.yml');
const conf = this.conf = Object.assign({}, defaultConfig);
const localPath = path.join(__dirname, 'config/producer.local.yml');
if (fs.existsSync(localPath)) {
const localConfig = require(localPath);
Object.assign(conf, localConfig);
}
this.queuesConf = {};
const queueDir = path.join(__dirname, 'queues');
const queueFiles = await fs.readdir(queueDir);
for (const queueFile of queueFiles) {
const match = queueFile.match(/^([a-zA-Z0-9-_]+)\.ya?ml$/);
if (!match)
throw new Error(`Invalid queue file name '${queueFile}'`);
this.queuesConf[match[1]] = require(path.join(queueDir, queueFile));
}
const queues = this.queuesConf;
for (const queueName in queues) {
const includeSchema = queues[queueName].includeSchema;
for (const schemaName in includeSchema) {
let tableMap = this.schemaMap.get(schemaName);
if (!tableMap) {
tableMap = new Map();
this.schemaMap.set(schemaName, tableMap);
}
const schema = includeSchema[schemaName];
for (const tableName in schema) {
const table = schema[tableName];
//if (typeof table !== 'object') continue;
let tableInfo = tableMap.get(tableName);
if (!tableInfo) {
tableInfo = {
queues: new Map(),
events: new Map(),
columnSet: false,
fk: 'id'
};
tableMap.set(tableName, tableInfo);
}
tableInfo.queues.set(queueName, table);
const events = table.events || allEvents;
for (const event of events) {
let eventInfo = tableInfo.events.get(event);
if (!eventInfo) {
eventInfo = [];
tableInfo.events.set(event, eventInfo);
}
eventInfo.push(queueName);
}
const columns = table.columns;
if (columns) {
if (tableInfo.columnSet === false)
tableInfo.columnSet = new Set();
if (tableInfo.columnSet !== true)
for (const column of columns)
tableInfo.columnSet.add(column);
} else
tableInfo.columnSet = true;
if (table.id)
tableInfo.id = table.id;
}
this.schemaMap.set(schemaName, tableMap);
}
}
const includeSchema = {};
for (const [schemaName, tableMap] of this.schemaMap) {
includeSchema[schemaName] = Array.from(tableMap.keys());
for (const [tableName, tableInfo] of tableMap) {
if (tableInfo.columnSet !== true)
tableInfo.columns = Array.from(tableInfo.columnSet.keys());
}
}
this.opts = {
includeEvents: [
'rotate',
'tablemap',
'writerows',
'updaterows',
'deleterows'
],
includeSchema,
serverId: conf.serverId
};
if (conf.testMode)
console.log('Test mode enabled, just logging queries to console.');
console.log('Starting process.');
await this.init();
console.log('Process started.');
}
async stop() {
console.log('Stopping process.');
await this.end();
console.log('Process stopped.');
}
async init() {
const conf = this.conf;
this.debug('MyCDC', 'Initializing.');
this.onErrorListener = err => this.onError(err);
// DB connection
this.db = await mysql.createConnection(conf.db);
this.db.on('error', this.onErrorListener);
// RabbitMQ
this.publisher = await amqp.connect(conf.amqp);
const channel = this.channel = await this.publisher.createChannel();
for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) {
await channel.assertExchange(tableName, 'headers', {
durable: true
});
}
}
for (const queueName in this.queuesConf) {
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
await channel.deleteQueue(queueName, {options});
await channel.assertQueue(queueName, {
durable: true
});
const includeSchema = this.queuesConf[queueName].includeSchema;
for (const schemaName in includeSchema) {
const schema = includeSchema[schemaName];
for (const tableName in schema) {
const table = schema[tableName];
const events = table.events || allEvents;
for (const event of events) {
let args;
if (event === 'updaterows' && table.columns) {
args = {'x-match': 'any'};
table.columns.map(c => args[c] = true);
} else
args = {'z-event': event};
await channel.bindQueue(queueName, tableName, '', args);
}
}
}
}
// Zongji
const zongji = new ZongJi(conf.db);
this.zongji = zongji;
this.onBinlogListener = evt => this.onBinlog(evt);
zongji.on('binlog', this.onBinlogListener);
const [res] = await this.db.query(
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
[conf.code]
);
if (res.length) {
const [row] = res;
this.filename = row.logName;
this.position = row.position;
this.isFlushed = true;
Object.assign(this.opts, {
filename: this.filename,
position: this.position
});
} else
this.opts.startAtEnd = true;
this.debug('Zongji', 'Starting.');
await new Promise((resolve, reject) => {
const onReady = () => {
zongji.off('error', onError);
resolve();
};
const onError = err => {
this.zongji = null;
zongji.off('ready', onReady);
zongji.off('binlog', this.onBinlogListener);
reject(err);
}
zongji.once('ready', onReady);
zongji.once('error', onError);
zongji.start(this.opts);
});
this.debug('Zongji', 'Started.');
this.zongji.on('error', this.onErrorListener);
this.flushInterval = setInterval(
() => this.flushQueue(), conf.flushInterval * 1000);
this.pingInterval = setInterval(
() => this.connectionPing(), conf.pingInterval * 1000);
// Summary
this.running = true;
this.debug('MyCDC', 'Initialized.');
}
async end(silent) {
const zongji = this.zongji;
if (!zongji) return;
this.debug('MyCDC', 'Ending.');
// Zongji
clearInterval(this.flushInterval);
clearInterval(this.pingInterval);
zongji.off('binlog', this.onBinlogListener);
zongji.off('error', this.onErrorListener);
this.zongji = null;
this.running = false;
this.debug('Zongji', 'Stopping.');
// FIXME: Cannot call Zongji.stop(), it doesn't wait to end connection
zongji.connection.destroy(() => {
console.log('zongji.connection.destroy');
});
await new Promise(resolve => {
zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId],
err => {
if (err && err.code !== 'ER_NO_SUCH_THREAD' && !silent)
console.error(err);
resolve();
});
});
zongji.ctrlConnection.destroy(() => {
console.log('zongji.ctrlConnection.destroy');
});
zongji.emit('stopped');
this.debug('Zongji', 'Stopped.');
// RabbitMQ
await this.publisher.close();
// DB connection
this.db.off('error', this.onErrorListener);
// FIXME: mysql2/promise bug, db.end() ends process
this.db.on('error', () => {});
try {
await this.db.end();
} catch (err) {
if (!silent)
console.error(err);
}
// Summary
this.debug('MyCDC', 'Ended.');
}
async tryRestart() {
try {
await this.init();
console.log('Process restarted.');
} catch(err) {
setTimeout(() => this.tryRestart(), 30);
}
}
async onError(err) {
console.log(`Error: ${err.code}: ${err.message}`);
try {
await this.end(true);
} catch(e) {}
switch (err.code) {
case 'PROTOCOL_CONNECTION_LOST':
case 'ECONNRESET':
console.log('Trying to restart process.');
await this.tryRestart();
break;
default:
process.exit();
}
}
onBinlog(evt) {
//evt.dump();
const eventName = evt.getEventName();
let position = evt.nextPosition;
switch (eventName) {
case 'rotate':
this.filename = evt.binlogName;
position = evt.position;
console.log(
`[${eventName}] filename: ${this.filename}`,
`position: ${this.position}, nextPosition: ${evt.nextPosition}`
);
break;
case 'writerows':
case 'deleterows':
case 'updaterows':
this.onRowEvent(evt, eventName);
break;
}
this.position = position;
this.isFlushed = false;
}
onRowEvent(evt, eventName) {
const table = evt.tableMap[evt.tableId];
const tableName = table.tableName;
const tableMap = this.schemaMap.get(table.parentSchema);
if (!tableMap) return;
const tableInfo = tableMap.get(tableName);
if (!tableInfo) return;
const queues = tableInfo.events.get(eventName);
if (!queues) return;
const isUpdate = eventName === 'updaterows';
let rows;
let cols;
if (isUpdate) {
rows = [];
cols = new Set();
const columns = tableInfo.columnSet === true
? Object.keys(evt.rows[0].after)
: tableInfo.columns;
for (const row of evt.rows) {
let nColsChanged = 0;
const after = row.after;
for (const col of columns) {
if (after[col] !== undefined
&& !equals(after[col], row.before[col])) {
nColsChanged++;
cols.add(col);
}
}
if (nColsChanged)
rows.push(row);
}
} else
rows = evt.rows;
if (!rows || !rows.length) return;
const data = {
eventName,
table: tableName,
schema: table.parentSchema,
rows
};
let headers = {};
headers['z-event'] = eventName;
if (isUpdate) {
for (const col of cols)
headers[col] = true;
data.cols = Array.from(cols);
}
const options = {
persistent: true,
headers
};
const jsonData = JSON.stringify(data);
this.channel.publish(tableName, '',
Buffer.from(jsonData), options);
if (this.debug) {
// console.debug(data, options);
console.debug('Queued:'.blue,
`${tableName}(${rows.length}) [${eventName}]`);
}
}
async flushQueue() {
if (this.isFlushed) return;
const {filename, position} = this;
this.debug('Flush', `filename: ${filename}, position: ${position}`);
const replaceQuery =
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
if (!this.conf.testMode)
await this.db.query(replaceQuery, [this.conf.code, filename, position]);
this.isFlushed = true;
}
async connectionPing() {
this.debug('Ping', 'Sending ping to database.');
// FIXME: Should Zongji.connection be pinged?
await new Promise((resolve, reject) => {
this.zongji.ctrlConnection.ping(err => {
if (err) return reject(err);
resolve();
});
})
await this.db.ping();
}
debug(namespace, message) {
if (this.conf.debug)
console.debug(`${namespace}:`.blue, message.yellow);
}
}
function equals(a, b) {
if (a === b)
return true;
const type = typeof a;
if (a == null || b == null || type !== typeof b)
return false;
if (type === 'object' && a.constructor === b.constructor) {
if (a instanceof Date)
return a.getTime() === b.getTime();
}
return false;
}