mycdc/queues/queue-id.js

42 lines
1.1 KiB
JavaScript
Raw Normal View History

const Queue = require('../queue');
module.exports = class QueueId extends Queue {
async onConsume(msg) {
const {consumer, conf} = this;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const table = conf.includeSchema[data.schema][data.table]
const {query, key} = table;
const params = {
schema: data.schema,
table: data.table,
};
async function dbQuery(id) {
const myParams = Object.assign({id}, params);
if (consumer.conf.debug)
console.debug('SQL:'.blue, query, myParams);
if (!consumer.conf.testMode)
await consumer.db.query(query, myParams);
}
const keyChanged = data.cols.indexOf(key) !== -1;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
if (keyChanged)
await dbQuery(row.before[key]);
await dbQuery(row.after[key]);
}
} else
for (const row of data.rows)
await dbQuery(row[key]);
await this.channel.ack(msg);
}
}