Tab removed
This commit is contained in:
parent
6a49bb59b5
commit
baff147435
423
db-async.js
423
db-async.js
|
@ -9,251 +9,250 @@ const allEvents = new Set([
|
|||
const fks = new Set();
|
||||
|
||||
module.exports = class DbAsync {
|
||||
constructor(config) {
|
||||
this.config = config;
|
||||
this.running = false;
|
||||
this.filename = null;
|
||||
this.position = null;
|
||||
this.schemaMap = new Map();
|
||||
}
|
||||
constructor(config) {
|
||||
this.config = config;
|
||||
this.running = false;
|
||||
this.filename = null;
|
||||
this.position = null;
|
||||
this.schemaMap = new Map();
|
||||
}
|
||||
|
||||
async start() {
|
||||
if (this.config.testMode)
|
||||
console.debug('Test mode enabled, just logging queries to console.');
|
||||
async start() {
|
||||
if (this.config.testMode)
|
||||
console.debug('Test mode enabled, just logging queries to console.');
|
||||
|
||||
console.log('Starting process.');
|
||||
console.log('Starting process.');
|
||||
|
||||
const db = await mysql.createConnection(this.config.db);
|
||||
this.db = db;
|
||||
const db = await mysql.createConnection(this.config.db);
|
||||
this.db = db;
|
||||
|
||||
const includeSchema = {};
|
||||
for (const schemaName in this.config.includeSchema) {
|
||||
const schema = this.config.includeSchema[schemaName];
|
||||
const tables = [];
|
||||
const tableMap = new Map();
|
||||
const includeSchema = {};
|
||||
for (const schemaName in this.config.includeSchema) {
|
||||
const schema = this.config.includeSchema[schemaName];
|
||||
const tables = [];
|
||||
const tableMap = new Map();
|
||||
|
||||
for (const tableName in schema) {
|
||||
const table = schema[tableName];
|
||||
tables.push(tableName);
|
||||
for (const tableName in schema) {
|
||||
const table = schema[tableName];
|
||||
tables.push(tableName);
|
||||
|
||||
const tableInfo = {
|
||||
events: allEvents,
|
||||
columns: true,
|
||||
fk: 'id'
|
||||
};
|
||||
tableMap.set(tableName, tableInfo);
|
||||
const tableInfo = {
|
||||
events: allEvents,
|
||||
columns: true,
|
||||
fk: 'id'
|
||||
};
|
||||
tableMap.set(tableName, tableInfo);
|
||||
|
||||
if (typeof table === 'object') {
|
||||
if (Array.isArray(table.events))
|
||||
tableInfo.events = new Set(table.events);
|
||||
if (Array.isArray(table.columns))
|
||||
tableInfo.columns = new Set(table.columns);
|
||||
if (table.fk)
|
||||
tableInfo.fk = table.fk;
|
||||
}
|
||||
if (typeof table === 'object') {
|
||||
if (Array.isArray(table.events))
|
||||
tableInfo.events = new Set(table.events);
|
||||
if (Array.isArray(table.columns))
|
||||
tableInfo.columns = new Set(table.columns);
|
||||
if (table.fk)
|
||||
tableInfo.fk = table.fk;
|
||||
}
|
||||
|
||||
includeSchema[schemaName] = tables;
|
||||
this.schemaMap.set(schemaName, tableMap);
|
||||
}
|
||||
|
||||
const opts = {
|
||||
includeEvents: this.config.includeEvents,
|
||||
includeSchema
|
||||
};
|
||||
this.opts = opts;
|
||||
|
||||
const [res] = await this.db.query(
|
||||
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
|
||||
[this.config.queue]
|
||||
);
|
||||
if (res.length) {
|
||||
const [row] = res;
|
||||
this.filename = row.logName;
|
||||
this.position = row.position;
|
||||
Object.assign(opts, {
|
||||
filename: this.filename,
|
||||
position: this.position
|
||||
});
|
||||
} else
|
||||
opts.startAtEnd = true;
|
||||
|
||||
await this.startZongji();
|
||||
includeSchema[schemaName] = tables;
|
||||
this.schemaMap.set(schemaName, tableMap);
|
||||
}
|
||||
|
||||
async stop() {
|
||||
await this.stopZongji();
|
||||
await this.db.end();
|
||||
const opts = {
|
||||
includeEvents: this.config.includeEvents,
|
||||
includeSchema
|
||||
};
|
||||
this.opts = opts;
|
||||
|
||||
const [res] = await this.db.query(
|
||||
'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?',
|
||||
[this.config.queue]
|
||||
);
|
||||
if (res.length) {
|
||||
const [row] = res;
|
||||
this.filename = row.logName;
|
||||
this.position = row.position;
|
||||
Object.assign(opts, {
|
||||
filename: this.filename,
|
||||
position: this.position
|
||||
});
|
||||
} else
|
||||
opts.startAtEnd = true;
|
||||
|
||||
await this.startZongji();
|
||||
}
|
||||
|
||||
async stop() {
|
||||
await this.stopZongji();
|
||||
await this.db.end();
|
||||
}
|
||||
|
||||
async startZongji() {
|
||||
const zongji = new ZongJi(this.config.db);
|
||||
this.zongji = zongji;
|
||||
|
||||
zongji.on('ready', () => this.onReady());
|
||||
zongji.on('stopped', () => this.onStopped());
|
||||
zongji.on('error', err => this.onError(err));
|
||||
zongji.on('binlog', evt => this.onBinlog(evt));
|
||||
zongji.start(this.opts);
|
||||
}
|
||||
|
||||
async stopZongji() {
|
||||
console.debug('Stopping Zongji.');
|
||||
this.running = false;
|
||||
clearInterval(this.flushInterval);
|
||||
clearInterval(this.pingInterval);
|
||||
this.zongji.stop();
|
||||
}
|
||||
|
||||
async restartZongji() {
|
||||
console.debug('Restaring Zongji.');
|
||||
await this.stopZongji();
|
||||
setTimeout(() => this.startZongji(this.opts), 1000);
|
||||
}
|
||||
|
||||
onReady() {
|
||||
this.running = true;
|
||||
|
||||
this.flushInterval = setInterval(
|
||||
() => this.flushQueue(), this.config.flushInterval);
|
||||
this.pingInterval = setInterval(
|
||||
() => this.connectionPing(), this.config.pingInterval * 1000);
|
||||
|
||||
console.debug('Zongji ready.');
|
||||
}
|
||||
|
||||
onStopped() {
|
||||
console.debug('Zongji stopped.');
|
||||
}
|
||||
|
||||
async onError(err) {
|
||||
console.log(`Error: ${err.code}: ${err.message}`);
|
||||
switch (err.code) {
|
||||
case 'PROTOCOL_CONNECTION_LOST':
|
||||
case 'ECONNRESET':
|
||||
await this.restartZongji();
|
||||
break;
|
||||
default:
|
||||
await this.stop();
|
||||
process.exit();
|
||||
}
|
||||
}
|
||||
|
||||
onBinlog(evt) {
|
||||
//evt.dump();
|
||||
const eventName = evt.getEventName();
|
||||
if (eventName === 'tablemap') return;
|
||||
|
||||
if (eventName === 'rotate') {
|
||||
this.filename = evt.binlogName;
|
||||
this.position = evt.position;
|
||||
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`);
|
||||
return;
|
||||
}
|
||||
|
||||
async startZongji() {
|
||||
const zongji = new ZongJi(this.config.db);
|
||||
this.zongji = zongji;
|
||||
const table = evt.tableMap[evt.tableId];
|
||||
const tableMap = this.schemaMap.get(table.parentSchema);
|
||||
if (!tableMap) return;
|
||||
|
||||
zongji.on('ready', () => this.onReady());
|
||||
zongji.on('stopped', () => this.onStopped());
|
||||
zongji.on('error', err => this.onError(err));
|
||||
zongji.on('binlog', evt => this.onBinlog(evt));
|
||||
zongji.start(this.opts);
|
||||
}
|
||||
const tableInfo = tableMap.get(table.tableName);
|
||||
if (!tableInfo) return;
|
||||
|
||||
async stopZongji() {
|
||||
console.debug('Stopping Zongji.');
|
||||
this.running = false;
|
||||
clearInterval(this.flushInterval);
|
||||
clearInterval(this.pingInterval);
|
||||
this.zongji.stop();
|
||||
}
|
||||
if (!tableInfo.events.has(eventName)) return;
|
||||
|
||||
async restartZongji() {
|
||||
console.debug('Restaring Zongji.');
|
||||
await this.stopZongji();
|
||||
setTimeout(() => this.startZongji(this.opts), 1000);
|
||||
}
|
||||
let column;
|
||||
const rows = evt.rows;
|
||||
|
||||
onReady() {
|
||||
this.running = true;
|
||||
|
||||
this.flushInterval = setInterval(
|
||||
() => this.flushQueue(), this.config.flushInterval);
|
||||
this.pingInterval = setInterval(
|
||||
() => this.connectionPing(), this.config.pingInterval * 1000);
|
||||
|
||||
console.debug('Zongji ready.');
|
||||
}
|
||||
|
||||
onStopped() {
|
||||
console.debug('Zongji stopped.');
|
||||
}
|
||||
|
||||
async onError(err) {
|
||||
console.log(`Error: ${err.code}: ${err.message}`);
|
||||
switch (err.code) {
|
||||
case 'PROTOCOL_CONNECTION_LOST':
|
||||
case 'ECONNRESET':
|
||||
await this.restartZongji();
|
||||
break;
|
||||
default:
|
||||
await this.stop();
|
||||
process.exit();
|
||||
}
|
||||
}
|
||||
|
||||
onBinlog(evt) {
|
||||
//evt.dump();
|
||||
const eventName = evt.getEventName();
|
||||
if (eventName === 'tablemap') return;
|
||||
|
||||
if (eventName === 'rotate') {
|
||||
this.filename = evt.binlogName;
|
||||
this.position = evt.position;
|
||||
console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const table = evt.tableMap[evt.tableId];
|
||||
const tableMap = this.schemaMap.get(table.parentSchema);
|
||||
if (!tableMap) return;
|
||||
|
||||
const tableInfo = tableMap.get(table.tableName);
|
||||
if (!tableInfo) return;
|
||||
|
||||
if (!tableInfo.events.has(eventName)) return;
|
||||
|
||||
let column;
|
||||
const rows = evt.rows;
|
||||
|
||||
if (eventName === 'updaterows') {
|
||||
if (tableInfo.columns !== true) {
|
||||
let changes = false;
|
||||
for (const row of rows) {
|
||||
const after = row.after;
|
||||
for (const col in after) {
|
||||
if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) {
|
||||
fks.add(after[tableInfo.fk]);
|
||||
changes = true;
|
||||
if (!column) column = col;
|
||||
break;
|
||||
}
|
||||
if (eventName === 'updaterows') {
|
||||
if (tableInfo.columns !== true) {
|
||||
let changes = false;
|
||||
for (const row of rows) {
|
||||
const after = row.after;
|
||||
for (const col in after) {
|
||||
if (tableInfo.columns.has(col) && !equals(after[col], row.before[col])) {
|
||||
fks.add(after[tableInfo.fk]);
|
||||
changes = true;
|
||||
if (!column) column = col;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!changes) return;
|
||||
} else {
|
||||
for (const row of rows)
|
||||
fks.add(row.after[tableInfo.fk]);
|
||||
}
|
||||
if (!changes) return;
|
||||
} else {
|
||||
for (const row of rows)
|
||||
fks.add(row[tableInfo.fk]);
|
||||
fks.add(row.after[tableInfo.fk]);
|
||||
}
|
||||
|
||||
const row = eventName === 'updaterows'
|
||||
? rows[0].after
|
||||
: rows[0];
|
||||
|
||||
if (this.config.debug) {
|
||||
console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`);
|
||||
console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`);
|
||||
if (column) {
|
||||
let before = formatValue(rows[0].before[column]);
|
||||
let after = formatValue(rows[0].after[column]);
|
||||
console.debug(` ${column}: ${before} <- ${after}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.position = evt.nextPosition;
|
||||
} else {
|
||||
for (const row of rows)
|
||||
fks.add(row[tableInfo.fk]);
|
||||
}
|
||||
|
||||
async flushQueue() {
|
||||
if (!this.running) return;
|
||||
console.log('==========================================================');
|
||||
console.log('Flush:', `filename: ${this.filename}`, `position: ${this.position}`);
|
||||
console.log(fks);
|
||||
if (!fks.size) return;
|
||||
const row = eventName === 'updaterows'
|
||||
? rows[0].after
|
||||
: rows[0];
|
||||
|
||||
const ids = [];
|
||||
for (const fk of fks) ids.push([fk]);
|
||||
|
||||
const replaceQuery =
|
||||
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
|
||||
|
||||
if (this.config.testMode) {
|
||||
console.debug(this.config.addQuery);
|
||||
console.debug(replaceQuery);
|
||||
} else {
|
||||
await this.db.query(this.config.addQuery, [ids]);
|
||||
await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]);
|
||||
if (this.config.debug) {
|
||||
console.debug(`[${eventName}] ${table.tableName}: ${rows.length}`);
|
||||
console.debug(` ${tableInfo.fk}: ${row[tableInfo.fk]}`);
|
||||
if (column) {
|
||||
let before = formatValue(rows[0].before[column]);
|
||||
let after = formatValue(rows[0].after[column]);
|
||||
console.debug(` ${column}: ${before} <- ${after}`);
|
||||
}
|
||||
fks.clear();
|
||||
console.log('==========================================================');
|
||||
}
|
||||
|
||||
async connectionPing() {
|
||||
if (!this.running) return;
|
||||
if (this.config.debug)
|
||||
console.debug('Sending ping to database.')
|
||||
this.zongji.connection.ping();
|
||||
this.zongji.ctrlConnection.ping();
|
||||
await this.db.ping();
|
||||
}
|
||||
this.position = evt.nextPosition;
|
||||
}
|
||||
|
||||
function equals(a, b) {
|
||||
if (a === b)
|
||||
return true;
|
||||
const type = typeof a;
|
||||
if (a == null || b == null || type !== typeof b)
|
||||
return false;
|
||||
if (type === 'object' && a.constructor === b.constructor) {
|
||||
if (a instanceof Date)
|
||||
return a.getTime() === b.getTime();
|
||||
async flushQueue() {
|
||||
if (!this.running) return;
|
||||
console.log('==========================================================');
|
||||
console.log('Flush:', `filename: ${this.filename}`, `position: ${this.position}`);
|
||||
console.log(fks);
|
||||
if (!fks.size) return;
|
||||
|
||||
const ids = [];
|
||||
for (const fk of fks) ids.push([fk]);
|
||||
|
||||
const replaceQuery =
|
||||
'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?';
|
||||
|
||||
if (this.config.testMode) {
|
||||
console.debug(this.config.addQuery);
|
||||
console.debug(replaceQuery);
|
||||
} else {
|
||||
await this.db.query(this.config.addQuery, [ids]);
|
||||
await this.db.query(replaceQuery, [this.config.queue, this.filename, this.position]);
|
||||
}
|
||||
fks.clear();
|
||||
console.log('==========================================================');
|
||||
}
|
||||
|
||||
async connectionPing() {
|
||||
if (!this.running) return;
|
||||
if (this.config.debug)
|
||||
console.debug('Sending ping to database.')
|
||||
this.zongji.connection.ping();
|
||||
this.zongji.ctrlConnection.ping();
|
||||
await this.db.ping();
|
||||
}
|
||||
}
|
||||
|
||||
function equals(a, b) {
|
||||
if (a === b)
|
||||
return true;
|
||||
const type = typeof a;
|
||||
if (a == null || b == null || type !== typeof b)
|
||||
return false;
|
||||
if (type === 'object' && a.constructor === b.constructor) {
|
||||
if (a instanceof Date)
|
||||
return a.getTime() === b.getTime();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function formatValue(value) {
|
||||
if (value instanceof Date)
|
||||
return value.toJSON();
|
||||
return value;
|
||||
}
|
||||
|
||||
function formatValue(value) {
|
||||
if (value instanceof Date)
|
||||
return value.toJSON();
|
||||
return value;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue