fix: refs #4409 Columns changed detection fix
gitea/mycdc/pipeline/head This commit looks good
Details
gitea/mycdc/pipeline/head This commit looks good
Details
This commit is contained in:
parent
f8502e3026
commit
5e01721f75
|
@ -5,6 +5,7 @@ deleteNonEmpty: false
|
||||||
amqp: amqp://user:password@localhost:5672
|
amqp: amqp://user:password@localhost:5672
|
||||||
pingInterval: 60
|
pingInterval: 60
|
||||||
flushInterval: 10
|
flushInterval: 10
|
||||||
|
serverId: 1
|
||||||
db:
|
db:
|
||||||
host: localhost
|
host: localhost
|
||||||
port: 3306
|
port: 3306
|
||||||
|
|
28
mycdc.js
28
mycdc.js
|
@ -60,7 +60,7 @@ module.exports = class MyCDC {
|
||||||
tableInfo = {
|
tableInfo = {
|
||||||
queues: new Map(),
|
queues: new Map(),
|
||||||
events: new Map(),
|
events: new Map(),
|
||||||
columns: false,
|
columnSet: false,
|
||||||
fk: 'id'
|
fk: 'id'
|
||||||
};
|
};
|
||||||
tableMap.set(tableName, tableInfo);
|
tableMap.set(tableName, tableInfo);
|
||||||
|
@ -79,13 +79,13 @@ module.exports = class MyCDC {
|
||||||
|
|
||||||
const columns = table.columns;
|
const columns = table.columns;
|
||||||
if (columns) {
|
if (columns) {
|
||||||
if (tableInfo.columns === false)
|
if (tableInfo.columnSet === false)
|
||||||
tableInfo.columns = new Set();
|
tableInfo.columnSet = new Set();
|
||||||
if (tableInfo.columns !== true)
|
if (tableInfo.columnSet !== true)
|
||||||
for (const column of columns)
|
for (const column of columns)
|
||||||
tableInfo.columns.add(column);
|
tableInfo.columnSet.add(column);
|
||||||
} else
|
} else
|
||||||
tableInfo.columns = true;
|
tableInfo.columnSet = true;
|
||||||
|
|
||||||
if (table.id)
|
if (table.id)
|
||||||
tableInfo.id = table.id;
|
tableInfo.id = table.id;
|
||||||
|
@ -96,8 +96,15 @@ module.exports = class MyCDC {
|
||||||
}
|
}
|
||||||
|
|
||||||
const includeSchema = {};
|
const includeSchema = {};
|
||||||
for (const [schemaName, tableMap] of this.schemaMap)
|
|
||||||
|
for (const [schemaName, tableMap] of this.schemaMap) {
|
||||||
includeSchema[schemaName] = Array.from(tableMap.keys());
|
includeSchema[schemaName] = Array.from(tableMap.keys());
|
||||||
|
|
||||||
|
for (const [tableName, tableInfo] of tableMap) {
|
||||||
|
if (tableInfo.columnSet !== true)
|
||||||
|
tableInfo.columns = Array.from(tableInfo.columnSet.keys());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.opts = {
|
this.opts = {
|
||||||
includeEvents: [
|
includeEvents: [
|
||||||
|
@ -107,7 +114,8 @@ module.exports = class MyCDC {
|
||||||
'updaterows',
|
'updaterows',
|
||||||
'deleterows'
|
'deleterows'
|
||||||
],
|
],
|
||||||
includeSchema
|
includeSchema,
|
||||||
|
serverId: conf.serverId
|
||||||
};
|
};
|
||||||
|
|
||||||
if (conf.testMode)
|
if (conf.testMode)
|
||||||
|
@ -353,9 +361,9 @@ module.exports = class MyCDC {
|
||||||
if (isUpdate) {
|
if (isUpdate) {
|
||||||
rows = [];
|
rows = [];
|
||||||
cols = new Set();
|
cols = new Set();
|
||||||
let columns = tableInfo.columns === true
|
const columns = tableInfo.columnSet === true
|
||||||
? Object.keys(evt.rows[0].after)
|
? Object.keys(evt.rows[0].after)
|
||||||
: tableInfo.columns.keys();
|
: tableInfo.columns;
|
||||||
|
|
||||||
for (const row of evt.rows) {
|
for (const row of evt.rows) {
|
||||||
let nColsChanged = 0;
|
let nColsChanged = 0;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "mycdc",
|
"name": "mycdc",
|
||||||
"version": "0.0.13",
|
"version": "0.0.14",
|
||||||
"author": "Verdnatura Levante SL",
|
"author": "Verdnatura Levante SL",
|
||||||
"description": "Asynchronous DB calculations reading the binary log",
|
"description": "Asynchronous DB calculations reading the binary log",
|
||||||
"license": "GPL-3.0",
|
"license": "GPL-3.0",
|
||||||
|
|
Loading…
Reference in New Issue