// rfidnatura/worker/worker.js
//
// 201 lines
// 7.1 KiB
// JavaScript

const { parentPort } = require('worker_threads');
const fs = require('fs').promises;
const path = require('path');
const pool = require('../db/pool');
const log4js = require('log4js');
const dotenv = require('dotenv');
const net = require('net');
const generateZPL = require('../resources/zplTemplate');
const label = require('../resources/label.json');
// log4js setup: the default category sends messages of level 'info' and above
// to both a log file and the console.
const fileAppender = {
    type: 'file',
    filename: 'logs/app.log',
    maxLogSize: 10 * 1024 * 1024, // 10485760 bytes (10 MiB)
    backups: 3,
    compress: true
};
const consoleAppender = { type: 'console' };

log4js.configure({
    appenders: {
        file: fileAppender,
        console: consoleAppender
    },
    categories: {
        default: { appenders: ['file', 'console'], level: 'info' }
    }
});

// Logger shared by every function in this worker.
const logger = log4js.getLogger('default');
/**
 * Gets a connection from the shared pool, retrying when the pool fails.
 *
 * Every failed attempt is logged so the root cause is not lost, and the last
 * underlying error is attached to the final error as `cause`.
 *
 * @param {number} [retries=3] Maximum number of attempts.
 * @param {number} [delay=5000] Milliseconds to wait between failed attempts.
 * @returns {Promise<object>} Whatever `pool.getConnection()` resolves to.
 * @throws {Error} When every attempt fails, or when `retries` is not positive
 *     (no attempt is made in that case).
 */
async function getConnectionWithRetries(retries = 3, delay = 5000) {
    let lastError;
    for (let attempt = 1; attempt <= retries; attempt++) {
        try {
            return await pool.getConnection();
        } catch (error) {
            lastError = error;
            logger.error(`Error al obtener conexión (intento ${attempt} de ${retries}):`, error);
            // Only wait when another attempt is going to follow.
            if (attempt < retries)
                await new Promise(resolve => setTimeout(resolve, delay));
        }
    }
    // Reaching this point means no attempt succeeded; never return undefined,
    // because callers immediately call methods on the result.
    const failure = new Error('No se pudo obtener conexión después de múltiples intentos.');
    failure.cause = lastError;
    throw failure;
}
/**
 * Makes a single attempt to deliver a ZPL payload over a raw TCP socket.
 *
 * The promise is settled exactly once: it resolves when the write callback
 * reports no error, and rejects on timeout, socket error, a failed write, or
 * when the socket closes before the write completed.
 *
 * @param {string|Buffer} zplContent Payload to write to the socket.
 * @param {string} ipAddress Host to connect to.
 * @param {number} port TCP port to connect to.
 * @returns {Promise<void>} Resolves once the payload has been written.
 */
function attemptSendZPL(zplContent, ipAddress, port) {
    return new Promise((resolve, reject) => {
        const client = new net.Socket();
        let settled = false;

        // First outcome wins; later events (e.g. the 'close' that follows our
        // own destroy()) must not change the result.
        const settle = failure => {
            if (settled) return;
            settled = true;
            client.destroy();
            if (failure) reject(failure);
            else resolve();
        };

        // Failures that were already written to the log are flagged so the
        // caller does not report them a second time as "unexpected".
        const loggedFailure = (message, cause) => {
            const failure = new Error(message);
            failure.cause = cause;
            failure.alreadyLogged = true;
            return failure;
        };

        // Inactivity timeout in milliseconds; the 'timeout' event only
        // notifies, so the socket is destroyed explicitly in settle().
        client.setTimeout(1000);
        client.on('timeout', () => {
            const message = 'Timeout en la conexión para el envío de ZPL';
            logger.error(message);
            settle(loggedFailure(message));
        });
        client.on('error', error => {
            logger.error(`Error al enviar ZPL a la impresora: ${error.message || error}`);
            settle(loggedFailure('Error de socket al enviar ZPL', error));
        });
        client.on('close', () => {
            logger.info('Conexión cerrada');
            settle(loggedFailure('Conexión cerrada antes de completar el envío'));
        });
        client.connect(port, ipAddress, () => {
            // The write callback receives an error when the write failed;
            // only a clean write counts as success.
            client.write(zplContent, writeError => settle(writeError));
        });
    });
}

