feat: refs #4452 vn changes

This commit is contained in:
Guillermo Bonet 2024-10-18 14:32:58 +02:00
parent 4dc6aac8da
commit 9fcff7c351
5 changed files with 69 additions and 48 deletions

View File

@ -63,7 +63,7 @@ Este es el archivo de entrada principal que inicializa y comienza el procesamien
- **WorkerPool**: Se importa la clase `WorkerPool` desde el archivo `worker/workerPool`.
- **threads**: Se define el número de threads/workers que se van a utilizar.
- **workerPool**: Se instancia un nuevo `WorkerPool` con el número de threads definido.
- **assignTasks**: Se llama al método `assignTasks` para asignar las tareas iniciales a los workers.
- **start**: Se llama al método `start` para asignar las tareas iniciales a los workers.
### `docker-compose.yml`
@ -102,8 +102,8 @@ Maneja un pool de workers, creando y asignando tareas a los workers.
- **initWorkers**: Inicializa el número especificado de workers.
- **createWorker**: Crea un nuevo worker y maneja sus mensajes, errores y su finalización.
- **replaceWorker**: Reemplaza un worker fallido creando uno nuevo.
- **assignTasks**: Asigna tareas iniciales a todos los workers.
- **closeAllWorkers**: Termina todos los workers.
- **start**: Asigna tareas iniciales a todos los workers.
- **end**: Termina todos los workers.
### `pool.js`

10
main.js
View File

@ -2,10 +2,14 @@ const workerPool = require('./worker/workerPool');
const dotenv = require('dotenv');
dotenv.config();
const threads = process.env.WORKERS || 30;
// Iniciar el pool de workers
const workers = new workerPool(threads);
const workers = new workerPool(process.env.WORKERS || 30);
// Asignar tareas iniciales a los workers
workers.assignTasks();
workers.start();
// Definimos las acciones a las señales para terminar correctamente el proceso
['SIGINT', 'SIGTSTP', 'SIGTERM', 'SIGQUIT'].forEach(signal => {
process.on(signal, () => workers.end());
});

View File

