const Queue = require('./queue'); module.exports = class QueueFk extends Queue { constructor(consumer, name, conf) { super(consumer, name, conf); this.reset(); } async consume() { await super.consume(); this.flush(); } reset() { this.messages = []; this.nMessages = 0; this.scopes = new Map(); } flush(flushInterval) { if (this.timeout) { clearTimeout(this.timeout); this.timeout = null; } this.timeout = setTimeout( () => this.onFlushTimeout(), flushInterval); } async onFlushTimeout() { const {messages} = this; if (messages.length) { const {consumer, scopes, channel} = this; this.reset(); if (consumer.conf.debug) console.debug('Flush:'.blue, this.name.yellow, scopes); try { for (const [scope, ids] of scopes) { let query = this.conf.query[scope]; for (const id of ids) { const params = {id}; //query = consumer.db.format(query, params); if (consumer.conf.debug) console.debug('SQL:'.blue, query, params); if (!consumer.conf.testMode) await consumer.db.query(query, params); } } const promises = []; for (const msg of messages) promises.push(channel.ack(msg)); await Promise.all(promises); } catch(err) { for (const msg of messages) await channel.nack(msg); throw err; } } this.flush(this.conf.flushInterval); } onConsume(msg) { const {conf} = this; const consumer = this.consumer; const data = JSON.parse(msg.content.toString()); if (consumer.conf.debug) console.debug('Message:'.blue, this.name.yellow, data.table); const tableConf = conf.includeSchema[data.schema][data.table]; const scope = tableConf.scope ?? data.table; let ids = this.scopes.get(scope); if (!ids) this.scopes.set(scope, ids = new Set()); const key = tableConf.key; if (data.eventName === 'updaterows') { for (const row of data.rows) { ids.add(row.before[key]); ids.add(row.after[key]); } } else for (const row of data.rows) ids.add(row[key]); this.messages.push(msg); if (this.messages.length == conf.amqpPrefetch) this.flush(); } }