import { models } from './models/sequelize.js';
import { v4 as uuidv4 } from 'uuid';
import axios from 'axios';
import moment from 'moment';
import chalk from 'chalk';
import ora from 'ora';

const env = process.env;

/**
 * Gets the Access Token.
 *
 * Reuses the token stored in the client config table unless it is missing,
 * expired or `isForce` is set; otherwise requests a new one from
 * `env.API_ENDPOINT` and stores it through `updateClientConfig`.
 *
 * @param {Boolean} isForce Force to request new token
 * @throws {Error} Rethrows any error raised while reading or storing the token
 */
export async function requestToken(isForce = false) {
  const spinner = ora(`Requesting new token...`).start();
  try {
    const clientConfigData = await models.clientConfig.findOne();

    let token, storedExpiration;
    if (clientConfigData) {
      token = clientConfigData.currentToken;
      storedExpiration = clientConfigData.tokenExpiration;
    }

    const isStoredTokenValid = token && storedExpiration && !moment().isAfter(storedExpiration);
    if (!isForce && isStoredTokenValid) {
      spinner.succeed('Using stored token...');
      return;
    }

    let clientId, clientSecret;
    if (JSON.parse(env.USE_SECRETS_DB)) {
      // Without a stored row there are no credentials to read from the database.
      if (!clientConfigData)
        throw new Error('USE_SECRETS_DB is enabled but no client config is stored in the database');
      clientId = clientConfigData.clientId;
      clientSecret = clientConfigData.clientSecret;
    } else {
      clientId = env.CLIENT_ID;
      clientSecret = env.CLIENT_SECRET;
    }

    const form = {
      grant_type: 'client_credentials',
      client_id: clientId,
      client_secret: clientSecret,
      scope: 'role:app catalog:read supply:read organization:read network:write network:read'
    };
    const body = Object.keys(form)
      .map(key => `${encodeURIComponent(key)}=${encodeURIComponent(form[key])}`)
      .join('&');
    const headers = { 'Content-Type': 'application/x-www-form-urlencoded' };

    // Keep the whole axios response: status lives on it, the token on `.data`.
    const response = await vnRequest('POST', env.API_ENDPOINT, body, headers);
    if (response.status >= 200 && response.status < 300)
      spinner.succeed();
    else {
      spinner.fail();
      criticalError(new Error(`Token request failed with status: ${response.status} - ${response.statusText}`));
    }

    const tokenData = response.data;
    const newExpiration = moment()
      .add(tokenData.expires_in, 's')
      .format('YYYY-MM-DD HH:mm:ss');

    await updateClientConfig(clientId, clientSecret, tokenData.access_token, newExpiration);
  } catch (err) {
    spinner.fail();
    throw err;
  }
}

/**
 * Returns the current token
 *
 * Reads `currentToken` from the client config row; rejects if no row exists.
 *
 * @returns {string}
 */
export async function getCurrentToken() {
  return (await models.clientConfig.findOne()).currentToken;
}

/**
 * Check the floriday data config
 *
 * Fails if any environment variable present in the process (except the
 * excluded ones) is empty, then creates the client config row when the
 * table has none.
 *
 * @throws {Error} When an environment variable has an empty value
 */
export async function checkConfig() {
  const spinner = ora(`Checking config...`).start();

  const excludedEnvVars = ['VSCODE_GIT_ASKPASS_EXTRA_ARGS'];
  // NOTE(review): this validates every variable that is set, not a fixed list
  // of required names, so a variable that is missing entirely is not detected.
  const filteredEnvVars = Object.keys(env)
    .filter(reqEnvVar => !excludedEnvVars.includes(reqEnvVar));

  for (const reqEnvVar of filteredEnvVars) {
    if (!process.env[reqEnvVar]) {
      spinner.fail();
      throw new Error(`You haven't provided the ${reqEnvVar} environment variable`);
    }
  }

  const clientConfigData = await models.clientConfig.findOne();
  if (!clientConfigData)
    await updateClientConfig(env.CLIENT_ID, env.CLIENT_SECRET);

  spinner.succeed();
}

/**
 * Returns the expiration of current token
 *
 * Reads `tokenExpiration` from the client config row; rejects if no row exists.
 *
 * @returns {string}
 */
