const mysql = require('mysql'); const ParameterizedSQL = require('loopback-connector').ParameterizedSQL; const MySQL = require('loopback-connector-mysql').MySQL; const EnumFactory = require('loopback-connector-mysql').EnumFactory; const Transaction = require('loopback-connector').Transaction; const fs = require('fs'); class VnMySQL extends MySQL { /** * Promisified version of execute(). * * @param {String} query The SQL query string * @param {Array} params The query parameters * @param {Object} options The loopback options * @param {Function} cb The callback * @return {Promise} The operation promise */ executeP(query, params, options = {}, cb) { return new Promise((resolve, reject) => { this.execute(query, params, options, (error, response) => { if (cb) cb(error, response); if (error) reject(error); else resolve(response); }); }); } /** * Executes an SQL query from an Stmt. * * @param {ParameterizedSql} stmt - Stmt object * @param {Object} options Query options (Ex: {transaction}) * @return {Object} Connector promise */ executeStmt(stmt, options) { return this.executeP(stmt.sql, stmt.params, options); } /** * Executes a query from an SQL script. * * @param {String} sqlScript The sql script file * @param {Array} params The query parameters * @param {Object} options Query options (Ex: {transaction}) * @return {Object} Connector promise */ executeScript(sqlScript, params, options) { return new Promise((resolve, reject) => { fs.readFile(sqlScript, 'utf8', (err, contents) => { if (err) return reject(err); this.execute(contents, params, options) .then(resolve, reject); }); }); } /** * Build the SQL WHERE clause for the where object without checking that * properties exists in the model. * * @param {object} where An object for the where conditions * @return {ParameterizedSQL} The SQL WHERE clause */ makeWhere(where) { let wrappedConnector = Object.create(this); Object.assign(wrappedConnector, { getModelDefinition() { return { properties: new Proxy({}, { get: () => true }) }; }, toColumnValue(_, val) { return val; }, columnEscaped(_, property) { return this.escapeName(property); } }); return wrappedConnector.buildWhere(null, where); } /** * Constructs SQL GROUP BY clause from Loopback filter. * * @param {Object} group The group by definition * @return {String} Built SQL group by */ makeGroupBy(group) { if (!group) return ''; if (typeof group === 'string') group = [group]; let clauses = []; for (let clause of group) { let sqlGroup = ''; let t = clause.split(/[\s,]+/); sqlGroup += this.escapeName(t[0]); clauses.push(sqlGroup); } return `GROUP BY ${clauses.join(', ')}`; } /** * Constructs SQL order clause from Loopback filter. * * @param {Object} order The order definition * @return {String} Built SQL order */ makeOrderBy(order) { if (!order) return ''; if (typeof order === 'string') order = [order]; let clauses = []; for (let clause of order) { let sqlOrder = ''; let t = clause.split(/[\s,]+/); sqlOrder += this.escapeName(t[0]); if (t.length > 1) sqlOrder += ' ' + (t[1].toUpperCase() == 'ASC' ? 'ASC' : 'DESC'); clauses.push(sqlOrder); } return `ORDER BY ${clauses.join(', ')}`; } /** * Constructs SQL limit clause from Loopback filter. * * @param {Object} filter The loopback filter * @return {String} Built SQL limit */ makeLimit(filter) { let limit = parseInt(filter.limit); let offset = parseInt(filter.offset || filter.skip); return this._buildLimit(null, limit, offset); } /** * Constructs SQL pagination from Loopback filter. * * @param {Object} filter The loopback filter * @return {String} Built SQL pagination */ makePagination(filter) { return ParameterizedSQL.join([ this.makeOrderBy(filter.order), this.makeLimit(filter) ]); } /** * Constructs SQL filter including where, order and limit * clauses from Loopback filter. * * @param {Object} filter The loopback filter * @return {String} Built SQL filter */ makeSuffix(filter) { return ParameterizedSQL.join([ this.makeWhere(filter.where), this.makePagination(filter) ]); } /** * Constructs SQL where clause from Loopback filter discarding * properties that not pertain to the model. If defined, appends * the table alias to each field. * * @param {String} model The model name * @param {Object} where The loopback where filter * @param {String} tableAlias Query main table alias * @return {String} Built SQL where */ buildModelWhere(model, where, tableAlias) { let parent = this; let wrappedConnector = Object.create(this); Object.assign(wrappedConnector, { columnEscaped(model, property) { let sql = tableAlias ? this.escapeName(tableAlias) + '.' : ''; return sql + parent.columnEscaped(model, property); } }); return wrappedConnector.buildWhere(model, where); } /** * Constructs SQL where clause from Loopback filter discarding * properties that not pertain to the model. If defined, appends * the table alias to each field. * * @param {String} model The model name * @param {Object} filter The loopback filter * @param {String} tableAlias Query main table alias * @return {String} Built SQL suffix */ buildModelSuffix(model, filter, tableAlias) { return ParameterizedSQL.join([ this.buildModelWhere(model, filter.where, tableAlias), this.makePagination(filter) ]); } create(model, data, opts, cb) { const ctx = {data}; this.invokeMethod('create', arguments, model, ctx, opts, cb); } createAll(model, data, opts, cb) { const ctx = {data}; this.invokeMethod('createAll', arguments, model, ctx, opts, cb); } save(model, data, opts, cb) { const ctx = {data}; this.invokeMethod('save', arguments, model, ctx, opts, cb); } updateOrCreate(model, data, opts, cb) { const ctx = {data}; this.invokeMethod('updateOrCreate', arguments, model, ctx, opts, cb); } replaceOrCreate(model, data, opts, cb) { const ctx = {data}; this.invokeMethod('replaceOrCreate', arguments, model, ctx, opts, cb); } destroyAll(model, where, opts, cb) { const ctx = {where}; this.invokeMethod('destroyAll', arguments, model, ctx, opts, cb); } update(model, where, data, opts, cb) { const ctx = {where, data}; this.invokeMethod('update', arguments, model, ctx, opts, cb); } replaceById(model, id, data, opts, cb) { const ctx = {id, data}; this.invokeMethod('replaceById', arguments, model, ctx, opts, cb); } isLoggable(model) { const Model = this.getModelDefinition(model).model; const settings = Model.definition.settings; return settings.base && settings.base === 'Loggable'; } invokeMethod(method, args, model, ctx, opts, cb) { if (!this.isLoggable(model)) return super[method].apply(this, args); this.invokeMethodP(method, [...args], model, ctx, opts) .then(res => cb(...res), cb); } async invokeMethodP(method, args, model, ctx, opts) { let tx; if (!opts.transaction) { tx = await Transaction.begin(this, {}); opts = Object.assign({transaction: tx, httpCtx: opts.httpCtx}, opts); } try { // Fetch old values (update|delete) or login await this.grabUserLog(model, opts, 'login'); let where = ctx.where; const id = ctx.id; const data = ctx.data; const idName = this.idName(model); const limitSet = new Set([ 'save', 'updateOrCreate', 'replaceOrCreate', 'replaceById', 'updateAttributes', 'update' ]); const limit = limitSet.has(method); const opOpts = { update: [ 'update', 'replaceById', 'updateAttributes', // |insert 'save', 'updateOrCreate', 'replaceOrCreate' ], delete: [ 'destroy', 'destroyAll' ], insert: [ 'create' ] }; const opMap = new Map(); for (const op in opOpts) { for (const met of opOpts[op]) opMap.set(met, op); } const op = opMap.get(method); if (!where) { if (id) where = {[idName]: id}; else where = {[idName]: data[idName]}; } let oldInstances; let newInstances; // Fetch old values switch (op) { case 'update': case 'delete': // Single entity operation const stmt = this.buildSelectStmt(op, data, idName, model, where, limit); stmt.merge(`FOR UPDATE`); oldInstances = await this.executeStmt(stmt, opts); } const res = await new Promise(resolve => { const fnArgs = args.slice(0, -2); fnArgs.push(opts); fnArgs.push((...args) => resolve(args)); super[method].apply(this, fnArgs); }); // Fetch new values const ids = []; switch (op) { case 'insert': case 'update': { switch (method) { case 'createAll': for (const row of res[1]) ids.push(row[idName]); break; case 'create': ids.push(res[1]); break; case 'update': if (data[idName] != null) ids.push(data[idName]); break; } const newWhere = ids.length ? {[idName]: ids} : where; const stmt = this.buildSelectStmt(op, data, idName, model, newWhere, limit); newInstances = await this.executeStmt(stmt, opts); } } await this.createLogRecord(oldInstances, newInstances, model, opts); await this.grabUserLog(model, opts, 'logout'); if (tx) await tx.commit(); return res; } catch (err) { if (tx) tx.rollback(); throw err; } } async grabUserLog(model, opts, logAction) { const Model = this.getModelDefinition(model).model; const settings = Model.definition.settings; if (!(settings.log && settings.log.grabUser)) return; if (logAction === 'login') { const userId = opts.httpCtx && opts.httpCtx.active.accessToken.userId; const user = await Model.app.models.Account.findById(userId, {fields: ['name']}, opts); await this.executeP(`CALL account.myUser_loginWithName(?)`, [user.name], opts); } else await this.executeP(`CALL account.myUser_logout()`, null, opts); } buildSelectStmt(op, data, idName, model, where, limit) { const Model = this.getModelDefinition(model).model; const settings = Model.definition.settings; const properties = Object.keys(Model.definition.properties); const log = settings.log; const fields = data ? Object.keys(data) : []; if (op == 'delete') properties.forEach(property => fields.push(property)); else { fields.push(idName); if (log.relation) fields.push(Model.relations[log.relation].keyFrom); if (log.showField) fields.push(log.showField); else { const showFieldNames = ['name', 'description', 'code', 'nickname']; for (const field of showFieldNames) { if (properties.includes(field)) { log.showField = field; fields.push(field); break; } } } } const stmt = new ParameterizedSQL( 'SELECT ' + this.buildColumnNames(model, {fields}) + ' FROM ' + this.tableEscaped(model) ); stmt.merge(this.buildWhere(model, where)); if (limit) stmt.merge(`LIMIT 1`); return stmt; } async createLogRecord(oldInstances, newInstances, model, opts) { function setActionType() { if (oldInstances && newInstances) return 'update'; else if (!oldInstances && newInstances) return 'insert'; return 'delete'; } const action = setActionType(); if (!newInstances && action != 'delete') return; const Model = this.getModelDefinition(model).model; const models = Model.app.models; const definition = Model.definition; const log = definition.settings.log; const primaryKey = this.idName(model); const originRelation = log.relation; const originFkField = originRelation ? Model.relations[originRelation].keyFrom : primaryKey; // Prevent adding logs when deleting a principal entity (Client, Zone...) if (action == 'delete' && !originRelation) return; function map(instances) { const map = new Map(); if (!instances) return; for (const instance of instances) map.set(instance[primaryKey], instance); return map; } const changedModel = definition.name; const userFk = opts.httpCtx && opts.httpCtx.active.accessToken.userId; const oldMap = map(oldInstances); const newMap = map(newInstances); const ids = (oldMap || newMap).keys(); const logEntries = []; function insertValuesLogEntry(logEntry, instance) { logEntry.originFk = instance[originFkField]; logEntry.changedModelId = instance[primaryKey]; if (log.showField) logEntry.changedModelValue = instance[log.showField]; } for (const id of ids) { const oldI = oldMap && oldMap.get(id); const newI = newMap && newMap.get(id); const logEntry = { action, userFk, changedModel, }; if (newI) { insertValuesLogEntry(logEntry, newI); // Delete unchanged properties if (oldI) { Object.keys(oldI).forEach(prop => { if (newI[prop] == oldI[prop]) { delete newI[prop]; delete oldI[prop]; } }); } } else insertValuesLogEntry(logEntry, oldI); logEntry.oldInstance = oldI; logEntry.newInstance = newI; logEntries.push(logEntry); } await models[log.model].create(logEntries, opts); } } exports.VnMySQL = VnMySQL; exports.initialize = function initialize(dataSource, callback) { dataSource.driver = mysql; dataSource.connector = new VnMySQL(dataSource.settings); dataSource.connector.dataSource = dataSource; const modelBuilder = dataSource.modelBuilder; const defineType = modelBuilder.defineValueType ? modelBuilder.defineValueType.bind(modelBuilder) : modelBuilder.constructor.registerType.bind(modelBuilder.constructor); defineType(function Point() {}); dataSource.EnumFactory = EnumFactory; if (callback) { if (dataSource.settings.lazyConnect) { process.nextTick(function() { callback(); }); } else dataSource.connector.connect(callback); } }; MySQL.prototype.connect = function(callback) { const self = this; const options = generateOptions(this.settings); if (this.client) { if (callback) { process.nextTick(function() { callback(null, self.client); }); } } else this.client = connectionHandler(options, callback); function connectionHandler(options, callback) { const client = mysql.createPool(options); client.getConnection(function(err, connection) { const conn = connection; if (!err) { if (self.debug) debug('MySQL connection is established: ' + self.settings || {}); connection.release(); } else { if (err.code == 'ECONNREFUSED' || err.code == 'PROTOCOL_CONNECTION_LOST') { // PROTOCOL_CONNECTION_LOST console.error(`MySQL connection lost (${err.code}). Retrying..`); return setTimeout(() => connectionHandler(options, callback), 5000); } if (self.debug || !callback) console.error('MySQL connection is failed: ' + self.settings || {}, err); } callback && callback(err, conn); }); return client; } }; function generateOptions(settings) { const s = settings || {}; if (s.collation) { // Charset should be first 'chunk' of collation. s.charset = s.collation.substr(0, s.collation.indexOf('_')); } else { s.collation = 'utf8_general_ci'; s.charset = 'utf8'; } s.supportBigNumbers = (s.supportBigNumbers || false); s.timezone = (s.timezone || 'local'); if (isNaN(s.connectionLimit)) s.connectionLimit = 10; let options; if (s.url) { // use url to override other settings if url provided options = s.url; } else { options = { host: s.host || s.hostname || 'localhost', port: s.port || 3306, user: s.username || s.user, password: s.password, timezone: s.timezone, socketPath: s.socketPath, charset: s.collation.toUpperCase(), // Correct by docs despite seeming odd. supportBigNumbers: s.supportBigNumbers, connectionLimit: s.connectionLimit, }; // Don't configure the DB if the pool can be used for multiple DBs if (!s.createDatabase) options.database = s.database; // Take other options for mysql driver // See https://github.com/strongloop/loopback-connector-mysql/issues/46 for (const p in s) { if (p === 'database' && s.createDatabase) continue; if (options[p] === undefined) options[p] = s[p]; } // Legacy UTC Date Processing fallback - SHOULD BE TRANSITIONAL if (s.legacyUtcDateProcessing === undefined) s.legacyUtcDateProcessing = true; if (s.legacyUtcDateProcessing) options.timezone = 'Z'; } return options; }