const Queue = require('./queue');

module.exports = class QueueFk extends Queue {
  constructor(consumer, name, conf) {
    super(consumer, name, conf);
    this.reset();
  }

  async consume() {
    await super.consume();
    this.flush();
  }

  reset() {
    this.messages = [];
    this.nMessages = 0;
    this.scopes = new Map();
  }

  flush(flushInterval) {
    if (this.timeout) {
      clearTimeout(this.timeout);
      this.timeout = null;
    }
    this.timeout = setTimeout(
      () => this.onFlushTimeout(), flushInterval);
  }

  async onFlushTimeout() {
    const {messages} = this;

    if (messages.length) {
      const {consumer, scopes, channel} = this;
      this.reset();
      
      if (consumer.conf.debug)
        console.debug('Flush:'.blue, this.name.yellow, scopes);

      try {
        for (const [scope, ids] of scopes) {
          let query = this.conf.query[scope];
          for (const id of ids) {
            const params = {id};
            //query = consumer.db.format(query, params);
            if (consumer.conf.debug)
              console.debug('SQL:'.blue, query, params);
            if (!consumer.conf.testMode)
              await consumer.db.query(query, params);
          }
        }

        const promises = [];
        for (const msg of messages)
          promises.push(channel.ack(msg));
        await Promise.all(promises);
      } catch(err) {
        for (const msg of messages)
          await channel.nack(msg);
        throw err;
      }
    }

    this.flush(this.conf.flushInterval);
  }

  onConsume(msg) {
    const {conf} = this;
    const consumer = this.consumer;
    const data = JSON.parse(msg.content.toString());

    if (consumer.conf.debug)
      console.debug('Message:'.blue, this.name.yellow, data.table);

    const tableConf = conf.includeSchema[data.schema][data.table];

    const scope = tableConf.scope ?? data.table;
    let ids = this.scopes.get(scope);
    if (!ids) this.scopes.set(scope, ids = new Set());

    const key = tableConf.key;
    if (data.eventName === 'updaterows') {
      for (const row of data.rows) {
        ids.add(row.before[key]);
        ids.add(row.after[key]);
      }
    } else
      for (const row of data.rows)
        ids.add(row[key]);

    this.messages.push(msg);
    if (this.messages.length == conf.amqpPrefetch)
      this.flush();
  }
}