95 lines
2.3 KiB
JavaScript
95 lines
2.3 KiB
JavaScript
const Queue = require('./queue');
|
|
|
|
module.exports = class QueueFk extends Queue {
|
|
constructor(consumer, name, conf) {
|
|
super(consumer, name, conf);
|
|
this.reset();
|
|
}
|
|
|
|
async consume() {
|
|
await super.consume();
|
|
this.flush();
|
|
}
|
|
|
|
reset() {
|
|
this.messages = [];
|
|
this.nMessages = 0;
|
|
this.scopes = new Map();
|
|
}
|
|
|
|
flush(flushInterval) {
|
|
if (this.timeout) {
|
|
clearTimeout(this.timeout);
|
|
this.timeout = null;
|
|
}
|
|
this.timeout = setTimeout(
|
|
() => this.onFlushTimeout(), flushInterval);
|
|
}
|
|
|
|
async onFlushTimeout() {
|
|
const {messages} = this;
|
|
|
|
if (messages.length) {
|
|
const {consumer, scopes, channel} = this;
|
|
this.reset();
|
|
|
|
if (consumer.conf.debug)
|
|
console.debug('Flush:'.blue, this.name.yellow, scopes);
|
|
|
|
try {
|
|
for (const [scope, ids] of scopes) {
|
|
let query = this.conf.query[scope];
|
|
for (const id of ids) {
|
|
const params = {id};
|
|
//query = consumer.db.format(query, params);
|
|
if (consumer.conf.debug)
|
|
console.debug('SQL:'.blue, query, params);
|
|
if (!consumer.conf.testMode)
|
|
await consumer.db.query(query, params);
|
|
}
|
|
}
|
|
|
|
const promises = [];
|
|
for (const msg of messages)
|
|
promises.push(channel.ack(msg));
|
|
await Promise.all(promises);
|
|
} catch(err) {
|
|
for (const msg of messages)
|
|
await channel.nack(msg);
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
this.flush(this.conf.flushInterval);
|
|
}
|
|
|
|
onConsume(msg) {
|
|
const {conf} = this;
|
|
const consumer = this.consumer;
|
|
const data = JSON.parse(msg.content.toString());
|
|
|
|
if (consumer.conf.debug)
|
|
console.debug('Message:'.blue, this.name.yellow, data.table);
|
|
|
|
const tableConf = conf.includeSchema[data.schema][data.table];
|
|
|
|
const scope = tableConf.scope ?? data.table;
|
|
let ids = this.scopes.get(scope);
|
|
if (!ids) this.scopes.set(scope, ids = new Set());
|
|
|
|
const key = tableConf.key;
|
|
if (data.eventName === 'updaterows') {
|
|
for (const row of data.rows) {
|
|
ids.add(row.before[key]);
|
|
ids.add(row.after[key]);
|
|
}
|
|
} else
|
|
for (const row of data.rows)
|
|
ids.add(row[key]);
|
|
|
|
this.messages.push(msg);
|
|
if (this.messages.length == conf.amqpPrefetch)
|
|
this.flush();
|
|
}
|
|
}
|