export async function getCurrentTokenExpiration() {
  return (await models.clientConfig.findOne()).tokenExpiration;
}

/**
 * Updates the access token in the client config table
 *
 * The credentials are stored only when `USE_SECRETS_DB` is enabled; otherwise
 * they are written as null.
 *
 * @param {String} clientId
 * @param {String} clientSecret
 * @param {String} currentToken
 * @param {String} tokenExpiration
 * @throws {Error} Rethrows any error raised by the upsert
 */
export async function updateClientConfig(clientId, clientSecret, currentToken, tokenExpiration) {
  // Declared outside the try so the catch block can still reach it.
  const spinner = ora(`Updating token...`).start();
  try {
    if (!JSON.parse(process.env.USE_SECRETS_DB))
      clientId = clientSecret = null;

    await models.clientConfig.upsert({
      id: 1,
      clientId,
      clientSecret,
      currentToken,
      tokenExpiration,
    });
    spinner.succeed();
  } catch (err) {
    spinner.fail();
    throw err;
  }
}

/**
 * Pauses the execution of the script for the given amount of milliseconds
 *
 * @param {integer} ms
 * @returns A promise that resolves after ms milliseconds.
 */
export async function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

/**
 * Receives an array of functions and executes them in a queue with the given concurrency
 *
 * Results keep the order of `fnArray` regardless of completion order. A
 * concurrency that is not a number >= 1 is treated as 1, and no more workers
 * than tasks are started.
 *
 * @param {Array} fnArray Functions returning a value or a promise
 * @param {Number} concurrency Maximum number of functions running at once
 * @returns {Array} The results, indexed like fnArray
 */
