refs #4823 Refactor
This commit is contained in:
parent
b9cc8f1a7c
commit
1aaccaa73b
|
@ -11,7 +11,7 @@ DB_USER = root
|
|||
DB_PWD = root
|
||||
DB_HOST = localhost
|
||||
DB_DIALECT = mariadb
|
||||
DB_TIMEOUT_RECONECT = 30000
|
||||
DB_RECON_TIMEOUT = 30000
|
||||
DB_MAX_CONN_POOL = 40
|
||||
DB_TIMEZONE = Europe/Madrid
|
||||
|
||||
|
@ -23,12 +23,12 @@ SECRETS = true
|
|||
FORCE_SYNC = true
|
||||
SYNC_ORGANIZATION = true
|
||||
SYNC_WAREHOUSE = true
|
||||
SYNC_CONN = true
|
||||
SYNC_CON = true
|
||||
SYNC_TRADEITEM = true
|
||||
|
||||
#REQUEST CONFIG
|
||||
MS_RETRY_UNHANDLED_ERROR = 900000
|
||||
|
||||
#DEV OPTIONS
|
||||
SUPPLIERS_ALWAYS_CONN = false
|
||||
ORGS_ALWAYS_CONN = false
|
||||
APPLY_ORG_FILTER = false
|
22
floriday.js
22
floriday.js
|
@ -1,4 +1,4 @@
|
|||
import { checkConn, closeConn } from './models/sequelize.js';
|
||||
import { checkCon, closeCon } from './models/sequelize.js';
|
||||
import * as utils from './utils.js';
|
||||
import moment from 'moment';
|
||||
import chalk from 'chalk';
|
||||
|
@ -11,8 +11,8 @@ class Floriday {
|
|||
try {
|
||||
await utils.checkConfig();
|
||||
await utils.requestToken();
|
||||
if (JSON.parse(env.SYNC_ORGANIZATION)) await utils.syncSuppliers();
|
||||
if (JSON.parse(env.SYNC_CONN)) await utils.syncConnections();
|
||||
if (JSON.parse(env.SYNC_ORGANIZATION)) await utils.syncOrganizations();
|
||||
if (JSON.parse(env.SYNC_CON)) await utils.syncConnections();
|
||||
if (JSON.parse(env.SYNC_WAREHOUSE)) await utils.syncWarehouses();
|
||||
if (JSON.parse(env.SYNC_TRADEITEM)) await utils.syncTradeItems();
|
||||
} catch (err) {
|
||||
|
@ -23,13 +23,12 @@ class Floriday {
|
|||
async tryConn() {
|
||||
while (true)
|
||||
try {
|
||||
utils.warning(new Error('Waiting for the database to respond...'));
|
||||
await utils.sleep(env.DB_TIMEOUT_RECONECT);
|
||||
await checkConn();
|
||||
utils.warning(new Error('Awaiting a response from the database...'));
|
||||
await utils.sleep(env.DB_RECON_TIMEOUT);
|
||||
await checkCon();
|
||||
await this.schedule();
|
||||
}
|
||||
catch (err) {
|
||||
}
|
||||
catch (err) {}
|
||||
}
|
||||
|
||||
async schedule () {
|
||||
|
@ -54,21 +53,20 @@ class Floriday {
|
|||
|
||||
async trunk() {
|
||||
try{
|
||||
if (JSON.parse(env.SYNC_CONN)) await utils.syncConnections();
|
||||
if (JSON.parse(env.SYNC_CON)) await utils.syncConnections();
|
||||
await utils.syncSupplyLines();
|
||||
|
||||
// Continuar con todo lo que haga falta realizar en la rutina
|
||||
|
||||
} catch (err) {
|
||||
if (err.name === 'SequelizeConnectionRefusedError')
|
||||
throw err;
|
||||
if (err.name === 'SequelizeConnectionRefusedError') throw err;
|
||||
utils.criticalError(err);
|
||||
}
|
||||
}
|
||||
|
||||
async stop() {
|
||||
this.continueSchedule = false;
|
||||
await closeConn();
|
||||
await closeCon();
|
||||
console.warn(chalk.dim('Bye, come back soon 👋'))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ import { Sequelize } from 'sequelize';
|
|||
import dotenv from 'dotenv';
|
||||
import chalk from 'chalk';
|
||||
import ora from 'ora';
|
||||
import { criticalError, updateClientConfig } from '../utils.js';
|
||||
import { criticalError } from '../utils.js';
|
||||
|
||||
dotenv.config();
|
||||
const env = process.env;
|
||||
|
@ -22,7 +22,7 @@ let sequelize, conSpinner
|
|||
try {
|
||||
conSpinner = ora('Creating database connection...').start();
|
||||
sequelize = createConn();
|
||||
await checkConn();
|
||||
await checkCon();
|
||||
conSpinner.succeed();
|
||||
} catch (err) {
|
||||
conSpinner.fail();
|
||||
|
@ -139,22 +139,16 @@ try {
|
|||
criticalError(err);
|
||||
}
|
||||
|
||||
let action, isForce;
|
||||
if (JSON.parse(env.FORCE_SYNC)) {
|
||||
action = 'Forcing'
|
||||
isForce = true
|
||||
} else {
|
||||
action = 'Altering'
|
||||
isForce = false
|
||||
}
|
||||
|
||||
const modSpinner = ora(`${action} models...`).start();
|
||||
let spinner;
|
||||
try {
|
||||
await sequelize.sync({ force: isForce });
|
||||
modSpinner.succeed();
|
||||
const action = JSON.parse(env.FORCE_SYNC) ? { force: true } : { alter: true };
|
||||
const actionMsg = JSON.parse(env.FORCE_SYNC) ? 'Forcing' : 'Altering';
|
||||
const spinner = ora(`${actionMsg} models...`).start();
|
||||
await sequelize.sync(action);
|
||||
spinner.succeed();
|
||||
}
|
||||
catch (err) {
|
||||
modSpinner.fail();
|
||||
if (spinner) spinner.fail();
|
||||
criticalError(err);
|
||||
}
|
||||
|
||||
|
@ -180,7 +174,7 @@ function createConn() {
|
|||
/**
|
||||
* Check if connection is ok
|
||||
*/
|
||||
async function checkConn() {
|
||||
async function checkCon() {
|
||||
try {
|
||||
await sequelize.authenticate();
|
||||
} catch (err) {
|
||||
|
@ -191,7 +185,7 @@ async function checkConn() {
|
|||
/**
|
||||
* Close the connection to the database
|
||||
*/
|
||||
async function closeConn() {
|
||||
async function closeCon() {
|
||||
const spinner = ora('Closing database connection...').start();
|
||||
try {
|
||||
await sequelize.close()
|
||||
|
@ -202,4 +196,4 @@ async function closeConn() {
|
|||
}
|
||||
}
|
||||
|
||||
export { models, checkConn, closeConn};
|
||||
export { models, checkCon, closeCon};
|
159
utils.js
159
utils.js
|
@ -8,8 +8,8 @@ import ora from 'ora';
|
|||
const env = process.env;
|
||||
|
||||
/**
|
||||
* Gets the Access Token
|
||||
*
|
||||
* Gets the Access Token.
|
||||
*
|
||||
* @param {Boolean} isForce Force to request new token
|
||||
*/
|
||||
export async function requestToken(isForce = false) {
|
||||
|
@ -56,8 +56,8 @@ export async function requestToken(isForce = false) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the current token
|
||||
*
|
||||
* Returns the current token.
|
||||
*
|
||||
* @returns {string}
|
||||
*/
|
||||
export async function getCurrentToken() {
|
||||
|
@ -65,7 +65,7 @@ export async function getCurrentToken() {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check the floriday data config
|
||||
* Check the floriday data config.
|
||||
*/
|
||||
export async function checkConfig() {
|
||||
const spinner = ora(`Checking config...`).start();
|
||||
|
@ -88,8 +88,8 @@ export async function checkConfig() {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the expiration of current token
|
||||
*
|
||||
* Returns the expiration of current token.
|
||||
*
|
||||
* @returns {string}
|
||||
*/
|
||||
export async function getCurrentTokenExpiration() {
|
||||
|
@ -97,8 +97,8 @@ export async function getCurrentTokenExpiration() {
|
|||
}
|
||||
|
||||
/**
|
||||
* Updates the access token in the client config table
|
||||
*
|
||||
* Updates the access token in the client config table.
|
||||
*
|
||||
* @param {Array} clientConfig [clientId, clientSecret, currenToken, tokenExpiration]
|
||||
*/
|
||||
export async function updateClientConfig(clientConfig) {
|
||||
|
@ -114,43 +114,38 @@ export async function updateClientConfig(clientConfig) {
|
|||
}
|
||||
|
||||
/**
|
||||
* pauses the execution of the script for the given amount of milliseconds
|
||||
*
|
||||
* @param {integer} ms
|
||||
* @returns A promise that resolves after ms milliseconds.
|
||||
* Pauses the execution of the script for the specified number of milliseconds.
|
||||
*
|
||||
* @param {Integer} ms
|
||||
*/
|
||||
export async function sleep(ms) {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
await new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync the suppliers
|
||||
* Sync the organizations.
|
||||
*/
|
||||
export async function syncSuppliers(){
|
||||
let spinner = ora('Preparing to load suppliers...').start();
|
||||
export async function syncOrganizations(){
|
||||
let spinner = ora('Syncing organizations...').start();
|
||||
let i = 1;
|
||||
try {
|
||||
const maxSequenceNumber = (await vnRequest('GET', `${env.API_URL}/organizations/current-max-sequence`)).data;
|
||||
|
||||
let timeFinish, timeToGoSec, timeToGoMin, timeLeft;
|
||||
for (let curSequenceNumber = 0; curSequenceNumber <= maxSequenceNumber; curSequenceNumber++) {
|
||||
let timeStart = new moment();
|
||||
let suppliers = (await vnRequest('GET', `${env.API_URL}/organizations/sync/${curSequenceNumber}?organizationType=SUPPLIER`)).data.results;
|
||||
for (let supplier of suppliers) {
|
||||
curSequenceNumber = supplier.sequenceNumber;
|
||||
spinner.text = `Syncing suppliers, ${maxSequenceNumber - curSequenceNumber} are missing`
|
||||
if (timeFinish) spinner.text = spinner.text + ` (${timeLeft})`
|
||||
if (JSON.parse(env.APPLY_ORG_FILTER) && supplier.companyGln) // Filtro temporal para quitar los que parecen test
|
||||
await insertOrganization(supplier);
|
||||
const params = new URLSearchParams({
|
||||
organizationType: 'SUPPLIER'
|
||||
}).toString();
|
||||
let response = (await vnRequest('GET', `${env.API_URL}/organizations/sync/${curSequenceNumber}?${params}`)).data;
|
||||
curSequenceNumber = response.maximumSequenceNumber;
|
||||
const orgs = response.results;
|
||||
for (let org of orgs) {
|
||||
spinner.text = `Syncing ${i} organizations, ${maxSequenceNumber - curSequenceNumber} missing...`
|
||||
if (JSON.parse(env.APPLY_ORG_FILTER) && org.companyGln && !org.endDate) { // Filtro para quitar los que parecen test
|
||||
await insertOrganization(org);
|
||||
spinner.text = `Syncing ${i++} organizations, ${maxSequenceNumber - curSequenceNumber} missing...`
|
||||
}
|
||||
};
|
||||
timeFinish = new moment();
|
||||
timeToGoSec = (timeFinish.diff(timeStart, 'seconds') * (maxSequenceNumber - curSequenceNumber) / 1000);
|
||||
timeToGoMin = Math.trunc(timeToGoSec / 60);
|
||||
(!timeToGoMin) ? timeLeft = `${Math.trunc(timeToGoSec)} sec` : timeLeft = `${timeToGoMin} min`;
|
||||
}
|
||||
spinner.text = `Syncing suppliers...`;
|
||||
spinner.succeed()
|
||||
spinner.succeed();
|
||||
}
|
||||
catch (err) {
|
||||
spinner.fail();
|
||||
|
@ -159,7 +154,7 @@ export async function syncSuppliers(){
|
|||
}
|
||||
|
||||
/**
|
||||
* Create the connections in Floriday
|
||||
* Create the connections in Floriday.
|
||||
*/
|
||||
export async function syncConnections(){
|
||||
await deleteConnections();
|
||||
|
@ -205,28 +200,28 @@ export async function syncConnections(){
|
|||
}
|
||||
|
||||
/**
|
||||
* Sync the trade items for organizations that are connected
|
||||
* Sync the trade items for organizations that are connected.
|
||||
*/
|
||||
export async function syncTradeItems(){
|
||||
const spinner = ora(`Syncing trade items...`).start();
|
||||
const suppliers = await models.organization.findAll({
|
||||
const orgs = await models.organization.findAll({
|
||||
attributes: ['organizationId'],
|
||||
where: { isConnected: true }
|
||||
});
|
||||
let i = 0, x = 0;
|
||||
for (let supplier of suppliers) {
|
||||
for (let org of orgs) {
|
||||
try {
|
||||
const params = new URLSearchParams({
|
||||
supplierOrganizationId: supplier.organizationId,
|
||||
supplierOrganizationId: org.organizationId,
|
||||
}).toString();
|
||||
let tradeItems = (await vnRequest('GET', `${env.API_URL}/trade-items?${params}`)).data
|
||||
|
||||
spinner.text = `Syncing ${i} trade items of [${x++}|${suppliers.length}] suppliers...`
|
||||
spinner.text = `Syncing ${i} trade items of [${x++}|${orgs.length}] organizations...`
|
||||
if (!tradeItems.length) continue;
|
||||
|
||||
for (let tradeItem of tradeItems) {
|
||||
await insertItem(tradeItem);
|
||||
spinner.text = `Syncing ${i++} trade items of [${x}|${suppliers.length}] suppliers...`
|
||||
spinner.text = `Syncing ${i++} trade items of [${x}|${orgs.length}] organizations...`
|
||||
};
|
||||
} catch (err) {
|
||||
spinner.fail();
|
||||
|
@ -237,23 +232,23 @@ export async function syncTradeItems(){
|
|||
}
|
||||
|
||||
/**
|
||||
* Sync the supply lines for organizations that are connected
|
||||
*
|
||||
* If necessary, create the item or the warehouse
|
||||
* Sync the supply lines for organizations that are connected.
|
||||
*
|
||||
* If necessary, create the dependences.
|
||||
*/
|
||||
export async function syncSupplyLines() {
|
||||
const spinner = ora(`Syncing supply lines...`).start();
|
||||
try {
|
||||
let connectedSuppliers = await models.organization.findAll({
|
||||
let conOrgs = await models.organization.findAll({
|
||||
attributes: ['organizationId'],
|
||||
where: { isConnected: true }
|
||||
});
|
||||
|
||||
let i = 0, x = 1;
|
||||
for (let supplier of connectedSuppliers) {
|
||||
spinner.text = `Syncing ${i} supply lines of [${x++}|${connectedSuppliers.length}] suppliers...`
|
||||
for (let org of conOrgs) {
|
||||
spinner.text = `Syncing ${i} supply lines of [${x++}|${conOrgs.length}] organizations...`
|
||||
const params = new URLSearchParams({
|
||||
supplierOrganizationId: supplier.organizationId,
|
||||
supplierOrganizationId: org.organizationId,
|
||||
}).toString();
|
||||
let supplyLines = (await vnRequest('GET',`${env.API_URL}/supply-lines?${params}`)).data;
|
||||
if (!supplyLines.length) continue
|
||||
|
@ -287,7 +282,7 @@ export async function syncSupplyLines() {
|
|||
await insertItem(tradeItem);
|
||||
}
|
||||
|
||||
spinner.text = `Syncing ${i++} supply lines of [${x}|${connectedSuppliers.length}] suppliers...`
|
||||
spinner.text = `Syncing ${i++} supply lines of [${x}|${conOrgs.length}] organizations...`
|
||||
await models.supplyLine.upsert({
|
||||
...supplyLine,
|
||||
organizationId: supplyLine.supplierOrganizationId,
|
||||
|
@ -304,8 +299,8 @@ export async function syncSupplyLines() {
|
|||
|
||||
for (let volumePrice of supplyLine.volumePrices)
|
||||
await models.volumePrices.upsert({
|
||||
...volumePrice,
|
||||
supplyLineId: supplyLine.supplyLineId,
|
||||
...volumePrice,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -318,15 +313,13 @@ export async function syncSupplyLines() {
|
|||
}
|
||||
|
||||
/**
|
||||
* Insert trade item and dependences in db
|
||||
*
|
||||
* Insert trade item and dependences in the database.
|
||||
*
|
||||
* @param {Array} tradeItem
|
||||
*/
|
||||
export async function insertItem(tradeItem) {
|
||||
let tx;
|
||||
const tx = await models.sequelize.transaction();
|
||||
try {
|
||||
tx = await models.sequelize.transaction();
|
||||
|
||||
// Upsert trade item
|
||||
await models.tradeItem.upsert({
|
||||
...tradeItem,
|
||||
|
@ -339,8 +332,8 @@ export async function insertItem(tradeItem) {
|
|||
if (tradeItem.characteristics.length)
|
||||
for (const characteristic of tradeItem.characteristics) {
|
||||
await models.characteristic.upsert({
|
||||
...characteristic,
|
||||
tradeItemId: tradeItem.tradeItemId,
|
||||
...characteristic,
|
||||
}, { transaction: tx });
|
||||
}
|
||||
// Upsert seasonal periods
|
||||
|
@ -348,8 +341,8 @@ export async function insertItem(tradeItem) {
|
|||
if (tradeItem.seasonalPeriods.length)
|
||||
for (const seasonalPeriod of tradeItem.seasonalPeriods) {
|
||||
await models.seasonalPeriod.upsert({
|
||||
...seasonalPeriod,
|
||||
tradeItemId: tradeItem.tradeItemId,
|
||||
...seasonalPeriod,
|
||||
}, { transaction: tx });
|
||||
}
|
||||
|
||||
|
@ -412,14 +405,13 @@ export async function insertItem(tradeItem) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Insert warehouse in db
|
||||
*
|
||||
* Insert warehouse in the database.
|
||||
*
|
||||
* @param {Array} warehouse
|
||||
*/
|
||||
export async function insertWarehouse(warehouse) {
|
||||
let tx;
|
||||
const tx = await models.sequelize.transaction();
|
||||
try {
|
||||
tx = await models.sequelize.transaction();
|
||||
await models.warehouses.upsert({
|
||||
...warehouse,
|
||||
location_gln: warehouse.location.gln,
|
||||
|
@ -438,17 +430,16 @@ export async function insertWarehouse(warehouse) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Insert organization in db
|
||||
*
|
||||
* Insert organization in the database.
|
||||
*
|
||||
* @param {Array} organization
|
||||
*/
|
||||
export async function insertOrganization(organization) {
|
||||
let tx;
|
||||
const tx = await models.sequelize.transaction();
|
||||
try {
|
||||
tx = await models.sequelize.transaction();
|
||||
await models.organization.upsert({
|
||||
...organization,
|
||||
isConnected: JSON.parse(env.SUPPLIERS_ALWAYS_CONN),
|
||||
isConnected: JSON.parse(env.ORGS_ALWAYS_CONN),
|
||||
lastSync: moment(),
|
||||
});
|
||||
await tx.commit();
|
||||
|
@ -459,21 +450,21 @@ export async function insertOrganization(organization) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sync the warehouses for organizations that are connected
|
||||
* Sync the warehouses for organizations that are connected.
|
||||
**/
|
||||
export async function syncWarehouses(){
|
||||
let spinner = ora('Syncing warehouses...').start();
|
||||
try {
|
||||
const suppliers = await models.organization.findAll({
|
||||
const orgs = await models.organization.findAll({
|
||||
where: { isConnected: true }
|
||||
});
|
||||
|
||||
let x = 0, i = 0;
|
||||
for (let supplier of suppliers) {
|
||||
spinner.text = `Syncing ${i} warehouses of [${x++}|${suppliers.length}] suppliers...`
|
||||
const warehouses = (await vnRequest('GET', `${env.API_URL}/organizations/supplier/${supplier.organizationId}/warehouses`)).data;
|
||||
for (let org of orgs) {
|
||||
spinner.text = `Syncing ${i} warehouses of [${x++}|${orgs.length}] organizations...`
|
||||
const warehouses = (await vnRequest('GET', `${env.API_URL}/organizations/supplier/${org.organizationId}/warehouses`)).data;
|
||||
for (let warehouse of warehouses) {
|
||||
spinner.text = `Syncing ${i++} warehouses of [${x}|${suppliers.length}] suppliers...`
|
||||
spinner.text = `Syncing ${i++} warehouses of [${x}|${orgs.length}] organizations...`
|
||||
await insertWarehouse(warehouse);
|
||||
}
|
||||
}
|
||||
|
@ -486,7 +477,7 @@ export async function syncWarehouses(){
|
|||
}
|
||||
|
||||
/**
|
||||
* Removes Floriday connections that we don't have in the database
|
||||
* Removes Floriday connections that we don't have in the database.
|
||||
**/
|
||||
export async function deleteConnections() {
|
||||
let spinner;
|
||||
|
@ -523,13 +514,13 @@ export async function deleteConnections() {
|
|||
}
|
||||
|
||||
/**
|
||||
* Perform a REST request
|
||||
*
|
||||
* Perform a REST request.
|
||||
*
|
||||
* @param {String} url
|
||||
* @param {String} method
|
||||
* @param {Array} body
|
||||
* @param {Array} header
|
||||
*
|
||||
*
|
||||
* @return {Array}
|
||||
**/
|
||||
export async function vnRequest(method, url, data, headers) {
|
||||
|
@ -541,11 +532,11 @@ export async function vnRequest(method, url, data, headers) {
|
|||
};
|
||||
|
||||
while(true) {
|
||||
try {
|
||||
try {
|
||||
return (['GET', 'DELETE'].includes(method))
|
||||
? await axios({method, url, headers})
|
||||
: await axios({method, url, data, headers});
|
||||
} catch (err) {
|
||||
} catch (err) {
|
||||
switch (err.code) {
|
||||
case 'ECONNRESET': // Client network socket TLS
|
||||
case 'EAI_AGAIN': // getaddrinfo
|
||||
|
@ -583,13 +574,13 @@ export async function vnRequest(method, url, data, headers) {
|
|||
await sleep(env.MS_RETRY_UNHANDLED_ERROR);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Critical error
|
||||
*
|
||||
* Critical error.
|
||||
*
|
||||
* @param {err}
|
||||
**/
|
||||
export async function criticalError(err) {
|
||||
|
@ -598,8 +589,8 @@ export async function criticalError(err) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Warning
|
||||
*
|
||||
* Warning.
|
||||
*
|
||||
* @param {err}
|
||||
**/
|
||||
export async function warning(err) {
|
||||
|
|
Loading…
Reference in New Issue