const Queue = require('../queue'); module.exports = class QueueId extends Queue { async onConsume(msg) { const {consumer, conf} = this; const data = JSON.parse(msg.content.toString()); if (consumer.conf.debug) console.debug('Message:'.blue, this.name.yellow, data.table); const table = conf.includeSchema[data.schema][data.table] const {query, key} = table; const params = { schema: data.schema, table: data.table, }; async function dbQuery(id) { const myParams = Object.assign({id}, params); if (consumer.conf.debug) console.debug('SQL:'.blue, query, myParams); if (!consumer.conf.testMode) await consumer.db.query(query, myParams); } if (data.eventName === 'updaterows') { const keyChanged = data.cols.indexOf(key) !== -1; for (const row of data.rows) { if (keyChanged) await dbQuery(row.before[key]); await dbQuery(row.after[key]); } } else for (const row of data.rows) await dbQuery(row[key]); await this.channel.ack(msg); } }