mycdc/lib/queue-fk.js

94 lines
2.3 KiB
JavaScript

const Queue = require('./queue');
module.exports = class QueueFk extends Queue {
constructor(consumer, name, conf) {
super(consumer, name, conf);
this.reset();
}
async consume() {
await super.consume();
this.flush();
}
reset() {
this.lastMessage = null;
this.nMessages = 0;
this.scopes = new Map();
}
flush(flushInterval) {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
}
this.timeout = setTimeout(
() => this.onFlushTimeout(), flushInterval);
}
async onFlushTimeout() {
if (this.nMessages) {
const {consumer} = this;
if (consumer.conf.debug)
console.debug('Flush:'.blue, this.name.yellow, this.ids);
const scopes = this.scopes;
const lastMessage = this.lastMessage;
this.reset();
try {
for (const [scope, ids] of scopes) {
let query = this.conf.query[scope];
for (const id of ids) {
const params = {id};
//query = consumer.db.format(query, params);
if (consumer.conf.debug)
console.debug('SQL:'.blue, query, params);
if (!consumer.conf.testMode)
await consumer.db.query(query, params);
}
}
await this.channel.ack(lastMessage, true);
} catch(err) {
await this.channel.nack(lastMessage, true);
console.error(err);
}
}
this.flush(this.conf.flushInterval);
}
async onConsume(msg) {
const {conf} = this;
const consumer = this.consumer;
const data = JSON.parse(msg.content.toString());
if (consumer.conf.debug)
console.debug('Message:'.blue, this.name.yellow, data.table);
const tableConf = conf.includeSchema[data.schema][data.table];
const scope = tableConf.scope ?? data.table;
let ids = this.scopes.get(scope);
if (!ids) this.scopes.set(scope, ids = new Set());
const key = tableConf.key;
if (data.eventName === 'updaterows') {
for (const row of data.rows) {
ids.add(row.before[key]);
ids.add(row.after[key]);
}
} else
for (const row of data.rows)
ids.add(row[key]);
this.nMessages++;
this.lastMessage = msg;
if (this.nMessages == consumer.conf.amqpPrefetch)
this.flush();
}
}