95 lines
2.3 KiB
JavaScript
95 lines
2.3 KiB
JavaScript
module.exports = class Queue {
|
|
constructor(consumer, name, conf) {
|
|
Object.assign(this, {consumer, name, conf});
|
|
this.reset();
|
|
}
|
|
|
|
async consume() {
|
|
if (this.conf.mode !== 'fk') {
|
|
console.warn(`Ignoring queue '${this.name} with unknown mode '${this.conf.mode}'`);
|
|
return;
|
|
}
|
|
|
|
const channel = await this.consumer.amqpConn.createChannel();
|
|
channel.prefetch(this.conf.amqpPrefetch);
|
|
await channel.assertQueue(this.name, {
|
|
durable: true
|
|
});
|
|
this.channel = channel;
|
|
|
|
await channel.consume(this.name,
|
|
msg => this.onConsume(msg));
|
|
this.flush();
|
|
}
|
|
|
|
reset() {
|
|
this.lastMessage = null;
|
|
this.nMessages = 0;
|
|
this.ids = new Set();
|
|
}
|
|
|
|
flush(flushInterval) {
|
|
if (this.timeout) {
|
|
clearTimeout(this.timeout);
|
|
this.timeout = null;
|
|
}
|
|
this.timeout = setTimeout(
|
|
() => this.onFlushTimeout(), flushInterval);
|
|
}
|
|
|
|
async onFlushTimeout() {
|
|
const consumer = this.consumer;
|
|
|
|
if (this.ids.size) {
|
|
if (consumer.conf.debug)
|
|
console.debug('Flush:'.blue, this.name.yellow, this.ids);
|
|
|
|
const ids = Array.from(this.ids);
|
|
const lastMessage = this.lastMessage;
|
|
this.reset();
|
|
|
|
try {
|
|
for (const id of ids) {
|
|
const sql = consumer.db.format(this.conf.query, id);
|
|
consumer.debug('SQL', sql);
|
|
if (!consumer.conf.testMode)
|
|
await consumer.db.query(sql);
|
|
}
|
|
|
|
await this.channel.ack(lastMessage, true);
|
|
} catch(err) {
|
|
await this.channel.nack(lastMessage, true);
|
|
console.error(err);
|
|
}
|
|
}
|
|
|
|
this.flush(this.conf.flushInterval);
|
|
}
|
|
|
|
async onConsume(msg) {
|
|
const consumer = this.consumer;
|
|
const data = JSON.parse(msg.content.toString());
|
|
|
|
if (consumer.conf.debug)
|
|
console.debug('Message:'.blue, this.name.yellow, data.table);
|
|
|
|
const ids = this.ids;
|
|
const key = this.conf.includeSchema[data.schema][data.table].key;
|
|
|
|
if (data.eventName === 'updaterows') {
|
|
for (const row of data.rows) {
|
|
ids.add(row.before[key]);
|
|
ids.add(row.after[key]);
|
|
}
|
|
} else
|
|
for (const row of data.rows)
|
|
ids.add(row[key]);
|
|
|
|
this.nMessages++;
|
|
this.lastMessage = msg;
|
|
|
|
if (this.nMessages == consumer.conf.amqpPrefetch)
|
|
this.flush();
|
|
}
|
|
}
|