fix: refs #4409 AMQ messages ACK fixes
gitea/mycdc/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2024-05-07 13:04:07 +02:00
parent 0d937cef67
commit f8502e3026
3 changed files with 12 additions and 9 deletions

View File

@ -3,6 +3,7 @@ testMode: false
defaults:
mode: fk
flushInterval: 5000
amqpPrefetch: 100
amqp: amqp://user:password@localhost:5672
db:
host: localhost

View File

@ -30,7 +30,7 @@ module.exports = class QueueFk extends Queue {
const {messages} = this;
if (messages.length) {
const {consumer, scopes} = this;
const {consumer, scopes, channel} = this;
this.reset();
if (consumer.conf.debug)
@ -49,19 +49,21 @@ module.exports = class QueueFk extends Queue {
}
}
for (const message of messages)
await this.channel.ack(message);
const promises = [];
for (const msg of messages)
promises.push(channel.ack(msg));
await Promise.all(promises);
} catch(err) {
for (const message of messages)
await this.channel.nack(message);
console.error(err);
for (const msg of messages)
await channel.nack(msg);
throw err;
}
}
this.flush(this.conf.flushInterval);
}
async onConsume(msg) {
onConsume(msg) {
const {conf} = this;
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
@ -86,7 +88,7 @@ module.exports = class QueueFk extends Queue {
ids.add(row[key]);
this.messages.push(msg);
if (this.messages.length == consumer.conf.amqpPrefetch)
if (this.messages.length == conf.amqpPrefetch)
this.flush();
}
}

View File

@ -1,6 +1,6 @@
{
"name": "mycdc",
"version": "0.0.12",
"version": "0.0.13",
"author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0",