@ -1,7 +1,6 @@
module.exports = function generateZPL(record, label) {
const rf_id_venature = 'AABB';
const id_venature = String(record.expeditionFk).padStart(20, '0');
const rf_id_verdnatura = 'AABB';
const id_verdnatura = String(record.expeditionFk).padStart(20, '0');
// Convert text values of the record to uppercase
const upperCaseRecord = {};
@ -27,7 +26,7 @@ module.exports = function generateZPL(record, label) {
const recordValue = upperCaseRecord[recordKey];
const placeholder = `#${mappingKey}`;
if(placeholder == '#RFID_Code')
zpl = zpl.replace(new RegExp(placeholder, 'g'), rf_id_venature + id_venature);
zpl = zpl.replace(new RegExp(placeholder, 'g'), rf_id_verdnatura + id_verdnatura);
else
zpl = zpl.replace(new RegExp(placeholder, 'g'), recordValue);
}

View File

@ -8,6 +8,7 @@ const net = require('net');
const generateZPL = require('../resources/zplTemplate');
const label = require('../resources/label.json');
dotenv.config();
// Configuración de log4js
log4js.configure({
appenders: {
@ -38,12 +39,14 @@ async function getConnectionWithRetries(retries = 3, delay = 5000) {
}
//Función para enviar ZPL a la impresora con TCP socket y reintentos
async function sendZPL(zplContent, ipAddress, retries = 3) {
async function sendZPL(zplContent, ipAddress, retries = 2) {
const port = 9100;
try {
const result = await new Promise((resolve, reject) => {
const client = new net.Socket();
client.setTimeout(1000);
client.connect(port, ipAddress, () => {
client.write(zplContent, () => {
client.destroy();
@ -51,15 +54,16 @@ async function sendZPL(zplContent, ipAddress, retries = 3) {
});
});
client.on('error', error => {
logger.error(`Error al enviar ZPL a la impresora: ${error.message || error}`);
client.on('timeout', () => {
logger.error('Tiempo de espera agotado al conectar con la impresora');
client.destroy();
reject('error');
});
client.on('close', () => {
logger.info('Conexión cerrada');
client.on('error', error => {
logger.error(`Error al enviar ZPL a la impresora: ${error.message || error}`);
client.destroy();
reject('error');
});
});
@ -80,9 +84,10 @@ async function sendZPL(zplContent, ipAddress, retries = 3) {
// Función para obtener la dirección IP de la impresora, realizando una llamada a la base de datos a la tabla de printer
async function getPrinterIpAddress(printerFk, connection) {
try {
dotenv.config();
const [rows] = await connection.query(`
SELECT ipAddress FROM ${process.env.DB_PRINTER_SCHEMA}.printer WHERE id = ?
SELECT ipAddress
FROM ${process.env.DB_PRINTER_SCHEMA}.printer
WHERE id = ?
`, [printerFk]);
if (!rows.length)
throw new Error(`No se encontró la impresora con id = ${printerFk}`);
@ -96,26 +101,29 @@ async function getPrinterIpAddress(printerFk, connection) {
// Función para obtener un único registro para procesar
async function getRecordForProcessing(retries = 5, delay = 4000) {
for (let attempt = 0; attempt < retries; attempt++) {
const connection = await getConnectionWithRetries();
const conn = await getConnectionWithRetries();
try {
await connection.beginTransaction();
const [rows] = await connection.query(`
SELECT * FROM expedition_PrintOut WHERE isPrinted = 10 LIMIT 1 FOR UPDATE
await conn.beginTransaction();
const [rows] = await conn.query(`
SELECT * FROM expedition_PrintOut
WHERE isPrinted = 10
LIMIT 1 FOR UPDATE
`);
if (!rows.length) {
await connection.commit();
await conn.commit();
return;
}
const record = rows[0];
await connection.query(
'UPDATE expedition_PrintOut SET isPrinted = 12 WHERE expeditionFk = ?',
[record.expeditionFk]
);
await connection.commit();
await conn.query(`
UPDATE expedition_PrintOut
SET isPrinted = 12
WHERE expeditionFk = ?
`, [record.expeditionFk]);
await conn.commit();
return record;
} catch (error) {
await connection.rollback();
await conn.rollback();
if (error.code === 'ER_LOCK_WAIT_TIMEOUT') {
logger.error('Lock wait timeout exceeded, retrying...');
if (attempt >= retries - 1)
@ -126,7 +134,7 @@ async function getRecordForProcessing(retries = 5, delay = 4000) {
return;
}
} finally {
connection.release();
conn.release();
}
}
}
@ -138,7 +146,7 @@ async function processRecord(record) {
try {
await connection.beginTransaction();
logger.info(`Procesando expedición = ${record.expeditionFk}`);
logger.info(`(${record.expeditionFk}) Procesando...`);
const zplData = generateZPL(record, label);
const filePath = path.join(__dirname, `zplData_${record.expeditionFk}.txt`);
@ -146,31 +154,34 @@ async function processRecord(record) {
await fs.writeFile(filePath, zplData["VerdNatura Label RFID"].zpl, 'utf8');
const zplContent = await fs.readFile(filePath, 'utf8');
const ipAddress = await getPrinterIpAddress(record.printerFk, connection);
const sendResult = await sendZPL(zplContent, ipAddress);
fs.unlink(filePath);
if (sendResult === 'success') {
await connection.query(
'UPDATE expedition_PrintOut SET isPrinted = 11 WHERE expeditionFk = ?',
[record.expeditionFk]
);
logger.info(`Base de datos actualizada para expeditionFk=${record.expeditionFk}`);
await connection.query(`
UPDATE expedition_PrintOut
SET isPrinted = 11
WHERE expeditionFk = ?
`, [record.expeditionFk]);
logger.info(`(${record.expeditionFk}) Base de datos actualizada`);
} else {
logger.error(`Error al enviar ZPL a la impresora para expeditionFk=${record.expeditionFk}`);
await connection.query(
'UPDATE expedition_PrintOut SET isPrinted = 13 WHERE expeditionFk = ?',
[record.expeditionFk]
);
await connection.query(`
UPDATE expedition_PrintOut
SET isPrinted = 13
WHERE expeditionFk = ?
`, [record.expeditionFk]);
logger.error(`(${record.expeditionFk}) Error al enviar ZPL a la impresora`);
}
parentPort.postMessage('done');
await connection.commit();
} catch (error) {
logger.error('Error al procesar el registro:', error);
parentPort.postMessage('error');
if (!error.message === `Can't add new command when connection is in closed state`)
await connection.rollback();
// if (!error.message === `Can't add new command when connection is in closed state`)
await connection.rollback();
} finally {
connection.release();
}

View File

@ -54,15 +54,22 @@ class WorkerPool {
}
// Asignar tareas iniciales a los workers
async assignTasks() {
async start() {
const decoration = '△▽'.repeat(10);
console.log(`${decoration} Dismuntel service ${decoration}`);
for (const worker of this.workers)
worker.postMessage('check'); // Pedir al worker que verifique nuevos registros
}
// Cerrar todos los workers
closeAllWorkers() {
for (const worker of this.workers)
worker.terminate();
end() {
try {
for (const worker of this.workers) worker.terminate();
console.log('\nBye ( ◕ ‿ ◕ )っ');
} catch (err) {
console.error(err);
}
process.exit();
}
}