fix: refs #4409 AMQ messages ACK fixes
gitea/mycdc/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2024-05-07 11:56:42 +02:00
parent ba2ad3bbd2
commit 0d937cef67
2 changed files with 11 additions and 12 deletions

View File

@ -12,7 +12,7 @@ module.exports = class QueueFk extends Queue {
}
reset() {
this.lastMessage = null;
this.messages = [];
this.nMessages = 0;
this.scopes = new Map();
}
@ -27,11 +27,10 @@ module.exports = class QueueFk extends Queue {
}
async onFlushTimeout() {
if (this.nMessages) {
const {consumer} = this;
const {messages} = this;
const scopes = this.scopes;
const lastMessage = this.lastMessage;
if (messages.length) {
const {consumer, scopes} = this;
this.reset();
if (consumer.conf.debug)
@ -50,9 +49,11 @@ module.exports = class QueueFk extends Queue {
}
}
await this.channel.ack(lastMessage, true);
for (const message of messages)
await this.channel.ack(message);
} catch(err) {
await this.channel.nack(lastMessage, true);
for (const message of messages)
await this.channel.nack(message);
console.error(err);
}
}
@ -84,10 +85,8 @@ module.exports = class QueueFk extends Queue {
for (const row of data.rows)
ids.add(row[key]);
this.nMessages++;
this.lastMessage = msg;
if (this.nMessages == consumer.conf.amqpPrefetch)
this.messages.push(msg);
if (this.messages.length == consumer.conf.amqpPrefetch)
this.flush();
}
}

View File

@ -1,6 +1,6 @@
{
"name": "mycdc",
"version": "0.0.11",
"version": "0.0.12",
"author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0",