Compare commits

..

4 Commits
master ... dev

Author SHA1 Message Date
Juan Ferrer 00b6f8cc6e Merge branch 'dev' of https://gitea.verdnatura.es/verdnatura/mycdc into dev
gitea/mycdc/pipeline/head This commit looks good Details
2025-02-26 18:38:16 +01:00
Juan Ferrer 9a45cf3baf refs #4685 Jenkinsfile empty line removed 2025-02-26 18:35:04 +01:00
Juan Ferrer 8b64744e73 Merge pull request 'test' (!9) from test into dev
gitea/mycdc/pipeline/head This commit looks good Details
Reviewed-on: #9
2025-02-26 17:19:27 +00:00
Juan Ferrer 19489da545 Merge pull request 'refs #4685 Jenkinsfile deploy branch fix' (!8) from master into test
gitea/mycdc/pipeline/head This commit looks good Details
gitea/mycdc/pipeline/pr-dev Build started... Details
Reviewed-on: #8
2025-02-26 17:19:02 +00:00
7 changed files with 23 additions and 26 deletions

1
Jenkinsfile vendored
View File

@ -12,7 +12,6 @@ node {
'beta'
].contains(env.BRANCH_NAME)
// https://www.jenkins.io/doc/book/pipeline/jenkinsfile/#using-environment-variables
echo "NODE_NAME: ${env.NODE_NAME}"
echo "WORKSPACE: ${env.WORKSPACE}"

View File

@ -4,7 +4,6 @@ defaults:
mode: fk
flushInterval: 5000
amqpPrefetch: 100
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672
db:
host: localhost

View File

@ -2,7 +2,6 @@ code: mycdc
debug: false
testMode: false
deleteNonEmpty: false
amqPrefix: cdc
amqp: amqp://user:password@localhost:5672
pingInterval: 60
flushInterval: 10

View File

@ -6,15 +6,12 @@ module.exports = class Queue {
async consume() {
const channel = await this.consumer.amqpConn.createChannel();
channel.prefetch(this.conf.amqpPrefetch);
const {amqPrefix} = this.consumer.conf;
const amqQueue = `${amqPrefix}.${this.name}`;
await channel.assertQueue(amqQueue, {
await channel.assertQueue(this.name, {
durable: true
});
this.channel = channel;
await channel.consume(amqQueue,
await channel.consume(this.name,
msg => this.onConsume(msg));
}
}

View File

@ -147,19 +147,17 @@ module.exports = class MyCDC {
this.publisher = await amqp.connect(conf.amqp);
const channel = this.channel = await this.publisher.createChannel();
const {amqPrefix} = conf;
for (const tableMap of this.schemaMap.values()) {
for (const tableName of tableMap.keys()) {
await channel.assertExchange(`${amqPrefix}.${tableName}`, 'headers', {
await channel.assertExchange(tableName, 'headers', {
durable: true
});
}
}
for (const queueName in this.queuesConf) {
const amqQueue = `${amqPrefix}.${queueName}`;
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
await channel.deleteQueue(amqQueue, {options});
await channel.assertQueue(amqQueue, {
await channel.deleteQueue(queueName, {options});
await channel.assertQueue(queueName, {
durable: true
});
@ -169,15 +167,15 @@ module.exports = class MyCDC {
for (const tableName in schema) {
const table = schema[tableName];
const events = table.events || allEvents;
let args = {'x-match': 'any'};
for (const event of events) {
if (event === 'updaterows' && table.columns)
let args;
if (event === 'updaterows' && table.columns) {
args = {'x-match': 'any'};
table.columns.map(c => args[c] = true);
else
args[`z-${event}`] = true;
} else
args = {'z-event': event};
await channel.bindQueue(queueName, tableName, '', args);
}
await channel.bindQueue(amqQueue,
`${amqPrefix}.${tableName}`, '', args);
}
}
}
@ -394,7 +392,7 @@ module.exports = class MyCDC {
};
let headers = {};
headers[`z-${eventName}`] = true;
headers['z-event'] = eventName;
if (isUpdate) {
for (const col of cols)
headers[col] = true;
@ -406,9 +404,8 @@ module.exports = class MyCDC {
headers
};
const {amqPrefix} = this.conf;
const jsonData = JSON.stringify(data);
this.channel.publish(`${amqPrefix}.${tableName}`, '',
this.channel.publish(tableName, '',
Buffer.from(jsonData), options);
if (this.debug) {

View File

@ -1,6 +1,6 @@
{
"name": "mycdc",
"version": "0.0.28",
"version": "0.0.25",
"author": "Verdnatura Levante SL",
"description": "Asynchronous DB calculations reading the binary log",
"license": "GPL-3.0",

View File

@ -12,11 +12,13 @@ includeSchema:
key: id
columns:
- id
- availabled
- landed
- shipped
- landingHour
- warehouseInFk
- warehouseOutFk
- isReceived
- isRaid
events:
- updaterows
entry:
@ -24,7 +26,6 @@ includeSchema:
columns:
- id
- travelFk
- isExcludedFromAvailable
events:
- updaterows
buy:
@ -36,13 +37,13 @@ includeSchema:
- quantity
- life
- isAlive
- created
ticket:
key: id
columns:
- id
- warehouseFk
- shipped
- landed
- isAlive
events:
- updaterows
@ -54,11 +55,16 @@ includeSchema:
- itemFk
- quantity
- created
- isPicked
hedera:
order:
key: id
columns:
- id
- date_send
- address_id
- company_id
- customer_id
- confirmed
events:
- updaterows