From c899e744656e7a7393add29fe9b9c2a5f45abf4e Mon Sep 17 00:00:00 2001 From: Juan Ferrer Toribio Date: Wed, 2 Apr 2025 10:04:14 +0200 Subject: [PATCH] feat: refs #4685 AMQ queue prefix, stock queue remove isDelivered --- config/consumer.yml | 1 + config/producer.yml | 1 + lib/queue.js | 7 +++++-- mycdc.js | 25 ++++++++++++++----------- package.json | 2 +- queues/stock.yml | 2 -- 6 files changed, 22 insertions(+), 16 deletions(-) diff --git a/config/consumer.yml b/config/consumer.yml index 97d7e8e..bd6d3f8 100644 --- a/config/consumer.yml +++ b/config/consumer.yml @@ -4,6 +4,7 @@ defaults: mode: fk flushInterval: 5000 amqpPrefetch: 100 +amqPrefix: cdc amqp: amqp://user:password@localhost:5672 db: host: localhost diff --git a/config/producer.yml b/config/producer.yml index 848e8a1..71fc4ed 100644 --- a/config/producer.yml +++ b/config/producer.yml @@ -2,6 +2,7 @@ code: mycdc debug: false testMode: false deleteNonEmpty: false +amqPrefix: cdc amqp: amqp://user:password@localhost:5672 pingInterval: 60 flushInterval: 10 diff --git a/lib/queue.js b/lib/queue.js index 29d45e6..469201b 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -6,12 +6,15 @@ module.exports = class Queue { async consume() { const channel = await this.consumer.amqpConn.createChannel(); channel.prefetch(this.conf.amqpPrefetch); - await channel.assertQueue(this.name, { + const {amqPrefix} = this.consumer.conf; + const amqQueue = `${amqPrefix}.${this.name}`; + + await channel.assertQueue(amqQueue, { durable: true }); this.channel = channel; - await channel.consume(this.name, + await channel.consume(amqQueue, msg => this.onConsume(msg)); } } diff --git a/mycdc.js b/mycdc.js index 64e93ed..057d14c 100644 --- a/mycdc.js +++ b/mycdc.js @@ -147,17 +147,19 @@ module.exports = class MyCDC { this.publisher = await amqp.connect(conf.amqp); const channel = this.channel = await this.publisher.createChannel(); + const {amqPrefix} = conf; for (const tableMap of this.schemaMap.values()) { for (const tableName of tableMap.keys()) { - await channel.assertExchange(tableName, 'headers', { + await channel.assertExchange(`${amqPrefix}.${tableName}`, 'headers', { durable: true }); } } for (const queueName in this.queuesConf) { + const amqQueue = `${amqPrefix}.${queueName}`; const options = conf.deleteNonEmpty ? {} : {ifEmpty: true}; - await channel.deleteQueue(queueName, {options}); - await channel.assertQueue(queueName, { + await channel.deleteQueue(amqQueue, {options}); + await channel.assertQueue(amqQueue, { durable: true }); @@ -167,15 +169,15 @@ module.exports = class MyCDC { for (const tableName in schema) { const table = schema[tableName]; const events = table.events || allEvents; + let args = {'x-match': 'any'}; for (const event of events) { - let args; - if (event === 'updaterows' && table.columns) { - args = {'x-match': 'any'}; + if (event === 'updaterows' && table.columns) table.columns.map(c => args[c] = true); - } else - args = {'z-event': event}; - await channel.bindQueue(queueName, tableName, '', args); + else + args[`z-${event}`] = true; } + await channel.bindQueue(amqQueue, + `${amqPrefix}.${tableName}`, '', args); } } } @@ -392,7 +394,7 @@ module.exports = class MyCDC { }; let headers = {}; - headers['z-event'] = eventName; + headers[`z-${eventName}`] = true; if (isUpdate) { for (const col of cols) headers[col] = true; @@ -404,8 +406,9 @@ module.exports = class MyCDC { headers }; + const {amqPrefix} = this.conf; const jsonData = JSON.stringify(data); - this.channel.publish(tableName, '', + this.channel.publish(`${amqPrefix}.${tableName}`, '', Buffer.from(jsonData), options); if (this.debug) { diff --git a/package.json b/package.json index bb6ca6f..02fa40b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mycdc", - "version": "0.0.27", + "version": "0.0.28", "author": "Verdnatura Levante SL", "description": "Asynchronous DB calculations reading the binary log", "license": "GPL-3.0", diff --git a/queues/stock.yml b/queues/stock.yml index 7ff911f..1d054f9 100644 --- a/queues/stock.yml +++ b/queues/stock.yml @@ -16,7 +16,6 @@ includeSchema: - shipped - warehouseInFk - warehouseOutFk - - isDelivered - isReceived events: - updaterows @@ -55,7 +54,6 @@ includeSchema: - itemFk - quantity - created - - isPicked hedera: order: key: id