From 0d937cef67aff0ff697aadf3ac060c8d29e7f741 Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Tue, 7 May 2024 11:56:42 +0200 Subject: [PATCH] fix: refs #4409 AMQ messages ACK fixes --- lib/queue-fk.js | 21 ++++++++++----------- package.json | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/lib/queue-fk.js b/lib/queue-fk.js index d241d1f..5d22d2f 100644 --- a/lib/queue-fk.js +++ b/lib/queue-fk.js @@ -12,7 +12,7 @@ module.exports = class QueueFk extends Queue { } reset() { - this.lastMessage = null; + this.messages = []; this.nMessages = 0; this.scopes = new Map(); } @@ -27,11 +27,10 @@ module.exports = class QueueFk extends Queue { } async onFlushTimeout() { - if (this.nMessages) { - const {consumer} = this; + const {messages} = this; - const scopes = this.scopes; - const lastMessage = this.lastMessage; + if (messages.length) { + const {consumer, scopes} = this; this.reset(); if (consumer.conf.debug) @@ -50,9 +49,11 @@ module.exports = class QueueFk extends Queue { } } - await this.channel.ack(lastMessage, true); + for (const message of messages) + await this.channel.ack(message); } catch(err) { - await this.channel.nack(lastMessage, true); + for (const message of messages) + await this.channel.nack(message); console.error(err); } } @@ -84,10 +85,8 @@ module.exports = class QueueFk extends Queue { for (const row of data.rows) ids.add(row[key]); - this.nMessages++; - this.lastMessage = msg; - - if (this.nMessages == consumer.conf.amqpPrefetch) + this.messages.push(msg); + if (this.messages.length == consumer.conf.amqpPrefetch) this.flush(); } } diff --git a/package.json b/package.json index 791a6f3..ecc4094 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mycdc", - "version": "0.0.11", + "version": "0.0.12", "author": "Verdnatura Levante SL", "description": "Asynchronous DB calculations reading the binary log", "license": "GPL-3.0",