import moment from 'moment'; import fetch from 'node-fetch'; import { models } from './models/index.js'; import { v4 as uuidv4 } from 'uuid'; import chalk from 'chalk'; import ora from 'ora'; // The Endpoint where the Token is requested const tokenEndpoint = 'https://idm.staging.floriday.io/oauth2/ausmw6b47z1BnlHkw0h7/v1/token'; // URL of API const url = 'https://api.staging.floriday.io/customers-api/2022v2'; /** * Gets the Access Token from the client config table * * @param {sequelize.models} models * @returns {Date} tokenExpirationDate formated as YYYY-MM-DD HH:mm:ss */ export async function requestToken() { const clientConfigData = await models.clientConfig.findOne(); if (!clientConfigData) throw new Error('No data found in the client config table') const spinner = ora(`Requesting new token...`).start(); const tokenExpirationDate = clientConfigData.tokenExpiration; if (clientConfigData.tokenExpiration == null || moment().isAfter(tokenExpirationDate)) { let clientId = clientConfigData.clientId; let clientSecret = clientConfigData.clientSecret; const data = { grant_type: 'client_credentials', client_id: clientId, client_secret: clientSecret, scope: 'role:app catalog:read supply:read organization:read network:write network:read' }; const body = Object.keys(data) .map(key => `${encodeURIComponent(key)}=${encodeURIComponent(data[key])}`) .join('&'); const response = await fetch(tokenEndpoint, { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body, }); const responseData = await response.json(); if (response.ok) { spinner.succeed(); } else { spinner.fail(); criticalError(new Error(`Token request failed with status: ${response.status} - ${response.statusText}`)); } let tokenExpirationDate = moment() .add(responseData.expires_in, 's') .format('YYYY-MM-DD HH:mm:ss'); await updateClientConfig( clientId, clientSecret, responseData.access_token, tokenExpirationDate ); return tokenExpirationDate; } else { spinner.text = 'Using stored token...' spinner.succeed(); return tokenExpirationDate; } } /** * Returns the current token * * @returns {string} */ export async function getCurrentToken() { let data = await models.clientConfig.findOne(); return data.currentToken } /** * Returns the expiration data of current token * * @returns {string} */ export async function getCurrentTokenExpiration() { let data = await models.clientConfig.findOne(); return data.tokenExpiration } /** * Updates the Access Token in the client config table * * @param {sequelize.models} models * @param {String} clientId * @param {String} clientSecret * @param {String} accessToken * @param {String} tokenExpirationDate */ export async function updateClientConfig(clientId, clientSecret, accessToken, tokenExpirationDate) { try { const spinner = ora(`Updating token...`).start(); await models.clientConfig.upsert({ id: 1, clientId: clientId, clientSecret: clientSecret, currentToken: accessToken, tokenExpiration: tokenExpirationDate, requestLimit: 500, }); spinner.succeed(); } catch (error) { spinner.fail(); console.log('There was a error while updating the client config'); console.log(error); } } /** * pauses the execution of the script for the given amount of milliseconds * * @param {integer} ms * @returns A promise that resolves after ms milliseconds. */ export async function sleep(ms) { return new Promise((resolve) => { setTimeout(resolve, ms); }); } /** * Recieves an array of functions and executes them in a queue with the given concurrency * * @param {Array} fnArray * @param {Number} concurrency * @returns */ export async function asyncQueue(fnArray, concurrency = 1) { const results = []; // 1 const queue = fnArray.map((fn, index) => () => fn().then((result) => results[index] = result)); const run = async () => { // 2 const fn = queue.shift(); if (fn) { await fn(); await run(); } }; const promises = []; // 3 while (concurrency--) { // 4 promises.push(run()); } await Promise.all(promises); // 5 return results; } // 1. Create an array of functions that will be executed in a queue // 2. Create a function that will execute the functions in the queue // 3. Create an array of promises that will execute the run function // 4. Execute the run function while the concurrency is greater than 0 // 5. Return the results /** * Syncs the sequence number for the given model * if no params are given it will reset all the sequence number to 0 * * @param {Number} current - current sequence number * @param {String} model - model name * @param {Number} maximumSequenceNumber - maximum sequence number * @returns */ export async function syncSequence(current = 0, model = null , maximumSequenceNumber = 0){ if (model == null && current == 0){ try { let mockModels = [ 'suppliers', 'tradeItems', 'supplyLines', ]; const spinner = ora(`Syncing sequence...`).start(); for (let mockModel in mockModels) { const element = mockModels[mockModel]; await syncSequence(0, element); } spinner.succeed(); } catch (err) { spinner.fail(); throw new Error(err); } } else if (current) { let tx = await models.sequelize.transaction(); try { let sequence = await models.sequenceNumber.findOrCreate({ where: { model: model }, defaults: { model: model, sequenceNumber: current, maximumSequenceNumber: maximumSequenceNumber }, transaction: tx }); if (sequence[1] == false){ await models.sequenceNumber.update({ sequenceNumber: current, maximumSequenceNumber: maximumSequenceNumber }, { where: { model: model }, transaction: tx }); } await tx.commit(); } catch (error) { await tx.rollback(); console.log(`Error while syncing sequence number for: ${model}: ${error}`); } } } export async function syncSuppliers(){ let spinner = ora('Preparing to load suppliers...').start(); try { let headers = { 'Content-Type': 'application/json', 'Authorization': `Bearer ${await getCurrentToken()}`, 'X-Api-Key': process.env.API_KEY }; const response = await fetch(`${url}/organizations/current-max-sequence`, { method: 'GET', headers }); if (!response.ok) { spinner.fail(); criticalError(new Error(`Max sequence request failed with status: ${response.status} - ${response.statusText}`)); } const maxSequenceNumber = await response.json(); let timeFinish, timeToGoSec, timeToGoMin, timeLeft; for (let curSequenceNumber = 0; curSequenceNumber <= maxSequenceNumber; curSequenceNumber++) { let timeStart = new moment(); let query = `${url}/organizations/sync/${curSequenceNumber}?organizationType=SUPPLIER`; let response = await fetch(query, { method: 'GET', headers }); let data = await response.json(); let suppliers = data.results; for (let supplier of suppliers) { curSequenceNumber = supplier.sequenceNumber; spinner.text = `Syncing suppliers, ${maxSequenceNumber - curSequenceNumber} are missing` if (timeFinish) spinner.text = spinner.text + ` (${timeLeft})` await models.supplier.upsert({ sequenceNumber: supplier.sequenceNumber, companyGln: supplier.companyGln, name: supplier.name, commercialName: supplier.commercialName, email: supplier.email, phone: supplier.phone, website: supplier.website, organizationId: supplier.organizationId, rfhRelationId: supplier.rfhRelationId, paymentProviders: `${supplier.paymentProviders}`, endDate: supplier.endDate, mailingAddress: supplier.mailingAddress, physicalAddress: supplier.physicalAddress, pythosanitaryNumber: supplier.pythosanitaryNumber, organizationType: supplier.organizationType }); } await syncSequence(curSequenceNumber, 'suppliers', maxSequenceNumber); timeFinish = new moment(); timeToGoSec = (timeFinish.diff(timeStart, 'seconds') * (maxSequenceNumber - curSequenceNumber) / 1000) timeToGoMin = Math.trunc(timeToGoSec / 60) if (!timeToGoMin) timeLeft = `${Math.trunc(timeToGoSec)} sec` else timeLeft = `${timeToGoMin} min` } spinner.succeed() } catch (err) { spinner.fail(); throw new Error(err); } } export async function syncConn(){ const spinner = ora(`Syncing connections...`).start(); try { let connections = await models.connection.findAll(); let headers = { 'Content-Type': 'application/json', 'Authorization': `Bearer ${await getCurrentToken()}`, 'X-Api-Key': process.env.API_KEY }; let remoteConnections = await fetch(`${url}/connections`, { method: 'GET', headers }); remoteConnections = await remoteConnections.json(); let i = 1; for (let connection of connections){ spinner.text = `Syncing ${i++} connections...` if (connection.isConnected == false){ continue; } let remoteConnection = remoteConnections.find(remoteConnection => remoteConnection == connection.organizationId); if (remoteConnection == undefined){ console.log('Connection: ', connection, 'does not exist in the remote server'); console.log('Creating remote connection'); await fetch(`${url}/connections/${connection.organizationId}`, { method: 'PUT', headers }); await models.connection.update({ isConnected: true }, { where: { organizationId: connection.organizationId } }); await models.supplier.update({ isConnected: true }, { where: { organizationId: connection.organizationId } }); } else { await models.connection.update({ isConnected: true }, { where: { organizationId: connection.organizationId } }); await models.supplier.update({ isConnected: true }, { where: { organizationId: connection.organizationId } }); } } spinner.succeed(); } catch (err) { spinner.fail(); throw new Error(err); } } export async function syncTradeItems(){ const spinner = ora(`Syncing trade items...`).start(); const suppliers = await models.supplier.findAll(); let i = 0; for (let supplier of suppliers) { i++; if (!supplier.isConnected) continue; let query = `${url}/trade-items?supplierOrganizationId=${supplier.organizationId}`; let headers = { 'Content-Type': 'application/json', 'Authorization': `Bearer ${await getCurrentToken()}`, 'X-Api-Key': process.env.API_KEY }; try { let request = await fetch(query, { method: 'GET', headers }); let tradeItems = await request.json(); if (!tradeItems.length) { continue; } for (let tradeItem of tradeItems) { await insertItem(tradeItem, supplier); } console.log('Synced trade items for: ', supplier.commercialName); console.log('Remaining suppliers: ', suppliers.length - i, 'of', suppliers.length); console.log('Total trade items: ', tradeItems.length); } catch (error) { spinner.fail(); console.log('Error while syncing trade items for: ', supplier.commercialName); console.log(error); } } spinner.succeed() } /** * Syncs the supply lines for suppliers that are connected * to do this, it fetches all the supply lines for every tradeitem of the suppliers */ export async function syncSupplyLines(){ try { let currentSequenceNumber = await models.sequenceNumber.findOne({ // TODO: Mirar como manejar este error where: { model: 'supplyLines' } }); const spinner = ora(`Syncing trade items...`).start(); let suppliers = await models.supplier.findAll({ where: { isConnected: true } }); let tradeItems = await models.tradeItem.findAll({ where: { supplierOrganizationId: suppliers.map(supplier => supplier.organizationId) } }); let promises = []; let headers = { 'Content-Type': 'application/json', 'Authorization': `Bearer ${await getCurrentToken()}`, 'X-Api-Key': process.env.API_KEY }; // Launch a promise for each supplier for (let tradeItem of tradeItems) { let supplier = suppliers.find(supplier => supplier.organizationId == tradeItem.supplierOrganizationId); console.log('Requesting supply lines for ' + supplier.commercialName + ' - ' + tradeItem.name); // eslint-disable-next-line no-async-promise-executor let promise = new Promise(async (resolve) => { try { let url = `${url}/supply-lines/sync/0` const params = new URLSearchParams({ supplierOrganizationId: supplier.organizationId, tradeItemId: tradeItem.tradeItemId, postFilterSelectedTradeItems: false }); url.search = params; let request = await fetch(url.toString(), { method: 'GET', headers }); let supplyLines = await request.json(); if (supplyLines.length == 0) { console.log('No supply lines for supplier: ', supplier.commercialName, ' - ' , tradeItem.name); resolve([]); return; } resolve(supplyLines); } catch (error) { console.log('Error while syncing supply lines for: ', supplier.commercialName, ' - ' , tradeItem.name); console.log(error); resolve([]); } }); promises.push(promise); } let supplyLines = await Promise.all(promises); let maximumSequenceNumber; for (let supplyLine of supplyLines) { maximumSequenceNumber = supplyLine.maximumSequenceNumber; supplyLine = supplyLine.results; try { for (let line of supplyLine) { let tradeItem = await models.tradeItem.findOne({ where: { tradeItemId: line.tradeItemId } }); if (!tradeItem) { console.log('Trade item not found for supply line: ', line.supplyLineId); console.log('Requesting data for trade item id: ', line.tradeItemId); let urlTradeItem = `${url}/trade-items?tradeItemIds=${line.tradeItemId}`; let queryTradeItem = await fetch(urlTradeItem, { method: 'GET', headers }); let tradeItem = await queryTradeItem.json(); if (tradeItem.length == 0) { console.log('Trade item not found for supply line: ', line.supplyLineId); console.log('Trade item id: ', line.tradeItemId); continue; } let supplier = await models.supplier.findOne({ where: { organizationId: tradeItem[0].supplierOrganizationId } }); await insertItem(tradeItem[0], supplier); tradeItem = await models.tradeItem.findOne({ where: { tradeItemId: line.tradeItemId } }); if (!tradeItem) { console.log('Trade item not found for supply line: ', line.supplyLineId); console.log('Trade item id: ', line.tradeItemId); continue; } } await models.supplyLine.upsert({ supplyLineId: line.supplyLineId, status: line.status, supplierOrganizationId: line.supplierOrganizationId, pricePerPiece: line.pricePerPiece, numberOfPieces: line.numberOfPieces, deliveryPeriod: line.deliveryPeriod, orderPeriod: line.orderPeriod, wharehouseId: line.wharehouseId, sequenceNumber: line.sequenceNumber, type: line.type, isDeleted: line.isDeleted, salesUnit: line.salesUnit, agreementReferenceCode: line.agreementReference.code, agreementReferenceDescription: line.agreementReference.description, isLimited: line.isLimited, isCustomerSpecific: line.isCustomerSpecific, tradeItemFk: line.tradeItemId, }); } } catch (err) { spinner.fail(); console.log('Error while syncing supply lines - ' + supplyLine.results[0].tradeItemId); throw new Error(err); } } spinner.succeed(); console.log('Found', suppliers.length, 'connected suppliers'); await syncSequence(currentSequenceNumber,'supplyLines' ,maximumSequenceNumber); } catch (err) { throw new Error(err); } } export async function insertItem(tradeItem, supplier) { let tx = await models.sequelize.transaction(); console.log('Syncing trade item: ', tradeItem.name); try { // Temporal connection to all suppliers that have trade items let currentSupp = await models.supplier.findOne({ where: { organizationId: tradeItem.supplierOrganizationId } }); currentSupp.isConnected = true; await currentSupp.save({transaction: tx}); await models.connection.upsert({ organizationId: tradeItem.supplierOrganizationId, connect: true, }, {transaction: tx}); // ----- await models.tradeItem.upsert({ tradeItemId: tradeItem.tradeItemId, supplierOrganizationId: tradeItem.supplierOrganizationId, code: tradeItem.code, gtin: tradeItem.gtin, vbnProductCode: tradeItem.vbnProductCode, name: tradeItem.name, isDeleted: tradeItem.isDeleted, sequenceNumber: tradeItem.sequenceNumber, tradeItemVersion: tradeItem.tradeItemVersion, isCustomerSpecific: tradeItem.isCustomerSpecific, isHiddenInCatalog: tradeItem.isHiddenInCatalog, }, {transaction: tx}); let characteristics = tradeItem.characteristics; for (let characteristic of characteristics) { await models.characteristic.upsert({ vbnCode: characteristic.vbnCode, vbnValueCode: characteristic.vbnValueCode, tradeItemFk: tradeItem.tradeItemId, }, {transaction: tx}); } let seasonalPeriods = tradeItem.seasonalPeriods; for (let seasonalPeriod of seasonalPeriods) { await models.seasonalPeriod.upsert({ startWeek: seasonalPeriod.startWeek, endWeek: seasonalPeriod.endWeek, tradeItemFk: tradeItem.tradeItemId, }, {transaction: tx}); } let photos = tradeItem.photos; for (let photo of photos) { await models.photo.upsert({ id: photo.id, url: photo.url, type: photo.type, primary: photo.primary, tradeItemFk: tradeItem.tradeItemId, }, {transaction: tx}); } let packingConfigurations = tradeItem.packingConfigurations; for (let packingConfiguration of packingConfigurations) { let uuid = uuidv4(); await models.packingConfiguration.upsert({ packingConfigurationId: uuid, piecesPerPackage: packingConfiguration.piecesPerPackage, bunchesPerPackage: packingConfiguration.bunchesPerPackage, photoUrl: packingConfiguration.photoUrl, packagesPerLayer: packingConfiguration.packagesPerLayer, layersPerLoadCarrier: packingConfiguration.layersPerLoadCarrier, additionalPricePerPiece : JSON.stringify(packingConfiguration.additionalPricePerPiece), transportHeightInCm: packingConfiguration.transportHeightInCm, loadCarrierType: packingConfiguration.loadCarrierType, isPrimary: packingConfiguration.isPrimary, tradeItemFk: tradeItem.tradeItemId, }, {transaction: tx}); await models.package.upsert({ vbnPackageCode: packingConfiguration.package.vbnPackageCode, customPackageId: packingConfiguration.package.customPackageId, packingConfigurationFk: uuid, }, {transaction: tx}); } let countryOfOriginIsoCodes = tradeItem.countryOfOriginIsoCodes; countryOfOriginIsoCodes ??= 0; for (let countryOfOriginIsoCode of countryOfOriginIsoCodes) { await models.countryOfOriginIsoCode.upsert({ isoCode: countryOfOriginIsoCode, tradeItemFk: tradeItem.tradeItemId, }, {transaction: tx}); } let botanicalNames = tradeItem.botanicalNames; for (let botanicalName of botanicalNames) { await models.botanicalName.upsert({ name: botanicalName.name, tradeItemFk: tradeItem.tradeItemId, }, {transaction: tx}); } await tx.commit(); } catch (error) { await tx.rollback(); console.log('Error while syncing trade items for: ', supplier.commercialName); console.log(error); } } /** * Throw critical error * * @param {err} **/ export async function criticalError(err) { console.log(chalk.red.bold(`[ERROR]`), chalk.red(`${err.name}: ${err.message}`)); process.exit(); }