const Queue = require('./queue'); module.exports = class QueueFk extends Queue { constructor(consumer, name, conf) { super(consumer, name, conf); this.reset(); } async consume() { await super.consume(); this.flush(); } reset() { this.lastMessage = null; this.nMessages = 0; this.scopes = new Map(); } flush(flushInterval) { if (this.timeout) { clearTimeout(this.timeout); this.timeout = null; } this.timeout = setTimeout( () => this.onFlushTimeout(), flushInterval); } async onFlushTimeout() { if (this.nMessages) { const {consumer} = this; const scopes = this.scopes; const lastMessage = this.lastMessage; this.reset(); if (consumer.conf.debug) console.debug('Flush:'.blue, this.name.yellow, scopes); try { for (const [scope, ids] of scopes) { let query = this.conf.query[scope]; for (const id of ids) { const params = {id}; //query = consumer.db.format(query, params); if (consumer.conf.debug) console.debug('SQL:'.blue, query, params); if (!consumer.conf.testMode) await consumer.db.query(query, params); } } await this.channel.ack(lastMessage, true); } catch(err) { await this.channel.nack(lastMessage, true); console.error(err); } } this.flush(this.conf.flushInterval); } async onConsume(msg) { const {conf} = this; const consumer = this.consumer; const data = JSON.parse(msg.content.toString()); if (consumer.conf.debug) console.debug('Message:'.blue, this.name.yellow, data.table); const tableConf = conf.includeSchema[data.schema][data.table]; const scope = tableConf.scope ?? data.table; let ids = this.scopes.get(scope); if (!ids) this.scopes.set(scope, ids = new Set()); const key = tableConf.key; if (data.eventName === 'updaterows') { for (const row of data.rows) { ids.add(row.before[key]); ids.add(row.after[key]); } } else for (const row of data.rows) ids.add(row[key]); this.nMessages++; this.lastMessage = msg; if (this.nMessages == consumer.conf.amqpPrefetch) this.flush(); } }