Compare commits
3 Commits
Author | SHA1 | Date |
---|---|---|
|
c899e74465 | |
|
6f950239a8 | |
|
7e71690210 |
|
@ -4,6 +4,7 @@ defaults:
|
||||||
mode: fk
|
mode: fk
|
||||||
flushInterval: 5000
|
flushInterval: 5000
|
||||||
amqpPrefetch: 100
|
amqpPrefetch: 100
|
||||||
|
amqPrefix: cdc
|
||||||
amqp: amqp://user:password@localhost:5672
|
amqp: amqp://user:password@localhost:5672
|
||||||
db:
|
db:
|
||||||
host: localhost
|
host: localhost
|
||||||
|
|
|
@ -2,6 +2,7 @@ code: mycdc
|
||||||
debug: false
|
debug: false
|
||||||
testMode: false
|
testMode: false
|
||||||
deleteNonEmpty: false
|
deleteNonEmpty: false
|
||||||
|
amqPrefix: cdc
|
||||||
amqp: amqp://user:password@localhost:5672
|
amqp: amqp://user:password@localhost:5672
|
||||||
pingInterval: 60
|
pingInterval: 60
|
||||||
flushInterval: 10
|
flushInterval: 10
|
||||||
|
|
|
@ -6,12 +6,15 @@ module.exports = class Queue {
|
||||||
async consume() {
|
async consume() {
|
||||||
const channel = await this.consumer.amqpConn.createChannel();
|
const channel = await this.consumer.amqpConn.createChannel();
|
||||||
channel.prefetch(this.conf.amqpPrefetch);
|
channel.prefetch(this.conf.amqpPrefetch);
|
||||||
await channel.assertQueue(this.name, {
|
const {amqPrefix} = this.consumer.conf;
|
||||||
|
const amqQueue = `${amqPrefix}.${this.name}`;
|
||||||
|
|
||||||
|
await channel.assertQueue(amqQueue, {
|
||||||
durable: true
|
durable: true
|
||||||
});
|
});
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
|
||||||
await channel.consume(this.name,
|
await channel.consume(amqQueue,
|
||||||
msg => this.onConsume(msg));
|
msg => this.onConsume(msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
25
mycdc.js
25
mycdc.js
|
@ -147,17 +147,19 @@ module.exports = class MyCDC {
|
||||||
this.publisher = await amqp.connect(conf.amqp);
|
this.publisher = await amqp.connect(conf.amqp);
|
||||||
const channel = this.channel = await this.publisher.createChannel();
|
const channel = this.channel = await this.publisher.createChannel();
|
||||||
|
|
||||||
|
const {amqPrefix} = conf;
|
||||||
for (const tableMap of this.schemaMap.values()) {
|
for (const tableMap of this.schemaMap.values()) {
|
||||||
for (const tableName of tableMap.keys()) {
|
for (const tableName of tableMap.keys()) {
|
||||||
await channel.assertExchange(tableName, 'headers', {
|
await channel.assertExchange(`${amqPrefix}.${tableName}`, 'headers', {
|
||||||
durable: true
|
durable: true
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (const queueName in this.queuesConf) {
|
for (const queueName in this.queuesConf) {
|
||||||
|
const amqQueue = `${amqPrefix}.${queueName}`;
|
||||||
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
|
const options = conf.deleteNonEmpty ? {} : {ifEmpty: true};
|
||||||
await channel.deleteQueue(queueName, {options});
|
await channel.deleteQueue(amqQueue, {options});
|
||||||
await channel.assertQueue(queueName, {
|
await channel.assertQueue(amqQueue, {
|
||||||
durable: true
|
durable: true
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -167,15 +169,15 @@ module.exports = class MyCDC {
|
||||||
for (const tableName in schema) {
|
for (const tableName in schema) {
|
||||||
const table = schema[tableName];
|
const table = schema[tableName];
|
||||||
const events = table.events || allEvents;
|
const events = table.events || allEvents;
|
||||||
|
let args = {'x-match': 'any'};
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
let args;
|
if (event === 'updaterows' && table.columns)
|
||||||
if (event === 'updaterows' && table.columns) {
|
|
||||||
args = {'x-match': 'any'};
|
|
||||||
table.columns.map(c => args[c] = true);
|
table.columns.map(c => args[c] = true);
|
||||||
} else
|
else
|
||||||
args = {'z-event': event};
|
args[`z-${event}`] = true;
|
||||||
await channel.bindQueue(queueName, tableName, '', args);
|
|
||||||
}
|
}
|
||||||
|
await channel.bindQueue(amqQueue,
|
||||||
|
`${amqPrefix}.${tableName}`, '', args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -392,7 +394,7 @@ module.exports = class MyCDC {
|
||||||
};
|
};
|
||||||
|
|
||||||
let headers = {};
|
let headers = {};
|
||||||
headers['z-event'] = eventName;
|
headers[`z-${eventName}`] = true;
|
||||||
if (isUpdate) {
|
if (isUpdate) {
|
||||||
for (const col of cols)
|
for (const col of cols)
|
||||||
headers[col] = true;
|
headers[col] = true;
|
||||||
|
@ -404,8 +406,9 @@ module.exports = class MyCDC {
|
||||||
headers
|
headers
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const {amqPrefix} = this.conf;
|
||||||
const jsonData = JSON.stringify(data);
|
const jsonData = JSON.stringify(data);
|
||||||
this.channel.publish(tableName, '',
|
this.channel.publish(`${amqPrefix}.${tableName}`, '',
|
||||||
Buffer.from(jsonData), options);
|
Buffer.from(jsonData), options);
|
||||||
|
|
||||||
if (this.debug) {
|
if (this.debug) {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "mycdc",
|
"name": "mycdc",
|
||||||
"version": "0.0.25",
|
"version": "0.0.28",
|
||||||
"author": "Verdnatura Levante SL",
|
"author": "Verdnatura Levante SL",
|
||||||
"description": "Asynchronous DB calculations reading the binary log",
|
"description": "Asynchronous DB calculations reading the binary log",
|
||||||
"license": "GPL-3.0",
|
"license": "GPL-3.0",
|
||||||
|
|
|
@ -12,13 +12,11 @@ includeSchema:
|
||||||
key: id
|
key: id
|
||||||
columns:
|
columns:
|
||||||
- id
|
- id
|
||||||
- landed
|
- availabled
|
||||||
- shipped
|
- shipped
|
||||||
- landingHour
|
|
||||||
- warehouseInFk
|
- warehouseInFk
|
||||||
- warehouseOutFk
|
- warehouseOutFk
|
||||||
- isReceived
|
- isReceived
|
||||||
- isRaid
|
|
||||||
events:
|
events:
|
||||||
- updaterows
|
- updaterows
|
||||||
entry:
|
entry:
|
||||||
|
@ -26,6 +24,7 @@ includeSchema:
|
||||||
columns:
|
columns:
|
||||||
- id
|
- id
|
||||||
- travelFk
|
- travelFk
|
||||||
|
- isExcludedFromAvailable
|
||||||
events:
|
events:
|
||||||
- updaterows
|
- updaterows
|
||||||
buy:
|
buy:
|
||||||
|
@ -37,13 +36,13 @@ includeSchema:
|
||||||
- quantity
|
- quantity
|
||||||
- life
|
- life
|
||||||
- isAlive
|
- isAlive
|
||||||
|
- created
|
||||||
ticket:
|
ticket:
|
||||||
key: id
|
key: id
|
||||||
columns:
|
columns:
|
||||||
- id
|
- id
|
||||||
- warehouseFk
|
- warehouseFk
|
||||||
- shipped
|
- shipped
|
||||||
- landed
|
|
||||||
- isAlive
|
- isAlive
|
||||||
events:
|
events:
|
||||||
- updaterows
|
- updaterows
|
||||||
|
@ -55,16 +54,11 @@ includeSchema:
|
||||||
- itemFk
|
- itemFk
|
||||||
- quantity
|
- quantity
|
||||||
- created
|
- created
|
||||||
- isPicked
|
|
||||||
hedera:
|
hedera:
|
||||||
order:
|
order:
|
||||||
key: id
|
key: id
|
||||||
columns:
|
columns:
|
||||||
- id
|
- id
|
||||||
- date_send
|
|
||||||
- address_id
|
|
||||||
- company_id
|
|
||||||
- customer_id
|
|
||||||
- confirmed
|
- confirmed
|
||||||
events:
|
events:
|
||||||
- updaterows
|
- updaterows
|
||||||
|
|
Loading…
Reference in New Issue