/**
 * Sends ZPL to a printer over TCP, retrying on failure.
 *
 * One initial attempt is made, followed by up to `retries` further attempts,
 * so the payload is tried at most `retries + 1` times.
 *
 * @param {string|Buffer} zplContent ZPL payload to send.
 * @param {string} ipAddress Printer host.
 * @param {number} [retries=3] Number of retries after the first attempt.
 * @param {number} [port=9100] Printer TCP port.
 * @returns {Promise<'success'|'error'>} 'success' once an attempt succeeds,
 *     'error' when every attempt failed. Never rejects for send failures.
 */
async function sendZPL(zplContent, ipAddress, retries = 3, port = 9100) {
    for (let remaining = retries; ; remaining--) {
        try {
            await attemptSendZPL(zplContent, ipAddress, port);
            return 'success';
        } catch (error) {
            if (!(error && error.alreadyLogged))
                logger.error(`Error inesperado al enviar ZPL a la impresora: ${(error && error.message) || error}`);
            if (remaining <= 0) {
                logger.error('Todos los intentos fallaron. No se pudo enviar el ZPL.');
                return 'error';
            }
            logger.info(`Reintentando... Quedan ${remaining} intentos.`);
        }
    }
}
/**
 * Looks up a printer's IP address in the `printer` table of the schema named
 * by the DB_PRINTER_SCHEMA environment variable.
 *
 * @param {*} printerFk Value matched against `printer.id`.
 * @param {object} connection Object exposing `query(sql, params)` that
 *     resolves to an array whose first element is the list of rows.
 * @returns {Promise<*>} The `ipAddress` column of the first matching row.
 * @throws {Error} When no row matches, or when the query fails; the error is
 *     logged and then rethrown unchanged.
 */
async function getPrinterIpAddress(printerFk, connection) {
    try {
        // Load .env before reading the schema name from the environment.
        dotenv.config();
        const sql = `\nSELECT ipAddress FROM ${process.env.DB_PRINTER_SCHEMA}.printer WHERE id = ?\n`;
        const [printers] = await connection.query(sql, [printerFk]);
        if (printers.length)
            return printers[0].ipAddress;
        throw new Error(`No se encontró la impresora con id = ${printerFk}`);
    } catch (lookupError) {
        logger.error('Error al obtener la dirección IP de la impresora:', lookupError);
        throw lookupError;
    }
}
/**
 * Claims a single pending row of `expedition_PrintOut` for processing.
 *
 * Inside one transaction it selects one row with `isPrinted = 10` using
 * `FOR UPDATE` and sets that expedition's `isPrinted` to 12, so the same row
 * is not selected again as pending.
 *
 * @param {number} [retries=5] Maximum attempts when the query fails with
 *     ER_LOCK_WAIT_TIMEOUT.
 * @param {number} [delay=4000] Milliseconds to wait between those attempts.
 * @returns {Promise<object|undefined>} The claimed row, or undefined when
 *     there is no pending row or a non-retryable error occurred (the error
 *     is logged, not thrown).
 * @throws {Error} When the lock wait timeout persists through every attempt,
 *     or when no connection can be obtained.
 */