export async function asyncQueue(fnArray, concurrency = 1) {
  const results = [];

  // Wrap every function so its result lands at its original index.
  const pending = fnArray.map((fn, index) => async () => {
    results[index] = await fn();
  });

  const requested = Math.trunc(Number(concurrency));
  const workerCount = Math.min(pending.length, requested >= 1 ? requested : 1);

  // Each worker drains the shared queue until it is empty.
  const worker = async () => {
    for (let task = pending.shift(); task; task = pending.shift())
      await task();
  };

  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

/**
 * Syncs the sequence number for the given model
 * if no params are given it will reset all the sequence number to 0
 *
 * Errors while writing a single sequence are rolled back and logged, not thrown.
 *
 * @param {Number} current - current sequence number
 * @param {String} model - model name
 * @param {Number} maximumSequenceNumber - maximum sequence number
 * @returns
 */
export async function syncSequence(current = 0, model = null, maximumSequenceNumber = 0) {
  if (model == null && current == 0) {
    const spinner = ora(`Syncing sequence...`).start();
    try {
      const mockModels = [
        'supplier',
        'tradeItems',
        'supplyLines',
      ];
      let i = 1;
      for (const mockModel of mockModels) {
        spinner.text = `Syncing ${i++} sequences...`;
        await syncSequence(0, mockModel);
      }
      spinner.succeed();
    } catch (err) {
      spinner.fail();
      throw err;
    }
    return;
  }

  // Reached for any named model, including current = 0, so the reset above
  // really writes 0 instead of falling through without doing anything.
  const tx = await models.sequelize.transaction();
  try {
    const [, isCreated] = await models.sequenceNumber.findOrCreate({
      where: { model: model },
      defaults: {
        model: model,
        sequenceNumber: current,
        maximumSequenceNumber: maximumSequenceNumber
      },
      transaction: tx
    });

    if (!isCreated) {
      await models.sequenceNumber.update({
        sequenceNumber: current,
        maximumSequenceNumber: maximumSequenceNumber
      }, {
        where: { model: model },
        transaction: tx
      });
    }

    await tx.commit();
  } catch (error) {
    await tx.rollback();
    console.log(`Error while syncing sequence number for: ${model}: ${error}`);
  }
}

/**
 * Sync the suppliers
 *
 * Walks the organizations sync endpoint from sequence number 0 up to the
 * current maximum, upserting every supplier and storing the progress with
 * `syncSequence` after each batch.
 *
 * @throws {Error} Rethrows any request or database error
 */
export async function syncSuppliers() {
  const spinner = ora('Preparing to load suppliers...').start();
  try {
    const headers = {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${await getCurrentToken()}`,
      'X-Api-Key': process.env.API_KEY,
    };

    const maxSequenceNumber = (await vnRequest('GET', `${env.API_URL}/organizations/current-max-sequence`, null, headers)).data;

    let timeFinish, timeToGoSec, timeToGoMin, timeLeft;
    for (let curSequenceNumber = 0; curSequenceNumber <= maxSequenceNumber; curSequenceNumber++) {
      const timeStart = moment();

      const data = (await vnRequest('GET', `${env.API_URL}/organizations/sync/${curSequenceNumber}?organizationType=SUPPLIER`, null, headers)).data;

      for (const supplier of data.results) {
        // Jump the loop counter to the last sequence number received.
        curSequenceNumber = supplier.sequenceNumber;
        spinner.text = `Syncing suppliers, ${maxSequenceNumber - curSequenceNumber} are missing`;
        if (timeFinish)
          spinner.text = spinner.text + ` (${timeLeft})`;

        await models.supplier.upsert({
          ...supplier,
          supplierOrganizationId: supplier.organizationId,
          isConnected: JSON.parse(env.SUPPLIERS_ALWAYS_CONN),
          lastSync: moment(),
        });
      }

      await syncSequence(curSequenceNumber, 'supplier', maxSequenceNumber);

      // Estimate: seconds spent on this batch times the remaining items / 1000.
      // NOTE(review): presumably a batch holds up to 1000 results - confirm.
      timeFinish = moment();
      timeToGoSec = (timeFinish.diff(timeStart, 'seconds') * (maxSequenceNumber - curSequenceNumber) / 1000);
      timeToGoMin = Math.trunc(timeToGoSec / 60);
      if (!timeToGoMin)
        timeLeft = `${Math.trunc(timeToGoSec)} sec`;
      else
        timeLeft = `${timeToGoMin} min`;
    }

    spinner.text = `Syncing suppliers...`;
    spinner.succeed();
  } catch (err) {
    spinner.fail();
    throw err;
  }
}

/**
 * Create the connections in Floriday
 *
 * First removes the remote connections that are not in the database, then
 * creates the missing remote connection for every stored one and flags the
 * connection and its supplier as connected.
 *
 * @throws {Error} Rethrows any request or database error
 */
export async function syncConnections() {
  await deleteConnections();

  const spinner = ora(`Creating connections...`).start();
  try {
    const connections = await models.connection.findAll();

    const headers = {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${await getCurrentToken()}`,
      'X-Api-Key': process.env.API_KEY,
    };

    const remoteConnections = (await vnRequest('GET', `${env.API_URL}/connections`, null, headers)).data;

    let i = 1;
    for (const connection of connections) {
      spinner.text = `Creating ${i++} of ${connections.length} connections...`;
      const organizationId = connection.supplierOrganizationId;

      const remoteConnection = remoteConnections.find(remote => remote == organizationId);
      if (!remoteConnection)
        await vnRequest('PUT', `${env.API_URL}/connections/${organizationId}`, null, headers);

      await models.connection.update({ isConnected: true }, {
        where: { supplierOrganizationId: organizationId }
      });
      await models.supplier.update({ isConnected: true }, {
        where: { supplierOrganizationId: organizationId }
      });
    }
    spinner.succeed();
  } catch (err) {
    spinner.fail();
    throw err;
  }
}

/**
 * Sync the trade items of the connected suppliers
 *
 * @throws {Error} Rethrows any request or database error
 */
export async function syncTradeItems() {
  const spinner = ora(`Syncing trade items...`).start();
  const suppliers = await models.supplier.findAll({
    where: { isConnected: true }
  });

  let i = 0, x = 1;
  for (const supplier of suppliers) {
    const headers = {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${await getCurrentToken()}`,
      'X-Api-Key': process.env.API_KEY
    };
    try {
      const tradeItems = (await vnRequest('GET', `${env.API_URL}/trade-items?supplierOrganizationId=${supplier.supplierOrganizationId}`, null, headers)).data;

      spinner.text = `Syncing ${i} trade items of [${x++}|${suppliers.length}] suppliers...`;
      if (!tradeItems.length) continue;

      for (const tradeItem of tradeItems) {
        await insertItem(tradeItem);
        spinner.text = `Syncing ${i++} trade items of [${x}|${suppliers.length}] suppliers...`;
      }
    } catch (err) {
      spinner.fail();
      throw err;
    }
  }
  spinner.succeed();
}

/**
 * Sync the supply lines for suppliers that are connected
 *
 * Only suppliers that are connected and already have trade items stored are
 * processed. A supply line whose trade item is not stored yet triggers a
 * fetch and insert of that trade item first.
 *
 * @throws {Error} Rethrows any request or database error
 */
export async function syncSupplyLines() {
  const spinner = ora(`(NEW) Syncing supply lines...`).start();
  try {
    const headers = {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${await getCurrentToken()}`,
      'X-Api-Key': process.env.API_KEY
    };

    const suppliersWithTradeItem = await models.tradeItem.findAll({
      attributes: ['supplierOrganizationId'],
      group: ['supplierOrganizationId']
    });
    const connectedSuppliers = await models.supplier.findAll({
      attributes: ['supplierOrganizationId'],
      where: { isConnected: true }
    });

    const connectedIds = new Set(connectedSuppliers.map(supplier => supplier.supplierOrganizationId));
    const suppliers = suppliersWithTradeItem
      .filter(supplier => connectedIds.has(supplier.supplierOrganizationId))
      .map(supplier => supplier.supplierOrganizationId);

    let i = 0, x = 1;
    for (const supplier of suppliers) {
      spinner.text = `(NEW) Syncing ${i} supply lines of [${x++}|${suppliers.length}] suppliers...`;
      const params = new URLSearchParams({
        supplierOrganizationId: supplier,
      }).toString();
      const supplyLines = (await vnRequest('GET', `${env.API_URL}/supply-lines?${params}`, null, headers)).data;
      if (!supplyLines.length) continue;

      for (const supplyLine of supplyLines) {
        const storedTradeItem = await models.tradeItem.findOne({
          where: { tradeItemId: supplyLine.tradeItemId }
        });

        if (!storedTradeItem) {
          const remoteTradeItems = (await vnRequest('GET', `${env.API_URL}/trade-items?tradeItemIds=${supplyLine.tradeItemId}`, null, headers)).data;
          // Awaited so the trade item exists before its supply line is stored.
          if (remoteTradeItems && remoteTradeItems[0])
            await insertItem(remoteTradeItems[0]);
        }

        spinner.text = `(NEW) Syncing ${i++} supply lines of [${x}|${suppliers.length}] suppliers...`;

        const { pricePerPiece, deliveryPeriod, orderPeriod, agreementReference } = supplyLine;
        await models.supplyLine.upsert({
          ...supplyLine,
          pricePerPiece_currency: pricePerPiece ? pricePerPiece.currency : null,
          pricePerPiece_value: pricePerPiece ? pricePerPiece.value : null,
          deliveryPeriod_startDateTime: deliveryPeriod ? deliveryPeriod.startDateTime : null,
          deliveryPeriod_endDateTime: deliveryPeriod ? deliveryPeriod.endDateTime : null,
          orderPeriod_startDateTime: orderPeriod ? orderPeriod.startDateTime : null,
          orderPeriod_endDateTime: orderPeriod ? orderPeriod.endDateTime : null,
          agreementReference_code: agreementReference ? agreementReference.code : null,
          agreementReference_description: agreementReference ? agreementReference.description : null,
          lastSync: moment(),
        });

        for (const volumePrice of supplyLine.volumePrices || [])
          await models.volumePrices.upsert({
            ...volumePrice,
            supplyLineId: supplyLine.supplyLineId,
          });
      }
    }
    spinner.succeed();
  } catch (err) {
    spinner.fail();
    throw err;
  }
}

/**
 * Insert the trade item
 *
 * Upserts, inside one transaction, the supplier connection, the trade item
 * and its characteristics, seasonal periods, photos, packing configurations
 * (with their package), country of origin ISO codes and botanical names.
 *
 * @param {Object} tradeItem Trade item as returned by the trade-items endpoint
 * @throws {Error} Rethrows any database error after rolling back
 */
export async function insertItem(tradeItem) {
  let tx;
  try {
    tx = await models.sequelize.transaction();
    const { tradeItemId } = tradeItem;

    // Upsert supplier connection
    await models.connection.upsert({
      supplierOrganizationId: tradeItem.supplierOrganizationId,
    }, { transaction: tx });

    // Upsert trade item
    await models.tradeItem.upsert({
      ...tradeItem,
      lastSync: moment(),
    }, { transaction: tx });

    // Upsert characteristics
    for (const characteristic of tradeItem.characteristics || []) {
      await models.characteristic.upsert({
        ...characteristic,
        tradeItemId,
      }, { transaction: tx });
    }

    // Upsert seasonal periods
    for (const seasonalPeriod of tradeItem.seasonalPeriods || []) {
      await models.seasonalPeriod.upsert({
        ...seasonalPeriod,
        tradeItemId,
      }, { transaction: tx });
    }

    // Upsert photos
    for (const photo of tradeItem.photos || []) {
      await models.photo.upsert({
        ...photo,
        tradeItemId,
      }, { transaction: tx });
    }

    // Upsert packing configurations
    for (const packingConfiguration of tradeItem.packingConfigurations || []) {
      // One id for both rows, so the package FK always matches its configuration.
      const packingConfigurationId = packingConfiguration.packingConfigurationId || uuidv4();
      const additionalPrice = packingConfiguration.additionalPricePerPiece;

      await models.packingConfiguration.upsert({
        ...packingConfiguration,
        packingConfigurationId,
        additionalPricePerPiece_currency: additionalPrice ? additionalPrice.currency : null,
        additionalPricePerPiece_value: additionalPrice ? additionalPrice.value : null,
        tradeItemId,
      }, { transaction: tx });

      await models.package.upsert({
        ...packingConfiguration.package,
        packingConfigurationFk: packingConfigurationId,
      }, { transaction: tx });
    }

    // Upsert country of origin ISO codes
    for (const isoCode of tradeItem.countryOfOriginIsoCodes || []) {
      await models.countryOfOriginIsoCode.upsert({
        isoCode,
        tradeItemId,
      }, { transaction: tx });
    }

    // Upsert botanical names
    for (const botanicalName of tradeItem.botanicalNames || []) {
      await models.botanicalName.upsert({
        name: botanicalName,
        tradeItemId,
      }, { transaction: tx });
    }

    await tx.commit();
  } catch (err) {
    // tx is undefined when opening the transaction itself failed.
    if (tx) await tx.rollback();
    throw err;
  }
}

/**
 * Sync the warehouses
 *
 * Requests the warehouses of every stored supplier and upserts them.
 *
 * @throws {Error} Rethrows any request or database error
 **/
export async function syncWarehouses() {
  const spinner = ora('Syncing warehouses...').start();
  try {
    const headers = {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${await getCurrentToken()}`,
      'X-Api-Key': process.env.API_KEY,
    };

    const suppliers = await models.supplier.findAll();
    let x = 0, i = 1;
    for (const supplier of suppliers) {
      spinner.text = `Syncing ${i} warehouses of [${x++}|${suppliers.length}]...`;
      const warehouses = (await vnRequest('GET', `${env.API_URL}/organizations/supplier/${supplier.supplierOrganizationId}/warehouses`, null, headers)).data;

      for (const warehouse of warehouses) {
        spinner.text = `Syncing ${i++} warehouses of [${x}|${suppliers.length}]...`;
        const { location } = warehouse;
        await models.warehouses.upsert({
          ...warehouse,
          supplierOrganizationId: warehouse.organizationId,
          location_gln: location.gln,
          location_address_addressLine: location.address.addressLine,
          location_address_city: location.address.city,
          location_address_countryCode: location.address.countryCode,
          location_address_postalCode: location.address.postalCode,
          location_address_stateOrProvince: location.address.stateOrProvince,
          lastSync: moment(),
        });
      }
    }
    spinner.succeed();
  } catch (err) {
    spinner.fail();
    throw err;
  }
}

/**
 * Removes Floriday connections that we don't have in the database
 *
 * Any error ends the process through `criticalError`.
 **/
export async function deleteConnections() {
  const spinner = ora(`Deleting connections that aren't in the db...`).start();
  try {
    let i = 1;
    const headers = {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${await getCurrentToken()}`,
      'X-Api-Key': process.env.API_KEY
    };

    const connectionsInFloriday = (await vnRequest('GET', `${env.API_URL}/connections`, null, headers)).data;
    const connectionsInDb = await models.connection.findAll();

    // Normalised to strings so the lookup matches the previous loose comparison.
    const idsInDb = new Set(connectionsInDb.map(connection => String(connection.supplierOrganizationId)));
    const ghostConnections = connectionsInFloriday
      .filter(connectionInFloriday => !idsInDb.has(String(connectionInFloriday)));

    for (const connection of ghostConnections) {
      await vnRequest('DELETE', `${env.API_URL}/connections/${connection}`, null, headers);
      spinner.text = `Deleting ${i++} of ${ghostConnections.length} that aren't in the db...`;
    }
    spinner.succeed();
  } catch (err) {
    spinner.fail();
    criticalError(err);
  }
}

/**
 * Perform a REST request
 *
 * Retries up to `env.MAX_REQUEST_RETRIES` times (at least once) on transient
 * network errors, on 502/504/429 responses and, after renewing the token, on
 * 401 responses. Any other failure ends the process through `criticalError`.
 *
 * @param {string} method HTTP method
 * @param {string} url
 * @param {*} data Request body, ignored for GET and DELETE
 * @param {Object} headers Request headers; Authorization is refreshed in place on 401
 *
 * @return {Object} The axios response
 * @throws {Error} The last request error once every attempt has failed
 **/
export async function vnRequest(method, url, data, headers) {
  const configuredRetries = Number.parseInt(env.MAX_REQUEST_RETRIES, 10);
  const maxAttempts = configuredRetries >= 1 ? configuredRetries : 1;

  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      if (['GET', 'DELETE'].includes(method))
        return await axios({ method, url, headers });
      return await axios({ method, url, data, headers });
    } catch (err) {
      lastError = err;
      switch (err.code) {
        case 'ECONNRESET': // Client network socket TLS
        case 'EAI_AGAIN': // getaddrinfo
          warning(err);
          await sleep(1000);
          break;
        case 'ECONNABORTED':
        case 'ECONNREFUSED':
        case 'ERR_BAD_REQUEST':
        case 'ERR_BAD_RESPONSE': // axios code for 5xx responses
          await handleResponseError(err, headers);
          break;
        default:
          criticalError(err);
      }
    }
  }
  // Never hand callers an undefined response: they all read `.data` from it.
  throw lastError;
}

/**
 * Decides how vnRequest reacts to a failed request: wait and retry, renew
 * the token, or end the process.
 *
 * @param {Error} err Error thrown by axios
 * @param {Object} headers Headers of the failed request
 */
async function handleResponseError(err, headers) {
  // Timeouts and refused connections carry no response to inspect.
  if (!err.response) {
    warning(err);
    await sleep(1000);
    return;
  }

  switch (err.response.status) {
    case 504:
    case 502:
      warning(err);
      await sleep(1000);
      break;
    case 429: // Too Many Requests
      warning(err);
      await sleep(3400); // Stipulated by floryday
      break;
    case 401: // Unauthorized
      warning(err);
      // Checked before renewing: the token request itself sends no
      // Authorization header, so a 401 there must not ask for a token again.
      if (!headers || !headers.Authorization) {
        criticalError(err);
        break;
      }
      await requestToken(true);
      headers.Authorization = `Bearer ${await getCurrentToken()}`;
      break;
    default:
      criticalError(err);
  }
}

/**
 * Critical error
 *
 * Logs the error message and ends the process with a failure exit code.
 *
 * @param {Error} err
 **/
export async function criticalError(err) {
  console.log(chalk.red.bold(`[CRITICAL]`), chalk.red(`${err.message}`));
  process.exit(1);
}

/**
 * Warning
 *
 * Logs the error message without interrupting the execution.
 *
 * @param {Error} err
 **/
export async function warning(err) {
  console.log(chalk.yellow.bold(`[WARNING]`), chalk.yellow(`${err.message}`));
}