feat: refs #4685 AMQ queue prefix, stock queue remove isDelivered
gitea/mycdc/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2025-04-02 10:04:14 +02:00
parent 6f950239a8
commit c899e74465
6 changed files with 22 additions and 16 deletions

View File

@ -4,6 +4,7 @@ defaults:
mode: fk mode: fk
flushInterval: 5000 flushInterval: 5000
amqpPrefetch: 100 amqpPrefetch: 100
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672 amqp: amqp://user:password@localhost:5672
db: db:
host: localhost host: localhost

View File

@ -2,6 +2,7 @@ code: mycdc
debug: false debug: false
testMode: false testMode: false
deleteNonEmpty: false deleteNonEmpty: false
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672 amqp: amqp://user:password@localhost:5672
pingInterval: 60 pingInterval: 60
flushInterval: 10 flushInterval: 10

View File

@ -6,12 +6,15 @@ module.exports = class Queue {
async consume() { async consume() {
const channel = await this.consumer.amqpConn.createChannel(); const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch); channel.prefetch(this.conf.amqpPrefetch);
await channel.assertQueue(this.name, { const {amqPrefix} = this.consumer.conf;
const amqQueue = `${amqPrefix}.${this.name}`;
await channel.assertQueue(amqQueue, {
durable: true durable: true
}); });
this.channel = channel; this.channel = channel;
await channel.consume(this.name, await channel.consume(amqQueue,
msg => this.onConsume(msg)); msg => this.onConsume(msg));
} }
} }

View File

@ -147,17 +147,19 @@ module.exports = class MyCDC {
this.publisher = await amqp.connect(conf.amqp); this.publisher = await amqp.connect(conf.amqp);
const channel = this.channel = await this.publisher.createChannel(); const channel = this.channel = await this.publisher.createChannel();
const {amqPrefix} = conf;
for (const tableMap of this.schemaMap.values()) { for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) { for (const tableName of tableMap.keys()) {
await channel.assertExchange(tableName, 'headers', { await channel.assertExchange(`${amqPrefix}.${tableName}`, 'headers', {
durable: true durable: true
}); });
} }
} }
for (const queueName in this.queuesConf) { for (const queueName in this.queuesConf) {
const amqQueue = `${amqPrefix}.${queueName}`;
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true}; const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
await channel.deleteQueue(queueName, {options}); await channel.deleteQueue(amqQueue, {options});
await channel.assertQueue(queueName, { await channel.assertQueue(amqQueue, {
durable: true durable: true
}); });
@ -167,15 +169,15 @@ module.exports = class MyCDC {
for (const tableName in schema) { for (const tableName in schema) {
const table = schema[tableName]; const table = schema[tableName];
const events = table.events || allEvents; const events = table.events || allEvents;
let args = {'x-match': 'any'};
for (const event of events) { for (const event of events) {
let args; if (event === 'updaterows' && table.columns)
if (event === 'updaterows' && table.columns) {
args = {'x-match': 'any'};
table.columns.map(c => args[c] = true); table.columns.map(c => args[c] = true);
} else else
args = {'z-event': event}; args[`z-${event}`] = true;
await channel.bindQueue(queueName, tableName, '', args);
} }
await channel.bindQueue(amqQueue,
`${amqPrefix}.${tableName}`, '', args);
} }
} }
} }
@ -392,7 +394,7 @@ module.exports = class MyCDC {
}; };
let headers = {}; let headers = {};
headers['z-event'] = eventName; headers[`z-${eventName}`] = true;
if (isUpdate) { if (isUpdate) {
for (const col of cols) for (const col of cols)
headers[col] = true; headers[col] = true;
@ -404,8 +406,9 @@ module.exports = class MyCDC {
headers headers
}; };
const {amqPrefix} = this.conf;
const jsonData = JSON.stringify(data); const jsonData = JSON.stringify(data);
this.channel.publish(tableName, '', this.channel.publish(`${amqPrefix}.${tableName}`, '',
Buffer.from(jsonData), options); Buffer.from(jsonData), options);
if (this.debug) { if (this.debug) {

View File

@ -1,6 +1,6 @@
{ {
"name": "mycdc", "name": "mycdc",
"version": "0.0.27", "version": "0.0.28",
"author": "Verdnatura Levante SL", "author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log", "description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0", "license": "GPL-3.0",

View File

@ -16,7 +16,6 @@ includeSchema:
- shipped - shipped
- warehouseInFk - warehouseInFk
- warehouseOutFk - warehouseOutFk
- isDelivered
- isReceived - isReceived
events: events:
- updaterows - updaterows
@ -55,7 +54,6 @@ includeSchema:
- itemFk - itemFk
- quantity - quantity
- created - created
- isPicked
hedera: hedera:
order: order:
key: id key: id