rfidnatura/worker/worker.js

159 lines
4.3 KiB
JavaScript

const { parentPort } = require('worker_threads');
const fs = require('fs').promises;
const path = require('path');
const pool = require('../db/pool');
const env = require('dotenv').config().parsed || process.env;
const net = require('net');
const generateZPL = require('../resources/zplTemplate');
const label = require('../resources/label.json');
const log = require('../log')
/**
 * Acquires a connection from the shared pool, retrying when acquisition fails.
 *
 * @param {number} [retries] Maximum number of attempts. Defaults to
 *   `Number(env.DB_CONN_ATTEMPTS)`. Anything that is not a number >= 1
 *   (including `NaN` from an unset variable, or 0) means a single attempt.
 * @param {number} [delay] Milliseconds to wait between attempts. Defaults to
 *   `Number(env.DB_CONN_DELAY)`.
 * @returns {Promise<object>} Whatever `pool.getConnection()` resolves to.
 * @throws {Error} `Could not get connection: …` once every attempt has failed.
 *   The last underlying error is attached as `cause` on runtimes that support it.
 */
async function getConn(retries = Number(env.DB_CONN_ATTEMPTS), delay = Number(env.DB_CONN_DELAY)) {
  // Number(undefined) is NaN and every comparison with NaN is false, so a plain
  // `attempt < retries` loop would be skipped entirely and the function would
  // resolve to undefined. Always try at least once instead.
  const maxAttempts = retries >= 1 ? retries : 1;

  for (let attempt = 1; ; attempt++) {
    try {
      return await pool.getConnection();
    } catch (error) {
      if (attempt >= maxAttempts) {
        // The second argument is ignored by runtimes without Error `cause` support.
        throw new Error(`Could not get connection: ${error}`, { cause: error });
      }
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
/**
 * Sends a ZPL payload to a printer over a TCP socket, retrying on failure.
 *
 * Each attempt opens a new socket to `ipAddress` on `env.PRINTER_CONN_PORT`
 * with an inactivity timeout of `env.PRINTER_CONN_TIMEOUT` milliseconds.
 * When `env.DRY_PRINT` is truthy the socket is connected and closed without
 * writing anything. NOTE(review): `env` values are strings, so any non-empty
 * value (even "false") enables dry-print — confirm this is intended.
 *
 * @param {string|Buffer} zplContent Payload passed to `socket.write`.
 * @param {string} ipAddress Host to connect to.
 * @param {number} [retries] Extra attempts after the first one fails.
 *   Defaults to `Number(env.PRINTER_CONN_ATTEMPTS)`; `NaN` means no retries.
 * @returns {Promise<boolean>} `true` when an attempt succeeded, `false` when
 *   every attempt failed. Never rejects for socket errors.
 */
async function sendZPL(zplContent, ipAddress, retries = Number(env.PRINTER_CONN_ATTEMPTS)) {
  // Performs a single delivery attempt; settles exactly once.
  const attemptOnce = () => new Promise((resolve, reject) => {
    const client = new net.Socket();
    let settled = false;

    const succeed = () => {
      if (settled) return;
      settled = true;
      client.destroy();
      resolve();
    };
    const fail = message => {
      if (settled) return;
      settled = true;
      log('error', message);
      client.destroy();
      reject(new Error(message));
    };

    client.setTimeout(Number(env.PRINTER_CONN_TIMEOUT));
    client.on('timeout', () => fail('Connection to the printer timed out'));
    client.on('error', error => fail(`Error sending ZPL to the printer: ${error.message || error}`));

    client.connect(Number(env.PRINTER_CONN_PORT), ipAddress, () => {
      if (env.DRY_PRINT) {
        succeed();
        return;
      }
      // The write callback receives the error (if any) and runs before the
      // 'error' event, so it must be checked here or a failed write would be
      // reported as a successful print.
      client.write(zplContent, error => {
        if (error)
          fail(`Error sending ZPL to the printer: ${error.message || error}`);
        else
          succeed();
      });
    });
  });

  let remaining = retries;
  for (;;) {
    try {
      await attemptOnce();
      return true;
    } catch (error) {
      // The failure itself was already logged by the attempt.
      if (!(remaining > 0)) {
        log('error', 'All attempts failed. Unable to send ZPL.');
        return false;
      }
      log('debug', `Retrying... ${remaining} attempts remaining`);
      remaining--;
    }
  }
}
/**
 * Reads the `ipAddress` column of a printer row.
 *
 * The table queried is `printer` in the schema named by
 * `env.DB_PRINTER_SCHEMA`; the id is passed as a bound parameter.
 *
 * @param {*} printerFk Value matched against `printer.id`.
 * @param {object} conn Connection exposing `query(sql, params)`.
 * @returns {Promise<*>} The `ipAddress` of the first matching row.
 * @throws {Error} When no row matches, or whatever `conn.query` throws. Every
 *   failure is logged and then rethrown unchanged.
 */
async function getPrinterIp(printerFk, conn) {
  const lookUpAddress = async () => {
    const [matches] = await conn.query(`
SELECT ipAddress
FROM ${env.DB_PRINTER_SCHEMA}.printer
WHERE id = ?
`, [printerFk]);
    if (matches.length)
      return matches[0].ipAddress;
    throw new Error(`Printer with ID ${printerFk} not found`);
  };

  return lookUpAddress().catch(failure => {
    log('error', `Error retrieving the printer IP address:`, failure);
    throw failure;
  });
}
/**
 * Claims the oldest pending row of `expedition_PrintOut`.
 *
 * Inside a transaction it selects the first row with `isPrinted = 0` (ordered
 * by `created`, `FOR UPDATE`), sets its state to 2 through `updateState`, and
 * commits. NOTE(review): the row lock presumably keeps concurrent workers from
 * claiming the same row — confirm against the deployment.
 *
 * @returns {Promise<object|undefined>} The claimed row, or `undefined` when
 *   nothing is pending or when the claim failed (the failure is logged).
 * @throws {Error} Only when `getConn()` cannot obtain a connection.
 */
async function getRecord() {
  const conn = await getConn();
  try {
    await conn.beginTransaction();
    const [rows] = await conn.query(`
SELECT * FROM expedition_PrintOut
WHERE isPrinted = 0
ORDER BY created
LIMIT 1 FOR UPDATE
`);
    if (!rows.length) {
      await conn.commit();
      return undefined;
    }
    const record = rows[0];
    await updateState(conn, record.expeditionFk, 2);
    await conn.commit();
    return record;
  } catch (error) {
    // Log the real failure first so it is never lost, then roll back. A
    // rollback that itself fails (e.g. the connection dropped) must not turn
    // this best-effort path into a rejection.
    log('error', 'Unable to retrieve and mark the record:', error);
    try {
      await conn.rollback();
    } catch (rollbackError) {
      log('error', 'Unable to roll back the transaction:', rollbackError);
    }
    return undefined;
  } finally {
    conn.release();
  }
}
/**
 * Prints the label for one claimed record and reports the outcome.
 *
 * Generates the ZPL via `generateZPL(record, label)`, optionally stores it in
 * `../tmp/zplData_<expeditionFk>.txt` when `env.KEEP_TMP_FILES` is truthy,
 * resolves the printer address, and sends the payload. The row state is set
 * to 1 when the send succeeded and 3 otherwise.
 *
 * Exactly one message is posted to the parent: `'done'` when the flow ran to
 * completion (whether or not the print succeeded), `'error'` when anything
 * threw — including failure to obtain a connection or to record the failure
 * state.
 *
 * @param {object} record Row with at least `expeditionFk` and `printerFk`.
 * @returns {Promise<void>}
 */
async function processRecord(record) {
  let conn;
  try {
    // Acquired inside the try so a connection failure is reported to the
    // parent instead of escaping as a rejection with no message posted.
    conn = await getConn();
    log('debug', `(${record.expeditionFk}) Processing...`);
    const zplData = generateZPL(record, label);
    const zpl = zplData['VerdNatura Label RFID'].zpl;
    if (env.KEEP_TMP_FILES) {
      const dirPath = path.join(__dirname, '..', 'tmp');
      await fs.mkdir(dirPath, { recursive: true });
      const filePath = path.join(dirPath, `zplData_${record.expeditionFk}.txt`);
      await fs.writeFile(filePath, zpl, 'utf8');
    }
    const ipAddress = await getPrinterIp(record.printerFk, conn);
    const isSent = await sendZPL(zpl, ipAddress);
    if (isSent) {
      await updateState(conn, record.expeditionFk, 1);
      log('success', `(${record.expeditionFk}) Print completed successfully`);
    } else {
      await updateState(conn, record.expeditionFk, 3);
      log('error', `(${record.expeditionFk}) Print not completed`);
    }
    parentPort.postMessage('done');
  } catch (error) {
    log('error', `Unable to process the record ${error}`);
    if (conn) {
      // The database may be the very thing that failed; never let this
      // bookkeeping prevent the parent from being notified.
      try {
        await updateState(conn, record.expeditionFk, 3);
      } catch (stateError) {
        log('error', `(${record.expeditionFk}) Unable to mark the record as failed: ${stateError}`);
      }
    }
    parentPort.postMessage('error');
  } finally {
    if (conn)
      conn.release();
  }
}
/**
 * Writes a new `isPrinted` value for every `expedition_PrintOut` row whose
 * `expeditionFk` matches.
 *
 * @param {object} conn Connection exposing `query(sql, params)`.
 * @param {*} expeditionId Value matched against `expeditionFk`.
 * @param {*} state Value stored in `isPrinted`.
 * @returns {Promise<void>} Resolves once the query has completed.
 */
async function updateState(conn, expeditionId, state) {
  const statement = `
UPDATE expedition_PrintOut
SET isPrinted = ?
WHERE expeditionFk = ?
`;
  // Placeholder order follows the statement: SET value first, then the key.
  const bindings = [state, expeditionId];
  await conn.query(statement, bindings);
}
// Handles requests from the parent thread. On 'check' it claims the next
// pending record and processes it; when nothing is pending it waits
// DB_RECORD_DELAY milliseconds and then posts 'done'. Other messages are ignored.
parentPort.on('message', async msg => {
  if (msg !== 'check')
    return;

  try {
    const record = await getRecord();
    if (record) {
      await processRecord(record);
      return;
    }
    setTimeout(() => {
      parentPort.postMessage('done');
    }, Number(env.DB_RECORD_DELAY));
  } catch (error) {
    // A rejection from an async listener would otherwise be unhandled and the
    // parent would never receive a reply for this 'check'.
    log('error', `Unable to handle the check request: ${error}`);
    parentPort.postMessage('error');
  }
});