diff --git a/config/consumer.yml b/config/consumer.yml index 9524fd7..97d7e8e 100644 --- a/config/consumer.yml +++ b/config/consumer.yml @@ -3,6 +3,7 @@ testMode: false defaults: mode: fk flushInterval: 5000 + amqpPrefetch: 100 amqp: amqp://user:password@localhost:5672 db: host: localhost diff --git a/lib/queue-fk.js b/lib/queue-fk.js index 5d22d2f..d24b92c 100644 --- a/lib/queue-fk.js +++ b/lib/queue-fk.js @@ -30,7 +30,7 @@ module.exports = class QueueFk extends Queue { const {messages} = this; if (messages.length) { - const {consumer, scopes} = this; + const {consumer, scopes, channel} = this; this.reset(); if (consumer.conf.debug) @@ -49,19 +49,21 @@ module.exports = class QueueFk extends Queue { } } - for (const message of messages) - await this.channel.ack(message); + const promises = []; + for (const msg of messages) + promises.push(channel.ack(msg)); + await Promise.all(promises); } catch(err) { - for (const message of messages) - await this.channel.nack(message); - console.error(err); + for (const msg of messages) + await channel.nack(msg); + throw err; } } this.flush(this.conf.flushInterval); } - async onConsume(msg) { + onConsume(msg) { const {conf} = this; const consumer = this.consumer; const data = JSON.parse(msg.content.toString()); @@ -86,7 +88,7 @@ module.exports = class QueueFk extends Queue { ids.add(row[key]); this.messages.push(msg); - if (this.messages.length == consumer.conf.amqpPrefetch) + if (this.messages.length == conf.amqpPrefetch) this.flush(); } } diff --git a/package.json b/package.json index ecc4094..fa89ccd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mycdc", - "version": "0.0.12", + "version": "0.0.13", "author": "Verdnatura Levante SL", "description": "Asynchronous DB calculations reading the binary log", "license": "GPL-3.0",