import { models } from './models/sequelize.js'; import { v4 as uuidv4 } from 'uuid'; import axios from 'axios'; import moment from 'moment'; import chalk from 'chalk'; import ora from 'ora'; import yml from 'js-yaml'; import fs from 'fs'; const env = process.env; const methods = yml.load(fs.readFileSync('./methods.yml', 'utf8')); const flModels = yml.load(fs.readFileSync('./models/models.yml', 'utf8')); const arrow = '└───────'; let spinner; /** * Check if the token is valid, and if it * is not, get a new one. * * @param {Boolean} isForce Force to request new token */ export async function checkToken(isForce) { try { await startSpin(`Checking token...`); const clientConfigData = await models.config.findOne(); let tokenExpiration, token, optionalMsg; if (clientConfigData) { token = clientConfigData.currentToken; tokenExpiration = clientConfigData.tokenExpiration; } if (isForce || !token || !tokenExpiration || moment().isAfter(tokenExpiration)) { await txtSpin(`Requesting new token...`); const clientId = JSON.parse(env.USE_SECRETS_DB) ? clientConfigData.clientId || env.CLIENT_ID : env.CLIENT_ID; const clientSecret = JSON.parse(env.USE_SECRETS_DB) ? clientConfigData.clientSecret || env.CLIENT_SECRET : env.CLIENT_SECRET; const data = new URLSearchParams({ grant_type: 'client_credentials', client_id: clientId, client_secret: clientSecret, scope: 'role:app catalog:read supply:read organization:read network:write network:read' }).toString(); const headers = { 'Content-Type': 'application/x-www-form-urlencoded' }; const res = (await vnRequest('POST', env.API_ENDPOINT, data, headers)).data; const tokenExpiration = moment() .add(res.expires_in, 's') .format('YYYY-MM-DD HH:mm:ss'); await updateClientConfig({ clientId, clientSecret, currentToken: res.access_token, tokenExpiration, }); } else optionalMsg = 'Using stored token...'; await okSpin(optionalMsg); } catch (err) { await await failSpin(err); } }; /** * Returns the current token. * * @returns {String} The current token */ export async function getCurrentToken() { return (await models.config.findOne()).currentToken; }; /** * Check the floriday data config. */ export async function checkConfig() { await startSpin(`Checking config...`); const excludedEnvVars = ['VSCODE_GIT_ASKPASS_EXTRA_ARGS']; const requiredEnvVars = Object.keys(env); const filteredEnvVars = requiredEnvVars.filter(reqEnvVar => !excludedEnvVars.includes(reqEnvVar)); for (const reqEnvVar of filteredEnvVars) if (!process.env[reqEnvVar]) await failSpin(new Error(`You haven't provided the ${reqEnvVar} environment variable`)); const clientConfigData = await models.config.findOne(); if (!clientConfigData) await updateClientConfig(env.CLIENT_ID, env.CLIENT_SECRET); await okSpin(); }; /** * Returns the expiration of current token. * * @returns {String} The expiration of current token */ export async function getCurrentTokenExpiration() { return (await models.config.findOne()).tokenExpiration; }; /** * Updates the access token in the client config table. * * @param {Array} clientConfig [clientId, clientSecret, currenToken, tokenExpiration] */ export async function updateClientConfig(clientConfig) { try { if (!JSON.parse(process.env.USE_SECRETS_DB)) clientId = clientSecret = null await models.config.upsert({ id: 1, ...clientConfig, }); } catch (err) { throw(err); } }; /** * Pauses the execution of the script for the specified number of milliseconds. * * @param {Integer} ms */ export async function sleep(ms) { await new Promise(resolve => setTimeout(resolve, ms)); }; /** * Sync a model. * * @param {String} model Supported models (./models/methods.yml) */ export async function syncModel(model) { await startSpin(`Syncing ${model}...`); try { const dbSeqNum = await models.sequenceNumber.findOne({ where: { model } }) let curSeqNum = dbSeqNum?.maxSequenceNumber ?? 0, i = 0; if (!flModels.includes(model)) throw new Error('Unsupported model'); const maxSeqNum = (await vnRequest('GET', `${env.API_URL}${methods[model].maxSeq.url}`)).data; for (curSeqNum; curSeqNum < maxSeqNum; curSeqNum++) { let params, misSeqNum; if (methods[model].sync.params) { params = new URLSearchParams(); for (const key in methods[model].sync.params) { params.append(key, methods[model].sync.params[key]); } params = params.toString(); } const res = (await vnRequest('GET', `${env.API_URL}${methods[model].sync.url}${curSeqNum}${params ? `?${params}` : ''}`)); if (res?.response?.status == 403 && res?.response?.data?.title === 'There are no connected suppliers.') { // Forbidden await await warnSpin(`Syncing ${model}...`, new Error(res.response.data.title), true); return; } const data = res.data.results; curSeqNum = res.data.maximumSequenceNumber; misSeqNum = maxSeqNum - curSeqNum; await insertModel(model, data); await txtSpin(`Syncing ${i = i + data.length} ${model}, ${misSeqNum} missing...`); await insertSequenceNumber(model, curSeqNum); } await insertSequenceNumber(model, maxSeqNum); await txtSpin((i) ? `Syncing ${i} ${model}...` : `Syncing ${model}...`); await okSpin(); } catch (err) { await await failSpin(err); } }; /** * Insert data to a model. * * @param {String} model Supported models (./models/methods.yml) * @param {Array} data An array containing the data to be inserted */ export async function insertModel(model, data) { try { switch (model) { case 'organizations': await insertOrganizations(data); break; case 'warehouses': await insertWarehouses(data); break; case 'tradeItems': await insertTradeItems(data); break; case 'supplyLines': await insertSupplyLines(data); break; case 'clockPresalesSupply': await insertClockPresalesSupply(data); break; default: throw new Error('Unsupported model'); } } catch (err) { throw err; } }; /** * Check (create and/or remove) the connections in Floriday. */ export async function checkConnections(){ try { await startSpin('Checking connections...'); await createConnections(); await deleteConnections(); if (spinner) await okSpin(); } catch (err) { await failSpin(err); } }; /** * Insert sequence number in the database. * * @param {String} model The model identifier * @param {Number} sequenceNumber The sequence number */ export async function insertSequenceNumber(model, sequenceNumber) { const tx = await models.sequelize.transaction(); try { await models.sequenceNumber.upsert({ model: model, maxSequenceNumber: sequenceNumber, }, { transaction: tx }); await tx.commit(); } catch (err) { await tx.rollback(); throw err; } }; /** * Insert trade items and dependencies in the database. * * @param {Array} tradeItems An array containing the trade item data to be inserted */ export async function insertTradeItems(tradeItems) { const tx = await models.sequelize.transaction(); try { const tradeItemsData = tradeItems.map((tradeItem) => ({ ...tradeItem, organizationId: tradeItem.supplierOrganizationId, lastSync: moment().format('YYYY-MM-DD HH:mm:ss'), })); await models.tradeItem.bulkCreate(tradeItemsData, { updateOnDuplicate: [ 'tradeItemId', 'code', 'gtin', 'vbnProductCode', 'name', 'isDeleted', 'sequenceNumber', 'tradeItemVersion', 'isCustomerSpecific', 'isHiddenInCatalog', 'organizationId', ], transaction: tx, }); const characteristics = []; const seasonalPeriods = []; const photos = []; const packingConfigurations = []; const countryOfOriginIsoCodes = []; const botanicalNames = []; for (const tradeItem of tradeItemsData) { if (tradeItem.characteristics?.length) for (const characteristic of tradeItem.characteristics) characteristics.push({ tradeItemId: tradeItem.tradeItemId, ...characteristic, }); if (tradeItem.seasonalPeriods?.length) for (const seasonalPeriod of tradeItem.seasonalPeriods) seasonalPeriods.push({ tradeItemId: tradeItem.tradeItemId, ...seasonalPeriod, }); if (tradeItem.photos?.length) for (const photo of tradeItem.photos) photos.push({ ...photo, tradeItemId: tradeItem.tradeItemId, }); if (tradeItem.packingConfigurations?.length) { for (const packingConfiguration of tradeItem.packingConfigurations) { const uuid = uuidv4(); packingConfigurations.push({ packingConfigurationId: uuid, ...packingConfiguration, additionalPricePerPiece_currency: packingConfiguration.additionalPricePerPiece.currency, additionalPricePerPiece_value: packingConfiguration.additionalPricePerPiece.value, tradeItemId: tradeItem.tradeItemId, }); models.package.upsert({ ...packingConfiguration.package, packingConfigurationId: uuid, }, { transaction: tx }); } } if (tradeItem.countryOfOriginIsoCodes?.length) for (const isoCode of tradeItem.countryOfOriginIsoCodes) countryOfOriginIsoCodes.push({ isoCode, tradeItemId: tradeItem.tradeItemId, }); if (tradeItem.botanicalNames?.length) for (const botanicalName of tradeItem.botanicalNames) botanicalNames.push({ botanicalNameId: uuidv4(), name: botanicalName, tradeItemId: tradeItem.tradeItemId, }); } if (characteristics?.length) await models.characteristic.bulkCreate(characteristics, { updateOnDuplicate: ['tradeItemId', 'vbnCode', 'vbnValueCode'], transaction: tx, }); if (seasonalPeriods?.length) await models.seasonalPeriod.bulkCreate(seasonalPeriods, { updateOnDuplicate: ['tradeItemId', 'startWeek', 'endWeek'], transaction: tx, }); if (photos?.length) await models.photo.bulkCreate(photos, { updateOnDuplicate: ['tradeItemId', 'url', 'type', 'primary'], transaction: tx, }); if (packingConfigurations?.length) await models.packingConfiguration.bulkCreate(packingConfigurations, { updateOnDuplicate: [ 'packingConfigurationId', 'piecesPerPackage', 'bunchesPerPackage', 'photoUrl', 'packagesPerLayer', 'layersPerLoadCarrier', 'additionalPricePerPiece_currency', 'additionalPricePerPiece_value', 'transportHeightInCm', 'loadCarrierType', 'isPrimary', ], transaction: tx, }); if (countryOfOriginIsoCodes?.length) await models.countryOfOriginIsoCode.bulkCreate(countryOfOriginIsoCodes, { updateOnDuplicate: ['tradeItemId', 'isoCode'], transaction: tx, }); if (botanicalNames?.length) await models.botanicalName.bulkCreate(botanicalNames, { updateOnDuplicate: ['botanicalNameId', 'name', 'tradeItemId'], transaction: tx, }); await tx.commit(); } catch (err) { await tx.rollback(); throw err; } }; /** * Insert clock presales supply in the database. * * @param {Array} clockPresalesSupply An array containing the clockPresaleSupplies data to be inserted */ export async function insertClockPresalesSupply(clockPresalesSupply) { const tx = await models.sequelize.transaction(); try { const clockPresalesSupplyWithDefaults = clockPresalesSupply.map((clockPresaleSupply) => ({ ...clockPresaleSupply, pricePerPiece_currency: clockPresaleSupply.pricePerPiece.currency, pricePerPiece_value: clockPresaleSupply.pricePerPiece.value, organizationId: clockPresaleSupply.supplierOrganizationId, })); await models.clockPresaleSupply.bulkCreate(clockPresalesSupplyWithDefaults, { updateOnDuplicate: [ 'supplyLineId', 'status', 'tradeItemId', 'pricePerPiece_currency', 'pricePerPiece_value', 'deliveryNoteReference', 'numberOfPieces', 'packingConfigurations', 'tradePeriod_startDateTime', 'tradePeriod_endDateTime', 'organizationId', 'tradeInstrument', 'salesChannel', 'sequenceNumber', 'creationDateTime', 'lastModifiedDateTime', ], transaction: tx, }); await tx.commit(); } catch (err) { await tx.rollback(); throw err; } }; /** * Insert warehouses in the database. * * @param {Array} warehouses An array containing the warehouses data to be inserted */ export async function insertWarehouses(warehouses) { const tx = await models.sequelize.transaction(); try { const warehousesWithDefaults = warehouses.map((warehouse) => ({ ...warehouse, location_gln: warehouse.location.gln, location_address_addressLine: warehouse.location.address.addressLine, location_address_city: warehouse.location.address.city, location_address_countryCode: warehouse.location.address.countryCode, location_address_postalCode: warehouse.location.address.postalCode, location_address_stateOrProvince: warehouse.location.address.stateOrProvince, lastSync: moment().format('YYYY-MM-DD HH:mm:ss'), })); await models.warehouse.bulkCreate(warehousesWithDefaults, { updateOnDuplicate: [ 'warehouseId', 'name', 'location_gln', 'location_address_addressLine', 'location_address_city', 'location_address_countryCode', 'location_address_postalCode', 'location_address_stateOrProvince', 'isDeleted', 'sequenceNumber', 'organizationId', 'lastSync', ], transaction: tx, }); await tx.commit(); } catch (err) { await tx.rollback(); throw err; } }; /** * Insert organizations in the database. * * @param {Array} organizations An array containing the organizations data to be inserted */ export async function insertOrganizations(organizations) { const tx = await models.sequelize.transaction(); try { const organizationsWithDefaults = organizations.map((organization) => ({ ...organization, isConnected: JSON.parse(env.ORGS_ALWAYS_CONN), lastSync: moment().format('YYYY-MM-DD HH:mm:ss'), })); await models.organization.bulkCreate(organizationsWithDefaults, { updateOnDuplicate: [ 'organizationId', 'sequenceNumber', 'companyGln', 'name', 'commercialName', 'email', 'phone', 'website', 'rfhRelationId', 'paymentProviders', 'endDate', 'mailingAddress', 'physicalAddress', 'isConnected', 'lastSync', ], transaction: tx, }); await tx.commit(); } catch (err) { await tx.rollback(); throw err; } }; /* Checkear dependecias supply line // Check if the warehouse exists, and if it doesn't, create it let warehouse = await models.warehouse.findOne({ where: { warehouseId: supplyLine.warehouseId } }); if (!warehouse) { let warehouse = (await vnRequest('GET', `${env.API_URL}/warehouses/${supplyLine.warehouseId}`)).data; // Check if the organization exists, and if it doesn't, create it let organization = await models.organization.findOne({ where: { organizationId: warehouse.organizationId } }, { transaction: tx }); if (!organization) { let organization = (await vnRequest('GET', `${env.API_URL}/organizations/${warehouse.organizationId}`)).data; await insertOrganization(organization); } await insertWarehouse(warehouse); } // Check if the trade item exists, and if it doesn't, create it let tradeItem = await models.tradeItem.findOne({ where: { tradeItemId: supplyLine.tradeItemId } }, { transaction: tx }); if (!tradeItem) { let tradeItem = (await vnRequest('GET', `${env.API_URL}/trade-items/${supplyLine.tradeItemId}`)).data; await insertTradeItem(tradeItem); } */ /** * Insert supply lines and dependencies in the database. * * @param {Array} supplyLines An array containing the supply line data to be inserted */ export async function insertSupplyLines(supplyLines) { const tx = await models.sequelize.transaction(); try { const supplyLinesData = supplyLines.map((supplyLine) => ({ ...supplyLine, organizationId: supplyLine.supplierOrganizationId, deliveryPeriodStartDateTime: supplyLine.deliveryPeriod?.startDateTime ?? null, deliveryPeriodEndDateTime: supplyLine.deliveryPeriod?.endDateTime ?? null, orderPeriodStartDateTime: supplyLine.orderPeriod?.startDateTime ?? null, orderPeriodEndDateTime: supplyLine.orderPeriod?.endDateTime ?? null, agreementReference_code: supplyLine.agreementReference?.code ?? null, agreementReference_description: supplyLine.agreementReference?.description ?? null, lastSync: moment().format('YYYY-MM-DD HH:mm:ss'), })); await models.supplyLine.bulkCreate(supplyLinesData, { updateOnDuplicate: [ 'supplyLineId', 'status', 'numberOfPieces', 'deliveryPeriodStartDateTime', 'deliveryPeriodEndDateTime', 'orderPeriodStartDateTime', 'orderPeriodEndDateTime', 'warehouseId', 'sequenceNumber', 'type', 'isDeleted', 'salesUnit', 'agreementReferenceCode', 'agreementReferenceDescription', 'isLimited', 'isCustomerSpecific', 'tradeItemId', 'organizationId', 'lastSync', ], transaction: tx, }); const packingConfigurations = []; const volumePrices = []; for (const supplyLine of supplyLinesData) if (supplyLine.packingConfigurations?.length) { for (const packingConfiguration of supplyLine.packingConfigurations) packingConfigurations.push({ packingConfigurationId: uuidv4(), ...packingConfiguration, packageVbnPackageCode: packingConfiguration.package.vbnPackageCode, packageCustomPackageId: packingConfiguration.package.customPackageId, additionalPricePerPieceCurrency: packingConfiguration.additionalPricePerPiece.currency, additionalPricePerPieceValue: packingConfiguration.additionalPricePerPiece.value, supplyLineId: supplyLine.supplyLineId, }); if (supplyLine.volumePrices?.length) for (const volumePrice of supplyLine.volumePrices) volumePrices.push({ supplyLineId: supplyLine.supplyLineId, ...volumePrice, }); } if (packingConfigurations.length) await models.supplyLinePackingConfiguration.bulkCreate(packingConfigurations, { updateOnDuplicate: [ 'packingConfigurationId', 'packageVbnPackageCode', 'packageCustomPackageId', 'piecesPerPackage', 'bunchesPerPackage', 'photoUrl', 'packagesPerLayer', 'layersPerLoadCarrier', 'transportHeightInCm', 'loadCarrierType', 'additionalPricePerPieceCurrency', 'additionalPricePerPieceValue', 'isPrimary' ], transaction: tx, }); if (volumePrices.length) await models.volumePrice.bulkCreate(volumePrices, { updateOnDuplicate: ['supplyLineId', 'unit', 'pricePerPiece'], transaction: tx, }); await tx.commit(); } catch (err) { await tx.rollback(); throw err; } }; /** * Create the connections in Floriday of the connected organizations. **/ export async function createConnections() { try { const flConnections = (await vnRequest('GET', `${env.API_URL}${methods.connections.base.url}`)).data; const dbConnections = await models.organization.findAll({ where: { isConnected: true } }); let connectionsToPut = [], i = 0; dbConnections.forEach(value => { if (!flConnections.includes(value.organizationId)) connectionsToPut.push(value.organizationId); }); if (connectionsToPut.length && !spinner) await startSpin('Creating connections...'); for (let connection of connectionsToPut) { await vnRequest('PUT', `${env.API_URL}${methods.connections.base.url}${connection}`); await txtSpin(`Creating ${i++} connections, ${connectionsToPut.length - i} missing...`); } if (spinner && i) await okSpin(`Creating ${i++} connections...`); } catch (err) { throw err; } } /** * Removes Floriday connections that we don't have in the database. **/ export async function deleteConnections() { try { const flConnections = (await vnRequest('GET', `${env.API_URL}/connections`)).data; const dbConnections = await models.organization.findAll({ where: { isConnected: false } }); let ghostConnections = [], i = 0; dbConnections.forEach(value => { if (flConnections.includes(value.organizationId)) ghostConnections.push(value.organizationId); }); if (ghostConnections.length && !spinner) await startSpin('Deleting connections...'); for (let connection of ghostConnections) { await vnRequest('DELETE', `${env.API_URL}/connections/${connection}`); await txtSpin(`Deleting ${i++} connections, ${ghostConnections.length - i} missing...`) } if (spinner && i) await okSpin(`Deleting ${i++} connections...`); } catch (err) { await criticalSpin(err); } }; /** * Perform a REST request. * * @param {String} url The URL of the REST request * @param {String} method The HTTP method of the request (e.g., GET, POST, PUT, DELETE) * @param {Array} body The body of the request, typically an array of data to be sent * @param {Array} header The headers of the request, typically an array of key-value pairs * * @return {Array} An array containing the response data from the REST request **/ export async function vnRequest(method, url, data, headers) { if (!headers) headers = { 'Content-Type': 'application/json', 'Authorization': `Bearer ${await getCurrentToken()}`, 'X-Api-Key': process.env.API_KEY, }; while(true) { try { return (['GET', 'DELETE'].includes(method)) ? await axios({method, url, headers}) : await axios({method, url, data, headers}); } catch (err) { switch (err.code) { case 'ECONNRESET': // Client network socket TLS case 'EAI_AGAIN': // getaddrinfo || Lost connection await warnSpin(null, err, false); await sleep(env.MS_RETRY_LOST_CONNECTION); break; case 'ECONNABORTED': case 'ECONNREFUSED': case 'ERR_BAD_REQUEST': switch (err.response.status) { case 404, 403: // Not found and Forbidden return err; case 504: case 502: await warnSpin(null, err, false); await sleep(1000); break; case 429: // Too Many Requests await warnSpin(null, err, false); await sleep(60000); break; case 401: // Unauthorized await warnSpin(null, err, false); await checkToken(true); headers.Authorization ? headers.Authorization = `Bearer ${await getCurrentToken()}` : handler('critical', err); break; default: await warnSpin(null, err, false); await sleep(env.MS_RETRY_UNHANDLED_ERROR); break; } break; default: await warnSpin(null, err, false); await sleep(env.MS_RETRY_UNHANDLED_ERROR); break; } if (!err?.code === 'EAI_AGAIN') await startSpin(); } } }; /** * Sets the text of spinner. * * @param {String} msg Text of spinner * @param {Boolean} isNew Reinstantiate the object **/ export async function startSpin(msg, isKeep) { if (JSON.parse(env.TIME_STAMPS) && msg) msg = `${chalk.gray(`[${new moment().format('YYYY-MM-DD hh:mm:ss A')}]`)} ${msg}`; (isKeep) ? spinner.start() : spinner = ora({ text: msg, spinner: 'arc', interval: 40, color: 'white', }).start(); }; /** * Sets the text of spinner. * * @param {String} msg Text of spinner **/ export async function txtSpin(msg) { if (!spinner) { startSpin(msg); return; } if (JSON.parse(env.TIME_STAMPS) && msg) msg = `${chalk.gray(`[${new moment().format('YYYY-MM-DD hh:mm:ss A')}]`)} ${msg}`; spinner.text = msg; }; /** * Sets the spinner to ok. * * @param {String} msg Text of spinner **/ export async function okSpin(msg) { if (JSON.parse(env.TIME_STAMPS) && msg) msg = `${chalk.gray(`[${new moment().format('YYYY-MM-DD hh:mm:ss A')}]`)} ${msg ?? ''}`; if (spinner) { spinner.succeed(msg); spinner = null; } }; /** * Sets the spinner to waning and throw a warning. * * @param {String} msg Text of spinner * @param {Error} err Error object * @param {Boolean} clear Clean the instance **/ export async function warnSpin(msg, err, clear) { if (JSON.parse(env.TIME_STAMPS) && msg) msg = `${chalk.gray(`[${new moment().format('YYYY-MM-DD hh:mm:ss A')}]`)} ${msg}`; if (spinner) { spinner.warn(msg); if (clear) spinner = null; } if (err) await handler('warning', err); }; /** * Sets the spinner to fail and throw a error. * * @param {Error} err Error object **/ export async function failSpin(err) { if (spinner) { spinner.fail(); spinner = null; } if (err) throw err; }; /** * Sets the spinner to fail and throw a critical error. * * @param {Error} err Error object **/ export async function criticalSpin(err) { if (spinner) { spinner.fail(); spinner = null; } handler('critical', err); }; /** * Separator. * * @param {String} msg String to show **/ export async function separator(msg) { console.log(chalk.gray(`↺ ──────────────────────── ${msg}`)); }; /** * Function to handle error messages. * * @param {String} type Type name * @param {Error} err Error object **/ export async function handler(type, err) { let header = (`${arrow} [${type.toUpperCase()}]`); let msg = (err.response?.status && err.response?.data?.message) ? `${err.response.status} - ${err.response?.data?.message}` : err.message; switch (type) { case 'critical': header = chalk.red.bold(header); msg = chalk.red(msg); break; case 'warning': header = chalk.yellow.bold(header); msg = chalk.yellow(msg); break; default: console.error('Unhandled handler type'); process.exit(); }; console.log(header, msg); if (type === 'critical') process.exit(); };