refs #4550 Concurrency improved
gitea/printnatura/pipeline/head This commit looks good Details

This commit is contained in:
Juan Ferrer 2022-12-25 14:28:42 +01:00
parent a3c79aa9b9
commit 0a32ef9041
1 changed files with 39 additions and 35 deletions

View File

@ -18,6 +18,7 @@ class PrintServer {
if (fs.existsSync(localConfFile))
conf = Object.assign({}, conf, yml(localConfFile));
this.conf = conf;
this.jobs = [];
console.clear();
const decoration = '△▽'.repeat(10);
@ -33,7 +34,7 @@ class PrintServer {
process.on('unhandledRejection', this.rejectionHandler);
this.serverLog('log', 'Ready to print'.green);
setTimeout(() => this.poll());
await this.poll();
}
async stop() {
this.serverLog('log', 'Bye ( ◕ ‿ ◕ )っ'.green);
@ -102,8 +103,16 @@ class PrintServer {
console.error(err);
}
async poll() {
const conf = this.conf;
this.pollTimeout = null;
let delay = this.conf.refreshRate;
await this.getJobs();
if (this.dbDown) delay = this.conf.reconnectTimeout;
this.pollTimeout = setTimeout(() => this.poll(), delay);
}
async getJobs() {
if (this.polling) return;
this.polling = true;
const conf = this.conf;
if (this.dbDown) {
try {
@ -121,37 +130,27 @@ class PrintServer {
if (!this.dbDown) {
try {
let jobs;
let nJobs = 0;
do {
jobs = [];
for (let i = 0; i < conf.concurrency; i++) {
const jobId = await this.getJob();
if (jobId) {
const job = this.printJob(jobId);
// XXX: Workaround for Promise.all() unhandledRejection
// https://stackoverflow.com/questions/67789309/why-do-i-get-an-unhandled-promise-rejection-with-await-promise-all
job.catch(() => {});
jobs.push(job);
}
else
break;
}
nJobs += jobs.length;
await Promise.all(jobs);
} while (jobs.length);
while (this.jobs.length < conf.concurrency) {
const jobId = await this.getJob();
if (jobId) {
nJobs++;
this.jobs.push(jobId);
const jobP = this.printJob(jobId);
// XXX: Workaround for Promise unhandledRejection
// https://stackoverflow.com/questions/67789309/why-do-i-get-an-unhandled-promise-rejection-with-await-promise-all
jobP.catch(err => this.errorHandler(err));
} else
break;
}
if (nJobs > 0)
this.serverLog('debug', `${nJobs} jobs printed`);
this.serverLog('debug', `${nJobs} jobs buffered`);
} catch (err) {
this.errorHandler(err);
}
}
let delay = this.conf.refreshRate;
if (this.dbDown) delay = this.conf.reconnectTimeout;
this.pollTimeout = setTimeout(() => this.poll(), delay);
this.polling = false;
}
async getJob() {
let jobId;
@ -281,17 +280,22 @@ class PrintServer {
throw jobErr;
} finally {
conn.release();
if (!conf.keepFile) {
try {
const shouldDelete = tmpFileCreated
|| (tmpFilePath && await fs.pathExists(tmpFilePath));
if (shouldDelete) await fs.unlink(tmpFilePath);
} catch (err) {
this.jobLog(jobId, 'error', err.message);
}
}
const index = this.jobs.indexOf(jobId);
if (index !== -1) this.jobs.splice(index, 1);
setTimeout(() => this.getJobs());
}
if (!conf.keepFile) {
try {
const shouldDelete = tmpFileCreated
|| (tmpFilePath && await fs.pathExists(tmpFilePath));
if (shouldDelete) await fs.unlink(tmpFilePath);
} catch (err) {
this.jobLog(jobId, 'error', err.message);
}
}
}
jobLog(jobId, realm, message) {
this.log(`Job[${colors.yellow(jobId)}]`, realm, message);