Compare commits

...

3 Commits
dev ... master

Author SHA1 Message Date
Juan Ferrer c899e74465 feat: refs #4685 AMQ queue prefix, stock queue remove isDelivered
gitea/mycdc/pipeline/head This commit looks good Details
2025-04-02 10:04:14 +02:00
Juan Ferrer 6f950239a8 fix: refs #4685 Watch stock.isReceived
gitea/mycdc/pipeline/head This commit looks good Details
2025-03-31 18:02:15 +02:00
Juan Ferrer 7e71690210 refs #4685 Fields for queue stock updated
gitea/mycdc/pipeline/head This commit looks good Details
2025-03-28 10:21:01 +01:00
6 changed files with 25 additions and 23 deletions

View File

@ -4,6 +4,7 @@ defaults:
mode: fk mode: fk
flushInterval: 5000 flushInterval: 5000
amqpPrefetch: 100 amqpPrefetch: 100
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672 amqp: amqp://user:password@localhost:5672
db: db:
host: localhost host: localhost

View File

@ -2,6 +2,7 @@ code: mycdc
debug: false debug: false
testMode: false testMode: false
deleteNonEmpty: false deleteNonEmpty: false
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672 amqp: amqp://user:password@localhost:5672
pingInterval: 60 pingInterval: 60
flushInterval: 10 flushInterval: 10

View File

@ -6,12 +6,15 @@ module.exports = class Queue {
async consume() { async consume() {
const channel = await this.consumer.amqpConn.createChannel(); const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch); channel.prefetch(this.conf.amqpPrefetch);
await channel.assertQueue(this.name, { const {amqPrefix} = this.consumer.conf;
const amqQueue = `${amqPrefix}.${this.name}`;
await channel.assertQueue(amqQueue, {
durable: true durable: true
}); });
this.channel = channel; this.channel = channel;
await channel.consume(this.name, await channel.consume(amqQueue,
msg => this.onConsume(msg)); msg => this.onConsume(msg));
} }
} }

View File

@ -147,17 +147,19 @@ module.exports = class MyCDC {
this.publisher = await amqp.connect(conf.amqp); this.publisher = await amqp.connect(conf.amqp);
const channel = this.channel = await this.publisher.createChannel(); const channel = this.channel = await this.publisher.createChannel();
const {amqPrefix} = conf;
for (const tableMap of this.schemaMap.values()) { for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) { for (const tableName of tableMap.keys()) {
await channel.assertExchange(tableName, 'headers', { await channel.assertExchange(`${amqPrefix}.${tableName}`, 'headers', {
durable: true durable: true
}); });
} }
} }
for (const queueName in this.queuesConf) { for (const queueName in this.queuesConf) {
const amqQueue = `${amqPrefix}.${queueName}`;
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true}; const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
await channel.deleteQueue(queueName, {options}); await channel.deleteQueue(amqQueue, {options});
await channel.assertQueue(queueName, { await channel.assertQueue(amqQueue, {
durable: true durable: true
}); });
@ -167,15 +169,15 @@ module.exports = class MyCDC {
for (const tableName in schema) { for (const tableName in schema) {
const table = schema[tableName]; const table = schema[tableName];
const events = table.events || allEvents; const events = table.events || allEvents;
let args = {'x-match': 'any'};
for (const event of events) { for (const event of events) {
let args; if (event === 'updaterows' && table.columns)
if (event === 'updaterows' && table.columns) {
args = {'x-match': 'any'};
table.columns.map(c => args[c] = true); table.columns.map(c => args[c] = true);
} else else
args = {'z-event': event}; args[`z-${event}`] = true;
await channel.bindQueue(queueName, tableName, '', args);
} }
await channel.bindQueue(amqQueue,
`${amqPrefix}.${tableName}`, '', args);
} }
} }
} }
@ -392,7 +394,7 @@ module.exports = class MyCDC {
}; };
let headers = {}; let headers = {};
headers['z-event'] = eventName; headers[`z-${eventName}`] = true;
if (isUpdate) { if (isUpdate) {
for (const col of cols) for (const col of cols)
headers[col] = true; headers[col] = true;
@ -404,8 +406,9 @@ module.exports = class MyCDC {
headers headers
}; };
const {amqPrefix} = this.conf;
const jsonData = JSON.stringify(data); const jsonData = JSON.stringify(data);
this.channel.publish(tableName, '', this.channel.publish(`${amqPrefix}.${tableName}`, '',
Buffer.from(jsonData), options); Buffer.from(jsonData), options);
if (this.debug) { if (this.debug) {

View File

@ -1,6 +1,6 @@
{ {
"name": "mycdc", "name": "mycdc",
"version": "0.0.25", "version": "0.0.28",
"author": "Verdnatura Levante SL", "author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log", "description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0", "license": "GPL-3.0",

View File

@ -12,13 +12,11 @@ includeSchema:
key: id key: id
columns: columns:
- id - id
- landed - availabled
- shipped - shipped
- landingHour
- warehouseInFk - warehouseInFk
- warehouseOutFk - warehouseOutFk
- isReceived - isReceived
- isRaid
events: events:
- updaterows - updaterows
entry: entry:
@ -26,6 +24,7 @@ includeSchema:
columns: columns:
- id - id
- travelFk - travelFk
- isExcludedFromAvailable
events: events:
- updaterows - updaterows
buy: buy:
@ -37,13 +36,13 @@ includeSchema:
- quantity - quantity
- life - life
- isAlive - isAlive
- created
ticket: ticket:
key: id key: id
columns: columns:
- id - id
- warehouseFk - warehouseFk
- shipped - shipped
- landed
- isAlive - isAlive
events: events:
- updaterows - updaterows
@ -55,16 +54,11 @@ includeSchema:
- itemFk - itemFk
- quantity - quantity
- created - created
- isPicked
hedera: hedera:
order: order:
key: id key: id
columns: columns:
- id - id
- date_send
- address_id
- company_id
- customer_id
- confirmed - confirmed
events: events:
- updaterows - updaterows