Compare commits

...

3 Commits
dev ... master

Author SHA1 Message Date
Juan Ferrer c899e74465 feat: refs #4685 AMQ queue prefix, stock queue remove isDelivered
gitea/mycdc/pipeline/head This commit looks good Details
2025-04-02 10:04:14 +02:00
Juan Ferrer 6f950239a8 fix: refs #4685 Watch stock.isReceived
gitea/mycdc/pipeline/head This commit looks good Details
2025-03-31 18:02:15 +02:00
Juan Ferrer 7e71690210 refs #4685 Fields for queue stock updated
gitea/mycdc/pipeline/head This commit looks good Details
2025-03-28 10:21:01 +01:00
6 changed files with 25 additions and 23 deletions

View File

@ -4,6 +4,7 @@ defaults:
mode: fk
flushInterval: 5000
amqpPrefetch: 100
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672
db:
host: localhost

View File

@ -2,6 +2,7 @@ code: mycdc
debug: false
testMode: false
deleteNonEmpty: false
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 10

View File

@ -6,12 +6,15 @@ module.exports = class Queue {
async consume() {
const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch);
await channel.assertQueue(this.name, {
const {amqPrefix} = this.consumer.conf;
const amqQueue = `${amqPrefix}.${this.name}`;
await channel.assertQueue(amqQueue, {
durable: true
});
this.channel = channel;
await channel.consume(this.name,
await channel.consume(amqQueue,
msg => this.onConsume(msg));
}
}

View File

@ -147,17 +147,19 @@ module.exports = class MyCDC {
this.publisher = await amqp.connect(conf.amqp);
const channel = this.channel = await this.publisher.createChannel();
const {amqPrefix} = conf;
for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) {
await channel.assertExchange(tableName, 'headers', {
await channel.assertExchange(`${amqPrefix}.${tableName}`, 'headers', {
durable: true
});
}
}
for (const queueName in this.queuesConf) {
const amqQueue = `${amqPrefix}.${queueName}`;
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
await channel.deleteQueue(queueName, {options});
await channel.assertQueue(queueName, {
await channel.deleteQueue(amqQueue, {options});
await channel.assertQueue(amqQueue, {
durable: true
});
@ -167,15 +169,15 @@ module.exports = class MyCDC {
for (const tableName in schema) {
const table = schema[tableName];
const events = table.events || allEvents;
let args = {'x-match': 'any'};
for (const event of events) {
let args;
if (event === 'updaterows' && table.columns) {
args = {'x-match': 'any'};
if (event === 'updaterows' && table.columns)
table.columns.map(c => args[c] = true);
} else
args = {'z-event': event};
await channel.bindQueue(queueName, tableName, '', args);
else
args[`z-${event}`] = true;
}
await channel.bindQueue(amqQueue,
`${amqPrefix}.${tableName}`, '', args);
}
}
}
@ -392,7 +394,7 @@ module.exports = class MyCDC {
};
let headers = {};
headers['z-event'] = eventName;
headers[`z-${eventName}`] = true;
if (isUpdate) {
for (const col of cols)
headers[col] = true;
@ -404,8 +406,9 @@ module.exports = class MyCDC {
headers
};
const {amqPrefix} = this.conf;
const jsonData = JSON.stringify(data);
this.channel.publish(tableName, '',
this.channel.publish(`${amqPrefix}.${tableName}`, '',
Buffer.from(jsonData), options);
if (this.debug) {

View File

@ -1,6 +1,6 @@
{
"name": "mycdc",
"version": "0.0.25",
"version": "0.0.28",
"author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0",

View File

@ -12,13 +12,11 @@ includeSchema:
key: id
columns:
- id
- landed
- availabled
- shipped
- landingHour
- warehouseInFk
- warehouseOutFk
- isReceived
- isRaid
events:
- updaterows
entry:
@ -26,6 +24,7 @@ includeSchema:
columns:
- id
- travelFk
- isExcludedFromAvailable
events:
- updaterows
buy:
@ -37,13 +36,13 @@ includeSchema:
- quantity
- life
- isAlive
- created
ticket:
key: id
columns:
- id
- warehouseFk
- shipped
- landed
- isAlive
events:
- updaterows
@ -55,16 +54,11 @@ includeSchema:
- itemFk
- quantity
- created
- isPicked
hedera:
order:
key: id
columns:
- id
- date_send
- address_id
- company_id
- customer_id
- confirmed
events:
- updaterows