rfidnatura/worker/worker.js

183 lines
5.1 KiB
JavaScript

const { parentPort } = require('worker_threads');
const fs = require('fs').promises;
const path = require('path');
const pool = require('../db/pool');
const env = require('dotenv').config().parsed || process.env;
const net = require('net');
const generateZPL = require('../resources/zplTemplate');
const label = require('../resources/label.json');
const log = require('../log')
// Función para obtener una conexión con reintentos
async function getConn(retries = 3, delay = 5000) {
let attempt = 0;
while (attempt < retries) {
try {
return await pool.getConnection();
} catch (error) {
attempt++;
if (attempt >= retries)
throw new Error('No se pudo obtener conexión después de múltiples intentos.');
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
async function sendZPL(zplContent, ipAddress, retries = 2) {
const port = 9100;
try {
const result = await new Promise((resolve, reject) => {
const client = new net.Socket();
client.setTimeout(1000);
client.connect(port, ipAddress, () => {
if (!env.DRY_PRINT)
client.write(zplContent, () => {
client.destroy();
resolve('success');
});
else {
client.destroy();
resolve('success');
}
});
client.on('timeout', () => {
log('error', 'Tiempo de espera agotado al conectar con la impresora');
client.destroy();
reject('error');
});
client.on('error', error => {
log('error', `Error al enviar ZPL a la impresora: ${error.message || error}`);
client.destroy();
reject('error');
});
});
if (result === 'success') return true;
} catch (error) {
if (error !== 'error')
log('error', `Error inesperado al enviar ZPL a la impresora: ${error.message || error}`);
if (retries > 0) {
log('debug', `Reintentando... Quedan ${retries} intentos.`);
return sendZPL(zplContent, ipAddress, retries - 1);
} else {
log('error', 'Todos los intentos fallaron. No se pudo enviar el ZPL.');
return 'error';
}
}
}
async function getPrinterIp(printerFk, conn) {
try {
const [rows] = await conn.query(`
SELECT ipAddress
FROM ${env.DB_PRINTER_SCHEMA}.printer
WHERE id = ?
`, [printerFk]);
if (!rows.length)
throw new Error(`No se encontró la impresora con id = ${printerFk}`);
return rows[0].ipAddress;
} catch (error) {
log('error', 'Error al obtener la dirección IP de la impresora:', error);
throw error;
}
}
async function getRecord(retries = 5, delay = 4000) {
for (let attempt = 0; attempt < retries; attempt++) {
const conn = await getConn();
try {
await conn.beginTransaction();
const [rows] = await conn.query(`
SELECT * FROM expedition_PrintOut
WHERE isPrinted = 10
LIMIT 1 FOR UPDATE
`);
if (!rows.length) {
await conn.commit();
return;
}
const record = rows[0];
await updateState(conn, record.expeditionFk, 12)
await conn.commit();
return record;
} catch (error) {
await conn.rollback();
if (error.code === 'ER_LOCK_WAIT_TIMEOUT') {
log('error', 'Lock wait timeout exceeded, retrying...');
if (attempt >= retries - 1)
throw new Error('No se pudo obtener el registro después de múltiples intentos.');
await new Promise(resolve => setTimeout(resolve, delay));
} else {
log('error', 'Error al obtener y marcar el registro para procesamiento:', error);
return;
}
} finally {
conn.release();
}
}
}
async function processRecord(record) {
const conn = await getConn();
try {
await conn.beginTransaction();
log('debug', `(${record.expeditionFk}) Procesando...`);
const zplData = generateZPL(record, label);
if (env.KEEP_TMP_FILES) {
const dirPath = path.join(__dirname, '..', 'tmp');
await fs.mkdir(dirPath, { recursive: true });
const filePath = path.join(dirPath, `zplData_${record.expeditionFk}.txt`);
await fs.writeFile(filePath, zplData['VerdNatura Label RFID'].zpl, 'utf8');
}
const ipAddress = await getPrinterIp(record.printerFk, conn);
const isSendResult = await sendZPL(zplData['VerdNatura Label RFID'].zpl, ipAddress);
if (isSendResult) {
await updateState(conn, record.expeditionFk, 11)
log('success', `(${record.expeditionFk}) Impresión realizada correctamente`);
} else {
await updateState(conn, record.expeditionFk, 13)
log('error', `(${record.expeditionFk}) Error en la impresión`);
}
parentPort.postMessage('done');
await conn.commit();
} catch (error) {
log('error', `Error al procesar el registro ${error}`);
await updateState(conn, record.expeditionFk, 13)
parentPort.postMessage('error');
await conn.rollback();
} finally {
conn.release();
}
}
async function updateState(conn, expeditionId, state) {
await conn.query(`
UPDATE expedition_PrintOut
SET isPrinted = ?
WHERE expeditionFk = ?
`, [state, expeditionId]);
}
// Escuchar mensajes del hilo principal
parentPort.on('message', async msg => {
if (msg === 'check') {
const record = await getRecord();
if (record)
await processRecord(record);
else {
setTimeout(async() => {
parentPort.postMessage('done');
}, 5000);
}
} else
processRecord(msg).catch(err => log('error', 'Error en el worker:', err));
});