async function getRecordForProcessing(retries = 5, delay = 4000) {
    for (let attempt = 0; attempt < retries; attempt++) {
        const connection = await getConnectionWithRetries();
        try {
            await connection.beginTransaction();
            const [rows] = await connection.query(
                `\nSELECT * FROM expedition_PrintOut WHERE isPrinted = 10 LIMIT 1 FOR UPDATE\n`
            );
            if (!rows.length) {
                await connection.commit();
                return;
            }
            const record = rows[0];
            await connection.query(
                'UPDATE expedition_PrintOut SET isPrinted = 12 WHERE expeditionFk = ?',
                [record.expeditionFk]
            );
            await connection.commit();
            return record;
        } catch (error) {
            // A failing rollback (e.g. the connection is already gone) must not
            // mask the original error or skip the handling below.
            try {
                await connection.rollback();
            } catch (rollbackError) {
                logger.error('Error al revertir la transacción:', rollbackError);
            }
            if (error.code !== 'ER_LOCK_WAIT_TIMEOUT') {
                logger.error('Error al obtener y marcar el registro para procesamiento:', error);
                return;
            }
            logger.error('Lock wait timeout exceeded, retrying...');
            if (attempt >= retries - 1)
                throw new Error('No se pudo obtener el registro después de múltiples intentos.');
        } finally {
            connection.release();
        }
        // Only reached after a retryable failure; the connection has already
        // been released, so it is not held while waiting.
        await new Promise(resolve => setTimeout(resolve, delay));
    }
}
/**
 * Prints one expedition label and records the outcome.
 *
 * Steps: generate the ZPL for the record, write it to
 * `zplData_<expeditionFk>.txt` next to this module and read it back, look up
 * the printer's IP address, send the ZPL, then set `isPrinted` to 11 when the
 * send succeeded or 13 when it failed, and commit.
 *
 * The parent thread is told 'done' only after the commit succeeded, or
 * 'error' when any step threw; in the latter case the transaction is rolled
 * back.
 *
 * @param {object} record Row to process; `expeditionFk` and `printerFk` are
 *     read here and the whole record is passed to `generateZPL`.
 * @returns {Promise<void>}
 */
async function processRecord(record) {
    const connection = await getConnectionWithRetries();
    try {
        await connection.beginTransaction();
        logger.info(`Procesando expedición = ${record.expeditionFk}`);
        const zplData = generateZPL(record, label);
        // NOTE(review): the generated file is never deleted here — confirm
        // whether it is kept on purpose (e.g. for troubleshooting).
        const filePath = path.join(__dirname, `zplData_${record.expeditionFk}.txt`);
        await fs.writeFile(filePath, zplData["VerdNatura Label RFID"].zpl, 'utf8');
        const zplContent = await fs.readFile(filePath, 'utf8');
        const ipAddress = await getPrinterIpAddress(record.printerFk, connection);
        const sendResult = await sendZPL(zplContent, ipAddress);
        if (sendResult === 'success') {
            await connection.query(
                'UPDATE expedition_PrintOut SET isPrinted = 11 WHERE expeditionFk = ?',
                [record.expeditionFk]
            );
            logger.info(`Base de datos actualizada para expeditionFk=${record.expeditionFk}`);
        } else {
            logger.error(`Error al enviar ZPL a la impresora para expeditionFk=${record.expeditionFk}`);
            await connection.query(
                'UPDATE expedition_PrintOut SET isPrinted = 13 WHERE expeditionFk = ?',
                [record.expeditionFk]
            );
        }
        // Commit first: reporting 'done' before a commit that then fails would
        // send the parent both 'done' and 'error' for the same record.
        await connection.commit();
        parentPort.postMessage('done');
    } catch (error) {
        logger.error('Error al procesar el registro:', error);
        parentPort.postMessage('error');
        // Always attempt the rollback; when the connection is already closed
        // the rollback itself fails, which is logged instead of thrown.
        try {
            await connection.rollback();
        } catch (rollbackError) {
            logger.error('Error al revertir la transacción:', rollbackError);
        }
    } finally {
        connection.release();
    }
}
// Listen for messages from the main thread.
// - 'check': claim one pending record and process it; when there is none,
//   reply 'done' after a 5 second pause.
// - anything else: the message itself is treated as the record to process.
// Any error that escapes is logged and answered with 'error', so a failure
// never turns into an unhandled rejection that leaves the parent unanswered.
parentPort.on('message', async msg => {
    try {
        if (msg !== 'check') {
            await processRecord(msg);
            return;
        }
        const record = await getRecordForProcessing();
        if (record) {
            await processRecord(record);
            return;
        }
        // No pending records: wait before telling the parent we are done.
        setTimeout(() => {
            parentPort.postMessage('done');
        }, 5000);
    } catch (err) {
        logger.error('Error en el worker:', err);
        parentPort.postMessage('error');
    }
});