const Myt = require('./myt'); const Command = require('./lib/command'); const fs = require('fs-extra'); const nodegit = require('nodegit'); const ExporterEngine = require('./lib/exporter-engine'); const connExt = require('./lib/conn'); const repoExt = require('./lib/repo'); const SqlString = require('sqlstring'); /** * Pushes changes to remote. */ class Push extends Command { static usage = { description: 'Apply changes into database', params: { force: 'Answer yes to all questions', commit: 'Wether to save the commit SHA into database', sums: 'Save SHA sums of pushed objects', triggers: 'Wether to exclude triggers, used to generate local DB' }, operand: 'remote' }; static opts = { alias: { force: 'f', commit: 'c', sums: 's', triggers: 't' }, boolean: [ 'force', 'commit', 'sums', 'triggers' ] }; static reporter = { applyingVersions: 'Applying versions.', applyingRoutines: 'Applying changed routines.', dbInfo: function(version) { console.log( `Database information:` + `\n -> Version: ${version.number}` + `\n -> Commit: ${version.gitCommit}` ); }, version(data, action) { let {version} = data; let name = data.dir; let num, color; switch(action) { case 'apply': num = version.number; name = version.name; color = 'cyan'; break; case 'badVersion': num = '?????'; color = 'yellow'; break; case 'wrongDirectory': num = '*****'; color = 'gray'; break; } console.log('', `[${num[color].bold}]`, name); }, logScript(script, action, error) { let actionMsg; switch(action) { case 'apply': actionMsg = '[+]'.green; break; case 'ignore': actionMsg = '[I]'.blue; break; default: actionMsg = '[W]'.yellow; break; } console.log(' ', actionMsg.bold, script); }, change(status, ignore, change) { let statusMsg; switch(status) { case 'added': statusMsg = '[+]'.green; break; case 'deleted': statusMsg = '[-]'.red; break; case 'modified': statusMsg = '[ยท]'.yellow; break; } let actionMsg; if (ignore) actionMsg = '[I]'.blue; else actionMsg = '[A]'.green; const typeMsg = `[${change.type.abbr}]`[change.type.color]; console.log('', (statusMsg + actionMsg).bold, typeMsg.bold, change.fullName ); }, routinesApplied: function(nRoutines) { if (nRoutines > 0) { console.log(` -> ${nRoutines} routines have changed.`); } else { console.log(` -> No routines changed.`); } } }; async run(myt, opts) { const conn = await myt.dbConnect(); this.conn = conn; if (opts.remote == 'local') opts.commit = true; // Obtain exclusive lock const [[row]] = await conn.query( `SELECT GET_LOCK('myt_push', 30) getLock`); const pingTimeout = setInterval(async() => { try { await conn.ping() } catch (e) {} }, 60 * 1000); if (!row.getLock) { let isUsed = 0; if (row.getLock == 0) { const [[row]] = await conn.query( `SELECT IS_USED_LOCK('myt_push') isUsed`); isUsed = row.isUsed; } throw new Error(`Cannot obtain exclusive lock, used by connection ${isUsed}`); } async function releaseLock() { await conn.query(`DO RELEASE_LOCK('myt_push')`); } try { await this.push(myt, opts, conn); } catch(err) { try { await releaseLock(); } catch (e) {} throw err; } finally { clearInterval(pingTimeout); } await releaseLock(); } async cli(myt, opts) { // Prevent push to production by mistake if (opts.remote == 'production') { console.log( '\n ( ( ) ( ( ) ) ' + '\n )\\ ))\\ ) ( /( )\\ ) ( ))\\ ) ( /( ( /( ' + '\n(()/(()/( )\\()|()/( ( )\\ ) /(()/( )\\()) )\\())' + '\n /(_))(_)|(_)\\ /(_)) )\\ (((_) ( )(_))(_)|(_)\\ ((_)\\ ' + '\n(_))(_)) ((_|_))_ _ ((_))\\___(_(_()|__)) ((_) _((_)' + '\n| _ \\ _ \\ / _ \\| \\| | | ((/ __|_ _|_ _| / _ \\| \\| |' + '\n| _/ /| (_) | |) | |_| || (__ | | | | | (_) | . |' + '\n|_| |_|_\\ \\___/|___/ \\___/ \\___| |_| |___| \\___/|_|\\_|' + '\n' ); if (!opts.force) { const readline = require('readline'); const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); const answer = await new Promise(resolve => { rl.question('Are you sure? (Default: no) [yes|no] ', resolve); }); rl.close(); if (answer !== 'yes') throw new Error('Changes aborted'); } } await super.cli(myt, opts); } async push(myt, opts, conn) { const pushConn = await myt.createConnection(); // Get database version const version = await myt.fetchDbVersion() || {}; this.emit('dbInfo', version); if (!version.number) version.number = String('0').padStart(opts.versionDigits, '0'); if (!/^[0-9]*$/.test(version.number)) throw new Error('Wrong database version'); // Apply versions this.emit('applyingVersions'); let nChanges = 0; let silent = true; const versionsDir = opts.versionsDir; function isUndoScript(script) { return /\.undo\.sql$/.test(script); } const skipFiles = new Set([ 'README.md', '.archive' ]); if (await fs.pathExists(versionsDir)) { const versionDirs = await fs.readdir(versionsDir); const [[row]] = await this.conn.query( `SELECT realm FROM versionConfig` ); const realm = row?.realm; for (const versionDir of versionDirs) { if (skipFiles.has(versionDir)) continue; const dirVersion = myt.parseVersionDir(versionDir); const versionData = { version: dirVersion, current: version }; if (!dirVersion) { this.emit('version', versionData, 'wrongDirectory'); continue; } const versionNumber = dirVersion.number; if (versionNumber.length != version.number.length) { this.emit('version', versionData, 'badVersion'); continue; } const scriptsDir = `${versionsDir}/${versionDir}`; const scripts = await fs.readdir(scriptsDir); const [versionLog] = await conn.query( `SELECT file FROM versionLog WHERE code = ? AND number = ? AND errorNumber IS NULL`, [opts.code, versionNumber] ); for (const script of scripts) if (!isUndoScript(script) && versionLog.findIndex(x => x.file == script) === -1) { silent = false; break; } if (silent) continue; this.emit('version', versionData, 'apply'); for (const script of scripts) { const match = script.match(/^[0-9]{2}-[a-zA-Z0-9_]+(?:\.(?!undo)([a-zA-Z0-9_]+))?(?:\.undo)?\.sql$/); if (!match) { this.emit('logScript', script, 'warn', 'wrongFile'); continue; } const skipRealm = match[1] && match[1] !== realm; if (isUndoScript(script) || skipRealm) continue; const [[row]] = await conn.query( `SELECT errorNumber FROM versionLog WHERE code = ? AND number = ? AND file = ?`, [ opts.code, versionNumber, script ] ); const apply = !row || row.errorNumber; this.emit('logScript', script, apply ? 'apply' : 'ignore'); if (!apply) continue; let err; try { await connExt.queryFromFile(pushConn, `${scriptsDir}/${script}`); } catch (e) { err = e; } await conn.query( `INSERT INTO versionLog SET code = ?, number = ?, file = ?, user = USER(), updated = NOW(), errorNumber = ?, errorMessage = ? ON DUPLICATE KEY UPDATE updated = VALUES(updated), user = VALUES(user), errorNumber = VALUES(errorNumber), errorMessage = VALUES(errorMessage)`, [ opts.code, versionNumber, script, err && err.errno, err && err.message ] ); if (err) throw err; nChanges++; } await this.updateVersion('number', versionNumber); } } // Apply routines this.emit('applyingRoutines'); let nRoutines = 0; const changes = await this.changedRoutines(version.gitCommit); const routines = []; for (const change of changes) if (change.isRoutine) routines.push([ change.schema, change.name, change.type.name ]); if (routines.length) { await conn.query( `DROP TEMPORARY TABLE IF EXISTS tProcsPriv` ); await conn.query( `CREATE TEMPORARY TABLE tProcsPriv ENGINE = MEMORY SELECT * FROM mysql.procs_priv WHERE (Db, Routine_name, Routine_type) IN (?)`, [routines] ); } const engine = new ExporterEngine(conn, opts); await engine.init(); async function finalize() { await engine.saveInfo(); if (routines.length) { await conn.query('FLUSH PRIVILEGES'); await conn.query(`DROP TEMPORARY TABLE tProcsPriv`); } } for (const change of changes) try { if (opts.triggers && change.type.name === 'TRIGGER') continue; const schema = change.schema; const name = change.name; const type = change.type.name.toLowerCase(); const fullPath = `${opts.routinesDir}/${change.path}.sql`; const exists = await fs.pathExists(fullPath); let newSql; if (exists) newSql = await fs.readFile(fullPath, 'utf8'); const oldSql = await engine.fetchRoutine(type, schema, name); const oldSum = engine.getShaSum(type, schema, name); const isMockFn = type == 'function' && schema == opts.versionSchema && opts.remote == 'local' && opts.mockDate && opts.mockFunctions && opts.mockFunctions.indexOf(name) !== -1; const ignore = newSql == oldSql || isMockFn; let status; if (exists && !oldSql) status = 'added'; else if (!exists) status = 'deleted'; else status = 'modified'; this.emit('change', status, ignore, change); if (!ignore) { const scapedSchema = SqlString.escapeId(schema, true); if (exists) { if (change.type.name === 'VIEW') await pushConn.query(`USE ${scapedSchema}`); await connExt.multiQuery(pushConn, newSql); if (change.isRoutine) { await conn.query( `INSERT IGNORE INTO mysql.procs_priv SELECT * FROM tProcsPriv WHERE Db = ? AND Routine_name = ? AND Routine_type = ?`, [schema, name, change.type.name] ); } if (opts.sums || oldSum || (opts.sumViews && type === 'view')) await engine.fetchShaSum(type, schema, name); } else { const escapedName = scapedSchema + '.' + SqlString.escapeId(name, true); const query = `DROP ${change.type.name} IF EXISTS ${escapedName}`; await pushConn.query(query); engine.deleteShaSum(type, schema, name); } nRoutines++; } } catch (err) { try { await finalize(); } catch (e) { console.error(e); } throw err; } await finalize(); this.emit('routinesApplied', nRoutines); const gitExists = await fs.pathExists(`${opts.workspace}/.git`); if (gitExists && opts.commit) { const repo = await nodegit.Repository.open(this.opts.workspace); const head = await repo.getHeadCommit(); if (head && version.gitCommit !== head.sha()) await this.updateVersion('gitCommit', head.sha()); } // End await pushConn.end(); } async updateVersion(column, value) { column = SqlString.escapeId(column, true); await this.conn.query( `INSERT INTO version SET code = ?, ${column} = ?, updated = NOW() ON DUPLICATE KEY UPDATE ${column} = VALUES(${column}), updated = VALUES(updated)`, [ this.opts.code, value ] ); } async changedRoutines(commitSha) { const repo = await this.myt.openRepo(); const changes = []; const changesMap = new Map(); async function pushChanges(diff) { if (!diff) return; const patches = await diff.patches(); for (const patch of patches) { const path = patch.newFile().path(); const match = path.match(/^routines\/(.+)\.sql$/); if (!match) continue; let change = changesMap.get(match[1]); if (!change) { change = {path: match[1]}; changes.push(change); changesMap.set(match[1], change); } change.mark = patch.isDeleted() ? '-' : '+'; } } const head = await repo.getHeadCommit(); if (head && commitSha) { let commit; let notFound; try { commit = await repo.getCommit(commitSha); notFound = false; } catch (err) { if (err.errorFunction == 'Commit.lookup') notFound = true; else throw err; } if (notFound) { console.warn(`Database commit not found, trying git fetch`.yellow); await repo.fetchAll(); commit = await repo.getCommit(commitSha); } const commitTree = await commit.getTree(); const headTree = await head.getTree(); const diff = await headTree.diff(commitTree); await pushChanges(diff); } await pushChanges(await repoExt.getUnstaged(repo)); await pushChanges(await repoExt.getStaged(repo)); const routines = []; for (const change of changes) routines.push(new Routine(change)); return routines.sort((a, b) => { if (b.mark != a.mark) return b.mark == '-' ? -1 : 1; if (b.type.name !== a.type.name) { if (b.type.name == 'VIEW') return -1; if (a.type.name == 'VIEW') return 1; } return a.path.localeCompare(b.path); }); } } const typeMap = { events: { name: 'EVENT', abbr: 'EVNT', color: 'cyan' }, functions: { name: 'FUNCTION', abbr: 'FUNC', color: 'cyan' }, procedures: { name: 'PROCEDURE', abbr: 'PROC', color: 'yellow' }, triggers: { name: 'TRIGGER', abbr: 'TRIG', color: 'blue' }, views: { name: 'VIEW', abbr: 'VIEW', color: 'magenta' }, }; const routineTypes = new Set([ 'FUNCTION', 'PROCEDURE' ]); class Routine { constructor(change) { const path = change.path; const split = path.split('/'); const schema = split[0]; const type = typeMap[split[1]]; const name = split[2]; if (split.length !== 3 || !type) throw new Error(`Wrong routine path for '${path}', check that the sql file is located in the correct directory`); Object.assign(this, { path, mark: change.mark, type, schema, name, fullName: `${schema}.${name}`, isRoutine: routineTypes.has(type.name) }); } } module.exports = Push; if (require.main === module) new Myt().cli(Push);