diff --git a/.gitignore b/.gitignore index 2cd4783..f14ce54 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ node_modules zongji -config.local.yml \ No newline at end of file +config/*.local.yml \ No newline at end of file diff --git a/config.yml b/config.yml deleted file mode 100644 index fedcb9b..0000000 --- a/config.yml +++ /dev/null @@ -1,136 +0,0 @@ - -debug: true -testMode: false -code: mycdc -db: - host: localhost - port: 3306 - user: zongji - password: password - database: util -consumerDb: - host: localhost - port: 3306 - user: zongji - password: password - database: util -amqp: amqp://user:password@localhost:5672 -pingInterval: 60 -flushInterval: 10 -queues: - orderTotal: - query: CALL hedera.order_recalc(?) - mode: fk - includeSchema: - hedera: - order: - key: id - columns: - - id - - address_id - - company_id - - date_send - - customer_id - events: - - updaterows - orderRow: - key: orderFk - columns: - - id - - orderFk - - itemFk - - warehouseFk - - shipment - - amount - ticketTotal: - query: CALL vn.ticket_recalc(?) - mode: fk - includeSchema: - vn: - ticket: - key: id - columns: - - id - - shipped - - warehouseFk - - clientFk - events: - - updaterows - sale: - key: ticketFk - columns: - - id - - ticketFk - - itemFk - - quantity - - price - comparative: - query: CALL vn.comparative_refresh(?, ?, ?) - mode: changes - includeSchema: - vn: - ticket: - key: id - columns: - - id - - shipped - - warehouseFk - - isDeleted - events: - - updaterows - sale: - key: id - columns: - - id - - ticketFk - - itemFk - - quantity - - price - stock: - query: CALL stock.available_refresh(?, ?, ?) - mode: changes - includeSchema: - vn: - ticket: - key: id - columns: - - id - - shipped - - warehouseFk - - isDeleted - events: - - updaterows - sale: - key: id - columns: - - id - - ticketFk - - itemFk - - quantity - travel: - key: id - columns: - - id - - shipped - - landing - - warehouseInFk - - warehouseOutFk - - isDelivered - - isReceived - events: - - updaterows - entry: - key: id - columns: - - id - - travelFk - events: - - updaterows - buy: - key: id - columns: - - id - - entryFk - - itemFk - - quantity - - life diff --git a/config/consumer.yml b/config/consumer.yml new file mode 100644 index 0000000..b4376cc --- /dev/null +++ b/config/consumer.yml @@ -0,0 +1,12 @@ +debug: false +testMode: false +amqp: amqp://user:password@localhost:5672 +db: + host: localhost + port: 3306 + user: zongji + password: password + database: util +pollQueues: + - queue1 + - queue2 \ No newline at end of file diff --git a/config/producer.yml b/config/producer.yml new file mode 100644 index 0000000..ec2fd27 --- /dev/null +++ b/config/producer.yml @@ -0,0 +1,13 @@ +code: mycdc +debug: false +testMode: false +amqp: amqp://user:password@localhost:5672 +pingInterval: 60 +flushInterval: 10 +db: + host: localhost + port: 3306 + user: zongji + password: password + database: util + \ No newline at end of file diff --git a/config/queues.yml b/config/queues.yml new file mode 100644 index 0000000..6b60f4a --- /dev/null +++ b/config/queues.yml @@ -0,0 +1,72 @@ +orderTotal: + query: CALL hedera.order_recalc(?) + mode: fk + flushInterval: 5000 + includeSchema: + hedera: + order: + key: id + columns: + - id + - address_id + - company_id + - date_send + - customer_id + events: + - updaterows + orderRow: + key: orderFk + columns: + - id + - orderFk + - itemFk + - warehouseFk + - shipment + - amount +ticketTotal: + query: CALL vn.ticket_recalc(?) + mode: fk + flushInterval: 5000 + includeSchema: + vn: + ticket: + key: id + columns: + - id + - shipped + - warehouseFk + - clientFk + events: + - updaterows + sale: + key: ticketFk + columns: + - id + - ticketFk + - itemFk + - quantity + - price +comparative: + query: CALL vn.itemComparative_refresh(?, ?, ?) + mode: changes + flushInterval: 5000 + includeSchema: + vn: + ticket: + key: id + columns: + - id + - shipped + - warehouseFk + - isDeleted + - clientFk + events: + - updaterows + sale: + key: id + columns: + - id + - ticketFk + - itemFk + - quantity + - priceFixed diff --git a/consumer.js b/consumer.js index 5228023..67ddac3 100644 --- a/consumer.js +++ b/consumer.js @@ -4,20 +4,20 @@ const fs = require('fs'); const path = require('path'); const mysql = require('mysql2/promise'); const amqp = require('amqplib'); +const Queue = require('./queue'); const {cpus} = require('os'); - class Consumer { async start() { - const defaultConfig = require('./config.yml'); - const config = this.config = Object.assign({}, defaultConfig); - const localPath = path.join(__dirname, 'config.local.yml'); + const defaultConfig = require('./config/consumer.yml'); + const conf = this.conf = Object.assign({}, defaultConfig); + const localPath = path.join(__dirname, 'config/consumer.local.yml'); if (fs.existsSync(localPath)) { const localConfig = require(localPath); - Object.assign(config, localConfig); + Object.assign(conf, localConfig); } - if (config.testMode) + if (conf.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); @@ -34,33 +34,32 @@ class Consumer { } async init() { - const config = this.config; + const conf = this.conf; this.onErrorListener = err => this.onError(err); const dbConfig = Object.assign({ connectionLimit: cpus().length - }, config.consumerDb); + }, conf.db); this.db = await mysql.createPool(dbConfig); this.db.on('error', this.onErrorListener); - this.consumer = await amqp.connect(config.amqp); - this.channel = await this.consumer.createChannel(); - this.channel.prefetch(1); + this.amqpConn = await amqp.connect(conf.amqp); } async consumeQueues() { - for (const queueName in this.config.queues) { - await this.channel.assertQueue(queueName, { - durable: true - }); - await this.channel.consume(queueName, - msg => this.onConsume(msg, queueName)); + const queuesConf = require('./config/queues.yml'); + this.queues = {}; + + for (const queueName in queuesConf) { + const queue = new Queue(this, queueName, queuesConf[queueName]); + this.queues[queueName] = queue; + await queue.consume(); } } async end(silent) { - await this.consumer.close(); + await this.amqpConn.close(); this.db.off('error', this.onErrorListener); // FIXME: mysql2/promise bug, db.end() ends process @@ -73,47 +72,6 @@ class Consumer { } } - async onConsume(msg, queueName) { - const config = this.config; - const data = JSON.parse(msg.content.toString()); - - if (config.debug) - console.debug('Message:'.blue, queueName.yellow, data.table); - - const queue = config.queues[queueName]; - let query = queue.query; - if (!query) return; - - // XXX: Testing - //query = 'SELECT 1 sleep'; - - switch(queue.mode) { - case 'fk': - for (const fk of data.fks) { - const sql = this.db.format(query, fk); - this.debug('SQL', sql); - if (!config.testMode) - await this.db.query(query, fk); - } - break; - case 'changes': - const queueTable = queue.includeSchema[data.schema][data.table]; - for (const row of data.rows) { - const sql = this.db.format(query, [ - data.table, - row[queueTable.key], - JSON.stringify(row) - ]); - this.debug('SQL', sql); - if (!config.testMode) - await this.db.query(query, row); - } - break; - } - - await this.channel.ack(msg); - } - async tryRestart() { try { await this.init(); @@ -141,7 +99,7 @@ class Consumer { } debug(namespace, message) { - if (this.config.debug) + if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); } } diff --git a/mycdc.js b/mycdc.js index 92c448f..ab7e174 100644 --- a/mycdc.js +++ b/mycdc.js @@ -22,15 +22,16 @@ module.exports = class MyCDC { } async start() { - const defaultConfig = require('./config.yml'); - const config = this.config = Object.assign({}, defaultConfig); - const localPath = path.join(__dirname, 'config.local.yml'); + const defaultConfig = require('./config/producer.yml'); + const conf = this.conf = Object.assign({}, defaultConfig); + const localPath = path.join(__dirname, 'config/producer.local.yml'); if (fs.existsSync(localPath)) { const localConfig = require(localPath); - Object.assign(config, localConfig); + Object.assign(conf, localConfig); } + this.queuesConf = require('./config/queues.yml'); - const queues = config.queues; + const queues = this.queuesConf; for (const queueName in queues) { const includeSchema = queues[queueName].includeSchema; for (const schemaName in includeSchema) { @@ -100,7 +101,7 @@ module.exports = class MyCDC { includeSchema }; - if (config.testMode) + if (conf.testMode) console.log('Test mode enabled, just logging queries to console.'); console.log('Starting process.'); @@ -115,21 +116,28 @@ module.exports = class MyCDC { } async init() { - const config = this.config; + const conf = this.conf; this.debug('MyCDC', 'Initializing.'); this.onErrorListener = err => this.onError(err); // DB connection - this.db = await mysql.createConnection(config.db); + this.db = await mysql.createConnection(conf.db); this.db.on('error', this.onErrorListener); // RabbitMQ - this.publisher = await amqp.connect(config.amqp); + this.publisher = await amqp.connect(conf.amqp); this.channel = await this.publisher.createChannel(); - for (const queueName in config.queues) { + for (const tableMap of this.schemaMap.values()) { + for (const tableName of tableMap.keys()) { + await this.channel.assertExchange(tableName, 'headers', { + durable: true + }); + } + } + for (const queueName in this.queuesConf) { await this.channel.assertQueue(queueName, { durable: true }); @@ -137,7 +145,7 @@ module.exports = class MyCDC { // Zongji - const zongji = new ZongJi(config.db); + const zongji = new ZongJi(conf.db); this.zongji = zongji; this.onBinlogListener = evt => this.onBinlog(evt); @@ -145,7 +153,7 @@ module.exports = class MyCDC { const [res] = await this.db.query( 'SELECT `logName`, `position` FROM `binlogQueue` WHERE code = ?', - [config.code] + [conf.code] ); if (res.length) { const [row] = res; @@ -180,9 +188,9 @@ module.exports = class MyCDC { this.zongji.on('error', this.onErrorListener); this.flushInterval = setInterval( - () => this.flushQueue(), config.flushInterval * 1000); + () => this.flushQueue(), conf.flushInterval * 1000); this.pingInterval = setInterval( - () => this.connectionPing(), config.pingInterval * 1000); + () => this.connectionPing(), conf.pingInterval * 1000); // Summary @@ -211,9 +219,9 @@ module.exports = class MyCDC { console.log('zongji.connection.destroy'); }); await new Promise(resolve => { - zongji.ctrlConnection.query('KILL ' + zongji.connection.threadId, + zongji.ctrlConnection.query('KILL ?', [zongji.connection.threadId], err => { - if (err && !silent) + if (err && err.code !== 'ER_NO_SUCH_THREAD' && !silent) console.error(err); resolve(); }); @@ -274,143 +282,115 @@ module.exports = class MyCDC { onBinlog(evt) { //evt.dump(); const eventName = evt.getEventName(); - if (eventName === 'tablemap') return; + let position = evt.nextPosition; - if (eventName === 'rotate') { + switch (eventName) { + case 'rotate': this.filename = evt.binlogName; - this.position = evt.position; - console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}`); - return; + position = evt.position; + console.log(`[${eventName}] filename: ${this.filename}`, `position: ${this.position}, nextPosition: ${evt.nextPosition}`); + break; + case 'writerows': + case 'deleterows': + case 'updaterows': + this.onRowEvent(evt, eventName); + break; } + this.position = position; + this.flushed = false; + } + + onRowEvent(evt, eventName) { const table = evt.tableMap[evt.tableId]; + const tableName = table.tableName; const tableMap = this.schemaMap.get(table.parentSchema); if (!tableMap) return; - const tableInfo = tableMap.get(table.tableName); + const tableInfo = tableMap.get(tableName); if (!tableInfo) return; - const queueNames = tableInfo.events.get(eventName); - if (!queueNames) return; + const queues = tableInfo.events.get(eventName); + if (!queues) return; - const rows = evt.rows; - const queues = this.config.queues; - const tableQueues = tableInfo.queues; + const isUpdate = eventName === 'updaterows'; + let rows; + let cols; - const changes = new Map(); - for (const queueName of queueNames) { - const change = { - mode: queues[queueName].mode - }; - changes.set(queueName, change); + if (isUpdate) { + rows = []; + cols = new Set(); + const columns = tableInfo.columns.keys(); - switch(change.mode) { - case 'fk': - change.fks = new Set(); - break; - case 'changes': - change.rows = []; - break; - } - } - - function addChange(queueNames, row, old) { - for (const queueName of queueNames) { - const queueInfo = tableQueues.get(queueName); - const change = changes.get(queueName); - - const key = row[queueInfo.key]; - const oldKey = old ? old[queueInfo.key] : null; - - switch(change.mode) { - case 'fk': - change.fks.add(key); - if (old && !equals(oldKey, key)) - change.fks.add(oldKey); - break; - case 'changes': - const queueRow = {}; - for (const column of queueInfo.columns) - if (row[column] !== undefined) - queueRow[column] = row[column]; - change.rows.push(queueRow); - break; - } - } - } - - const columnMap = tableInfo.columns; - const columns = columnMap.keys(); - - if (eventName === 'updaterows') { - const changedQueues = new Set(); - for (const row of rows) { - changedQueues.clear(); + for (const row of evt.rows) { + let nColsChanged = 0; const after = row.after; for (const col of columns) { - if (after[col] === undefined - || equals(after[col], row.before[col])) - continue; - - for (const queue of columnMap.get(col)) - changedQueues.add(queue); - - if (changedQueues.size === queueNames.length) - break; + if (after[col] !== undefined + && !equals(after[col], row.before[col])) { + nColsChanged++; + cols.add(col); + } } - - if (changedQueues.size) - addChange(changedQueues, after, row.before); + if (nColsChanged) + rows.push(row); } + } else + rows = evt.rows; + + if (!rows || !rows.length) return; + + const data = { + eventName, + table: tableName, + schema: table.parentSchema, + rows + }; + + const headers = {}; + if (isUpdate) { + for (const col of cols) + headers[col] = true; + data.cols = Array.from(cols); } else { - for (const row of rows) - addChange(queueNames, row); + headers['z-'+ eventName] = true; } - for (const [queueName, change] of changes) { - const jsonData = { - eventName, - table: table.tableName, - schema: table.parentSchema, - mode: change.mode - }; + const options = { + persistent: true, + headers + }; - let nChanges; - switch(change.mode) { - case 'fk': - jsonData.fks = Array.from(change.fks); - nChanges = change.fks.size; - break; - case 'changes': - jsonData.rows = change.rows; - nChanges = change.rows.length; - break; - } + const jsonData = JSON.stringify(data); + this.channel.publish(tableName, '', + Buffer.from(jsonData), options); - if (!nChanges) continue; - - const data = JSON.stringify(jsonData); - this.channel.sendToQueue(queueName, - Buffer.from(data), {persistent: true}); - - console.debug('Queued:'.blue, `${queueName}:`.yellow, `${table.tableName}(${nChanges}) [${eventName}]`); + if (this.debug) { + //console.debug(data, options); + console.debug('Queued:'.blue, + `${tableName}(${rows.length}) [${eventName}]`); } - - this.position = evt.nextPosition; - this.flushed = false; } async flushQueue() { if (this.flushed) return; - this.debug('Flush', `filename: ${this.filename}, position: ${this.position}`); - - const replaceQuery = - 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; - if (!this.config.testMode) - await this.db.query(replaceQuery, [this.config.code, this.filename, this.position]); + const position = this.nextPosition; - this.flushed = true; + if (position) { + const filename = this.nextFilename; + this.debug('Flush', `filename: ${filename}, position: ${position}`); + + const replaceQuery = + 'REPLACE INTO `binlogQueue` SET `code` = ?, `logName` = ?, `position` = ?'; + if (!this.conf.testMode) + await this.db.query(replaceQuery, [this.conf.code, filename, position]); + + this.flushed = true; + } + + this.nextFilename = this.filename; + this.nextPosition = this.position; } async connectionPing() { @@ -427,7 +407,7 @@ module.exports = class MyCDC { } debug(namespace, message) { - if (this.config.debug) + if (this.conf.debug) console.debug(`${namespace}:`.blue, message.yellow); } } @@ -444,9 +424,3 @@ function equals(a, b) { } return false; } - -function formatValue(value) { - if (value instanceof Date) - return value.toJSON(); - return value; -} diff --git a/queue.js b/queue.js new file mode 100644 index 0000000..d5668c0 --- /dev/null +++ b/queue.js @@ -0,0 +1,91 @@ +module.exports = class Queue { + constructor(consumer, name, conf) { + Object.assign(this, {consumer, name, conf}); + this.reset(); + } + + async consume() { + if (this.conf.mode !== 'fk') return; + + const channel = await this.consumer.amqpConn.createChannel(); + channel.prefetch(this.conf.amqpPrefetch); + await channel.assertQueue(this.name, { + durable: true + }); + this.channel = channel; + + await channel.consume(this.name, + msg => this.onConsume(msg)); + this.flush(); + } + + reset() { + this.lastMessage = null; + this.nMessages = 0; + this.ids = new Set(); + } + + flush(flushInterval) { + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = null; + } + this.timeout = setTimeout( + () => this.onFlushTimeout(), flushInterval); + } + + async onFlushTimeout() { + const consumer = this.consumer; + + if (this.ids.size) { + if (consumer.conf.debug) + console.debug('Flush:'.blue, this.name.yellow, this.ids); + + const ids = Array.from(this.ids); + const lastMessage = this.lastMessage; + this.reset(); + + try { + for (const id of ids) { + const sql = consumer.db.format(this.conf.query, id); + consumer.debug('SQL', sql); + if (!consumer.conf.testMode) + await consumer.db.query(sql); + } + + await this.channel.ack(lastMessage, true); + } catch(err) { + await this.channel.nack(lastMessage, true); + console.error(err); + } + } + + this.flush(this.conf.flushInterval); + } + + async onConsume(msg) { + const consumer = this.consumer; + const data = JSON.parse(msg.content.toString()); + + if (consumer.conf.debug) + console.debug('Message:'.blue, this.name.yellow, data.table); + + const ids = this.ids; + const key = this.conf.includeSchema[data.schema][data.table].key; + + if (data.eventName === 'updaterows') { + for (const row of data.rows) { + ids.add(row.before[key]); + ids.add(row.after[key]); + } + } else + for (const row of data.rows) + ids.add(row[key]); + + this.nMessages++; + this.lastMessage = msg; + + if (this.nMessages == consumer.conf.amqpPrefetch) + this.flush(); + } +} diff --git a/run-rabbit.sh b/run-rabbit.sh index f211336..7afdfba 100755 --- a/run-rabbit.sh +++ b/run-rabbit.sh @@ -1,10 +1,10 @@ #!/bin/bash -docker rm -f some-rabbit +docker rm -f rabbit docker run \ -d --hostname my-rabbit \ - --name some-rabbit \ + --name rabbit \ -e RABBITMQ_DEFAULT_USER=user \ -e RABBITMQ_DEFAULT_PASS=password \ -p 5672:5672 \