module.exports = class Queue { constructor(consumer, name, conf) { Object.assign(this, {consumer, name, conf}); this.reset(); } async consume() { if (this.conf.mode !== 'fk') { console.warn(`Ignoring queue '${this.name} with unknown mode '${this.conf.mode}'`); return; } const channel = await this.consumer.amqpConn.createChannel(); channel.prefetch(this.conf.amqpPrefetch); await channel.assertQueue(this.name, { durable: true }); this.channel = channel; await channel.consume(this.name, msg => this.onConsume(msg)); this.flush(); } reset() { this.lastMessage = null; this.nMessages = 0; this.ids = new Set(); } flush(flushInterval) { if (this.timeout) { clearTimeout(this.timeout); this.timeout = null; } this.timeout = setTimeout( () => this.onFlushTimeout(), flushInterval); } async onFlushTimeout() { const consumer = this.consumer; if (this.ids.size) { if (consumer.conf.debug) console.debug('Flush:'.blue, this.name.yellow, this.ids); const ids = Array.from(this.ids); const lastMessage = this.lastMessage; this.reset(); try { for (const id of ids) { const sql = consumer.db.format(this.conf.query, id); consumer.debug('SQL', sql); if (!consumer.conf.testMode) await consumer.db.query(sql); } await this.channel.ack(lastMessage, true); } catch(err) { await this.channel.nack(lastMessage, true); console.error(err); } } this.flush(this.conf.flushInterval); } async onConsume(msg) { const consumer = this.consumer; const data = JSON.parse(msg.content.toString()); if (consumer.conf.debug) console.debug('Message:'.blue, this.name.yellow, data.table); const ids = this.ids; const key = this.conf.includeSchema[data.schema][data.table].key; if (data.eventName === 'updaterows') { for (const row of data.rows) { ids.add(row.before[key]); ids.add(row.after[key]); } } else for (const row of data.rows) ids.add(row[key]); this.nMessages++; this.lastMessage = msg; if (this.nMessages == consumer.conf.amqpPrefetch) this.flush(); } }