refs #4823 Used bulkCreate to insert all at once

This commit is contained in:
Guillermo Bonet 2023-06-14 12:22:34 +02:00
parent 6e2a304a69
commit 5a4ccb76fe
3 changed files with 263 additions and 123 deletions

View File

@ -1,13 +1,9 @@
import { checkCon, closeCon } from './models/sequelize.js';
import * as utils from './utils.js';
import moment from 'moment';
import chalk from 'chalk';
const env = process.env;
if (JSON.parse(env.TIME_STAMPS)) // Add time to all console.log
console.log = (...args) => console.info(chalk.gray(`[${new moment().format('YYYY-MM-DD hh:mm:ss A')}]`), ...args);
class Floriday {
async start() {
try {
@ -72,7 +68,7 @@ if (JSON.parse(env.TIME_STAMPS)) // Add time to all console.log
try {
this.stopSchedule = false;
await closeCon();
console.warn(chalk.dim('Bye, come back soon 👋'))
console.warn(chalk.dim('\nBye, come back soon 👋'))
} catch (err) {
utils.criticalError(err);
}

View File

@ -1,12 +1,12 @@
import { Sequelize } from 'sequelize';
import dotenv from 'dotenv';
import chalk from 'chalk';
import fs from 'fs';
import * as utils from '../utils.js';
dotenv.config();
const env = process.env;
let sequelize;
console.clear()
console.log(chalk.hex('#06c581')(
`
@ -86,6 +86,7 @@ try {
onDelete: 'CASCADE',
onUpdate: 'CASCADE',
*/
/*
models.characteristic.belongsTo(models.tradeItem, {
foreignKey: 'tradeItemId',
targetKey: 'tradeItemId',
@ -142,7 +143,6 @@ try {
foreignKey: 'supplyLineId',
targetKey: 'supplyLineId',
});
/*
models.warehouse.belongsTo(models.organization, {
foreignKey: 'organizationId',
targetKey: 'organizationId',
@ -158,7 +158,8 @@ try {
models.clockPresaleSupply.belongsTo(models.organization, {
foreignKey: 'organizationId',
targetKey: 'organizationId',
});*/
});
*/
} catch (err) {
utils.criticalError(err);
}
@ -169,11 +170,13 @@ try {
await utils.startSpin(`${actionMsg} models...`, true);
await sequelize.sync(action);
/*
// Create views
sequelize.query(fs.readFileSync('routines/views/supplyOffer.sql', 'utf-8'));
// Create procedures
sequelize.query(fs.readFileSync('routines/procedures/offerRefresh.sql', 'utf-8'));
*/
await utils.okSpin();
}
@ -216,7 +219,8 @@ async function checkCon() {
* Close the connection to the database
*/
async function closeCon() {
utils.startSpin('Closing database connection...', true);
await utils.failSpin(null, true);
await utils.startSpin('Closing database connection...', true);
try {
await sequelize.close()
await utils.okSpin();

342
utils.js
View File

@ -131,7 +131,7 @@ export async function sleep(ms) {
*/
export async function syncModel(model) {
await startSpin(`Syncing ${model}...`, true);
let i = 1;
let i = 0;
try {
const dbSeqNum = await models.sequenceNumber.findOne({ where: { model } })
let curSeqNum = dbSeqNum?.maxSequenceNumber ?? 0;
@ -169,40 +169,43 @@ export async function syncModel(model) {
params = new URLSearchParams({ organizationType: 'SUPPLIER'} ).toString();
else if (model === 'supplyLine' )
params = new URLSearchParams({ postFilterSelectedTradeItems: false }).toString();
else if (model === 'tradeItem')
params = new URLSearchParams({
postFilterSelectedTradeItems: false,
postFilterSelectedTradeItemPackingConfigurations: false,
}).toString();
const res = (await vnRequest('GET', `${syncUrl}${curSeqNum}${params ? `?${params}` : ''}`)).data;
curSeqNum = res.maximumSequenceNumber;
const objects = res.results;
misSeqNum = maxSeqNum - curSeqNum;
txtSpin(`Syncing ${i - 1} ${model}, ${misSeqNum} missing...`);
for (let object of objects) {
txtSpin(`Syncing ${i} ${model}, ${misSeqNum} missing...`);
switch (model) {
case 'organization':
await insertOrganization(object);
await insertOrganizations(objects);
break;
case 'warehouse':
await insertWarehouse(object);
await insertWarehouses(objects);
break;
case 'tradeItem':
await insertTradeItem(object);
await insertTradeItems(objects);
break;
case 'supplyLine':
await insertSupplyLine(object);
await insertSupplyLines(objects);
break;
case 'clockPresaleSupply':
await insertClockPresalesSupply(object);
await insertClockPresalesSupply(objects);
break;
default:
throw new Error('Unsupported model');
}
txtSpin(`Syncing ${i++} ${model}, ${misSeqNum} missing...`);
};
txtSpin(`Syncing ${i = i + objects.length} ${model}, ${misSeqNum} missing...`);
await insertSequenceNumber(model, curSeqNum);
}
await insertSequenceNumber(model, maxSeqNum);
txtSpin((i != 1)
txtSpin((i)
? `Syncing ${i} ${model}...`
: `Syncing ${model}...`);
await okSpin(null, true);
@ -279,90 +282,148 @@ export async function insertSequenceNumber(model, sequenceNumber) {
};
/**
* Insert trade item and dependences in the database.
* Insert trade items and dependencies in the database.
*
* @param {Array} tradeItem An array containing the tradeItem data to be inserted
* @param {Array} tradeItems An array containing the trade item data to be inserted
*/
export async function insertTradeItem(tradeItem) {
export async function insertTradeItems(tradeItems) {
const tx = await models.sequelize.transaction();
try {
// Upsert trade item
await models.tradeItem.upsert({
const tradeItemsData = tradeItems.map((tradeItem) => ({
...tradeItem,
organizationId: tradeItem.supplierOrganizationId,
lastSync: moment(),
}, { transaction: tx });
lastSync: moment().format('YYYY-MM-DD HH:mm:ss'),
}));
// Upsert characteristics
if (tradeItem.characteristics)
if (tradeItem.characteristics.length)
for (const characteristic of tradeItem.characteristics) {
await models.characteristic.upsert({
await models.tradeItem.bulkCreate(tradeItemsData, {
updateOnDuplicate: [
'tradeItemId',
'code',
'gtin',
'vbnProductCode',
'name',
'isDeleted',
'sequenceNumber',
'tradeItemVersion',
'isCustomerSpecific',
'isHiddenInCatalog',
'organizationId',
],
transaction: tx,
});
const characteristics = [];
const seasonalPeriods = [];
const photos = [];
const packingConfigurations = [];
const countryOfOriginIsoCodes = [];
const botanicalNames = [];
for (const tradeItem of tradeItemsData) {
if (tradeItem.characteristics?.length)
for (const characteristic of tradeItem.characteristics)
characteristics.push({
tradeItemId: tradeItem.tradeItemId,
...characteristic,
}, { transaction: tx });
}
// Upsert seasonal periods
if (tradeItem.seasonalPeriods)
if (tradeItem.seasonalPeriods.length)
for (const seasonalPeriod of tradeItem.seasonalPeriods) {
await models.seasonalPeriod.upsert({
});
if (tradeItem.seasonalPeriods?.length)
for (const seasonalPeriod of tradeItem.seasonalPeriods)
seasonalPeriods.push({
tradeItemId: tradeItem.tradeItemId,
...seasonalPeriod,
}, { transaction: tx });
}
});
// Upsert photos
if (tradeItem.photos)
if (tradeItem.photos.length)
for (const photo of tradeItem.photos) {
await models.photo.upsert({
if (tradeItem.photos?.length)
for (const photo of tradeItem.photos)
photos.push({
...photo,
tradeItemId: tradeItem.tradeItemId,
}, { transaction: tx });
}
});
// Upsert packing configurations
if (tradeItem.packingConfigurations)
if (tradeItem.packingConfigurations.length)
if (tradeItem.packingConfigurations?.length) {
for (const packingConfiguration of tradeItem.packingConfigurations) {
const uuid = uuidv4();
await models.packingConfiguration.upsert({
packingConfigurations.push({
packingConfigurationId: uuid,
...packingConfiguration,
additionalPricePerPiece_currency: packingConfiguration.additionalPricePerPiece.currency,
additionalPricePerPiece_value: packingConfiguration.additionalPricePerPiece.value,
tradeItemId: tradeItem.tradeItemId,
}, { transaction: tx });
});
await models.package.upsert({
models.package.upsert({
...packingConfiguration.package,
packingConfigurationId: uuid,
}, { transaction: tx });
}
// Upsert country of origin ISO codes
if (tradeItem.countryOfOriginIsoCodes)
if (tradeItem.countryOfOriginIsoCodes.length)
for (const isoCode of tradeItem.countryOfOriginIsoCodes || []) {
await models.countryOfOriginIsoCode.upsert({
isoCode,
tradeItemId: tradeItem.tradeItemId,
}, { transaction: tx });
}
// Upsert botanical names
if (tradeItem.botanicalNames)
if (tradeItem.botanicalNames.length)
for (const botanicalName of tradeItem.botanicalNames) {
await models.botanicalName.upsert({
if (tradeItem.countryOfOriginIsoCodes?.length)
for (const isoCode of tradeItem.countryOfOriginIsoCodes)
countryOfOriginIsoCodes.push({
isoCode,
tradeItemId: tradeItem.tradeItemId,
});
if (tradeItem.botanicalNames?.length)
for (const botanicalName of tradeItem.botanicalNames)
botanicalNames.push({
botanicalNameId: uuidv4(),
name: botanicalName,
tradeItemId: tradeItem.tradeItemId,
}, { transaction: tx });
});
}
if (characteristics?.length)
await models.characteristic.bulkCreate(characteristics, {
updateOnDuplicate: ['tradeItemId', 'vbnCode', 'vbnValueCode'],
transaction: tx,
});
if (seasonalPeriods?.length)
await models.seasonalPeriod.bulkCreate(seasonalPeriods, {
updateOnDuplicate: ['tradeItemId', 'startWeek', 'endWeek'],
transaction: tx,
});
if (photos?.length)
await models.photo.bulkCreate(photos, {
updateOnDuplicate: ['tradeItemId', 'url', 'type', 'primary'],
transaction: tx,
});
if (packingConfigurations?.length)
await models.packingConfiguration.bulkCreate(packingConfigurations, {
updateOnDuplicate: [
'packingConfigurationId',
'piecesPerPackage',
'bunchesPerPackage',
'photoUrl',
'packagesPerLayer',
'layersPerLoadCarrier',
'additionalPricePerPiece_currency',
'additionalPricePerPiece_value',
'transportHeightInCm',
'loadCarrierType',
'isPrimary',
],
transaction: tx,
});
if (countryOfOriginIsoCodes?.length)
await models.countryOfOriginIsoCode.bulkCreate(countryOfOriginIsoCodes, {
updateOnDuplicate: ['tradeItemId', 'isoCode'],
transaction: tx,
});
if (botanicalNames?.length)
await models.botanicalName.bulkCreate(botanicalNames, {
updateOnDuplicate: ['botanicalNameId', 'name', 'tradeItemId'],
transaction: tx,
});
await tx.commit();
} catch (err) {
await tx.rollback();
@ -373,17 +434,40 @@ export async function insertTradeItem(tradeItem) {
/**
* Insert clock presales supply in the database.
*
* @param {Array} clockPresaleSupply An array containing the clockPresaleSupply data to be inserted
* @param {Array} clockPresalesSupply An array containing the clockPresaleSupplies data to be inserted
*/
export async function insertClockPresalesSupply(clockPresaleSupply) {
export async function insertClockPresalesSupply(clockPresalesSupply) {
const tx = await models.sequelize.transaction();
try {
await models.clockPresaleSupply.upsert({
const clockPresalesSupplyWithDefaults = clockPresalesSupply.map((clockPresaleSupply) => ({
...clockPresaleSupply,
pricePerPiece_currency: clockPresaleSupply.pricePerPiece.currency,
pricePerPiece_value: clockPresaleSupply.pricePerPiece.value,
organizationId: clockPresaleSupply.supplierOrganizationId,
}, { transaction: tx });
}));
await models.clockPresaleSupply.bulkCreate(clockPresalesSupplyWithDefaults, {
updateOnDuplicate: [
'supplyLineId',
'status',
'tradeItemId',
'pricePerPiece_currency',
'pricePerPiece_value',
'deliveryNoteReference',
'numberOfPieces',
'packingConfigurations',
'tradePeriod_startDateTime',
'tradePeriod_endDateTime',
'organizationId',
'tradeInstrument',
'salesChannel',
'sequenceNumber',
'creationDateTime',
'lastModifiedDateTime',
],
transaction: tx,
});
await tx.commit();
} catch (err) {
await tx.rollback();
@ -392,14 +476,14 @@ export async function insertClockPresalesSupply(clockPresaleSupply) {
};
/**
* Insert warehouse in the database.
* Insert warehouses in the database.
*
* @param {Array} warehouse An array containing the warehouse data to be inserted
* @param {Array} warehouses An array containing the warehouses data to be inserted
*/
export async function insertWarehouse(warehouse) {
export async function insertWarehouses(warehouses) {
const tx = await models.sequelize.transaction();
try {
await models.warehouse.upsert({
const warehousesWithDefaults = warehouses.map((warehouse) => ({
...warehouse,
location_gln: warehouse.location.gln,
location_address_addressLine: warehouse.location.address.addressLine,
@ -407,8 +491,14 @@ export async function insertWarehouse(warehouse) {
location_address_countryCode: warehouse.location.address.countryCode,
location_address_postalCode: warehouse.location.address.postalCode,
location_address_stateOrProvince: warehouse.location.address.stateOrProvince,
lastSync: moment(),
}, { transaction: tx });
lastSync: moment().format('YYYY-MM-DD HH:mm:ss'),
}));
await models.warehouse.bulkCreate(warehousesWithDefaults, {
updateOnDuplicate: ['location_gln', 'location_address_addressLine', 'location_address_city', 'location_address_countryCode', 'location_address_postalCode', 'location_address_stateOrProvince', 'lastSync'],
transaction: tx,
});
await tx.commit();
} catch (err) {
await tx.rollback();
@ -417,18 +507,24 @@ export async function insertWarehouse(warehouse) {
};
/**
* Insert organization in the database.
* Insert organizations in the database.
*
* @param {Array} organization An array containing the organization data to be inserted
* @param {Array} organizations An array containing the organizations data to be inserted
*/
export async function insertOrganization(organization) {
export async function insertOrganizations(organizations) {
const tx = await models.sequelize.transaction();
try {
await models.organization.upsert({
const organizationsWithDefaults = organizations.map((organization) => ({
...organization,
isConnected: JSON.parse(env.ORGS_ALWAYS_CONN),
lastSync: moment(),
}, { transaction: tx });
lastSync: moment().format('YYYY-MM-DD HH:mm:ss'),
}));
await models.organization.bulkCreate(organizationsWithDefaults, {
updateOnDuplicate: ['isConnected', 'lastSync'],
transaction: tx,
});
await tx.commit();
} catch (err) {
await tx.rollback();
@ -436,14 +532,8 @@ export async function insertOrganization(organization) {
}
};
/**
* Insert supply line and dependences in the database.
*
* @param {Array} supplyLine An array containing the supply line data to be inserted
*/
export async function insertSupplyLine(supplyLine) {
const tx = await models.sequelize.transaction();
try {
/* Checkear dependecias supply line
// Check if the warehouse exists, and if it doesn't, create it
let warehouse = await models.warehouse.findOne({
where: { warehouseId: supplyLine.warehouseId }
@ -472,7 +562,17 @@ export async function insertSupplyLine(supplyLine) {
await insertTradeItem(tradeItem);
}
await models.supplyLine.upsert({
*/
/**
* Insert supply lines and dependencies in the database.
*
* @param {Array} supplyLines An array containing the supply line data to be inserted
*/
export async function insertSupplyLines(supplyLines) {
const tx = await models.sequelize.transaction();
try {
const supplyLinesData = supplyLines.map((supplyLine) => ({
...supplyLine,
organizationId: supplyLine.supplierOrganizationId,
deliveryPeriodStartDateTime: supplyLine.deliveryPeriod?.startDateTime ?? null,
@ -481,13 +581,18 @@ export async function insertSupplyLine(supplyLine) {
orderPeriodEndDateTime: supplyLine.orderPeriod?.endDateTime ?? null,
agreementReference_code: supplyLine.agreementReference?.code ?? null,
agreementReference_description: supplyLine.agreementReference?.description ?? null,
lastSync: moment(),
}, { transaction: tx });
lastSync: moment().format('YYYY-MM-DD HH:mm:ss'),
}));
// Upsert packing configurations
if (supplyLine.packingConfigurations.length)
await models.supplyLine.bulkCreate(supplyLinesData, { transaction: tx });
const packingConfigurations = [];
const volumePrices = [];
for (const supplyLine of supplyLinesData)
if (supplyLine.packingConfigurations?.length) {
for (const packingConfiguration of supplyLine.packingConfigurations)
await models.supplyLinePackingConfiguration.upsert({
packingConfigurations.push({
packingConfigurationId: uuidv4(),
...packingConfiguration,
packageVbnPackageCode: packingConfiguration.package.vbnPackageCode,
@ -495,14 +600,41 @@ export async function insertSupplyLine(supplyLine) {
additionalPricePerPieceCurrency: packingConfiguration.additionalPricePerPiece.currency,
additionalPricePerPieceValue: packingConfiguration.additionalPricePerPiece.value,
supplyLineId: supplyLine.supplyLineId,
}, { transaction: tx });
});
// Upsert volume price
for (let volumePrice of supplyLine.volumePrices)
await models.volumePrice.upsert({
if (supplyLine.volumePrices?.length)
for (const volumePrice of supplyLine.volumePrices)
volumePrices.push({
supplyLineId: supplyLine.supplyLineId,
...volumePrice,
}, { transaction: tx });
});
}
if (packingConfigurations.length)
await models.supplyLinePackingConfiguration.bulkCreate(packingConfigurations, {
updateOnDuplicate: [
'packingConfigurationId',
'packageVbnPackageCode',
'packageCustomPackageId',
'piecesPerPackage',
'bunchesPerPackage',
'photoUrl',
'packagesPerLayer',
'layersPerLoadCarrier',
'transportHeightInCm',
'loadCarrierType',
'additionalPricePerPieceCurrency',
'additionalPricePerPieceValue',
'isPrimary'
],
transaction: tx,
});
if (volumePrices.length)
await models.volumePrice.bulkCreate(volumePrices, {
updateOnDuplicate: ['supplyLineId', 'unit', 'pricePerPiece'],
transaction: tx,
});
await tx.commit();
} catch (err) {
@ -630,9 +762,15 @@ export async function startSpin(msg, isNew) {
if (JSON.parse(env.TIME_STAMPS) && msg)
msg = `${chalk.gray(`[${new moment().format('YYYY-MM-DD hh:mm:ss A')}]`)} ${msg}`;
(isNew)
? spinner = ora(msg).start()
: spinner.start();
(!isNew)
? spinner.start()
: spinner = ora({
text: msg,
indent: 1,
spinner: 'arc',
interval: 40,
color: 'white',
}).start();
};
/**
@ -696,7 +834,7 @@ export async function failSpin(err, clear) {
if (clear)
spinner.clear();
}
throw err;
if (err) throw err;
};
/**
@ -718,7 +856,8 @@ export async function criticalSpin(err) {
* @param {Error} err Error object
**/
export async function criticalError(err) {
console.log(chalk.red.bold(`[CRITICAL]`), chalk.red(err.message));
const msg = `${chalk.red.bold(' └─────')} ${chalk.red.bold('[CRITICAL]')}`;
console.log(`${msg} ${chalk.red(err.message)}`);
process.exit();
};
@ -728,5 +867,6 @@ export async function criticalError(err) {
* @param {Error} err
**/
export async function warning(err) {
console.log(chalk.yellow.bold(`[WARNING]`), (err.response?.status && err.response?.data?.message) ?? chalk.yellow(err.message));
const msg = `${chalk.yellow.bold(' └─────')} ${chalk.yellow.bold('[WARNING]')}`;
console.log(`${msg} ${chalk.yellow((err.response?.status && err.response?.data?.message) ?? err.message)}`);
};