674 lines
21 KiB
JavaScript
674 lines
21 KiB
JavaScript
const mysql = require('mysql');
|
|
const MySQL = require('loopback-connector-mysql').MySQL;
|
|
const EnumFactory = require('loopback-connector-mysql').EnumFactory;
|
|
const { Transaction, SQLConnector, ParameterizedSQL } = require('loopback-connector');
|
|
const fs = require('fs');
|
|
|
|
const limitSet = new Set([
|
|
'save',
|
|
'updateOrCreate',
|
|
'replaceOrCreate',
|
|
'replaceById',
|
|
'update'
|
|
]);
|
|
|
|
const opOpts = {
|
|
update: [
|
|
'update',
|
|
'replaceById',
|
|
// |insert
|
|
'save',
|
|
'updateOrCreate',
|
|
'replaceOrCreate'
|
|
],
|
|
delete: [
|
|
'destroy',
|
|
'destroyAll'
|
|
],
|
|
insert: [
|
|
'create'
|
|
]
|
|
};
|
|
|
|
const opMap = new Map();
|
|
for (const op in opOpts) {
|
|
for (const met of opOpts[op])
|
|
opMap.set(met, op);
|
|
}
|
|
|
|
class VnMySQL extends MySQL {
|
|
/**
|
|
* Promisified version of execute().
|
|
*
|
|
* @param {String} query The SQL query string
|
|
* @param {Array} params The query parameters
|
|
* @param {Object} options The loopback options
|
|
* @param {Function} cb The callback
|
|
* @return {Promise} The operation promise
|
|
*/
|
|
executeP(query, params, options = {}, cb) {
|
|
return new Promise((resolve, reject) => {
|
|
this.execute(query, params, options, (error, response) => {
|
|
if (cb)
|
|
cb(error, response);
|
|
if (error)
|
|
reject(error);
|
|
else
|
|
resolve(response);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Executes an SQL query from an Stmt.
|
|
*
|
|
* @param {ParameterizedSql} stmt - Stmt object
|
|
* @param {Object} options Query options (Ex: {transaction})
|
|
* @return {Object} Connector promise
|
|
*/
|
|
executeStmt(stmt, options) {
|
|
return this.executeP(stmt.sql, stmt.params, options);
|
|
}
|
|
|
|
/**
|
|
* Executes a query from an SQL script.
|
|
*
|
|
* @param {String} sqlScript The sql script file
|
|
* @param {Array} params The query parameters
|
|
* @param {Object} options Query options (Ex: {transaction})
|
|
* @return {Object} Connector promise
|
|
*/
|
|
executeScript(sqlScript, params, options) {
|
|
return new Promise((resolve, reject) => {
|
|
fs.readFile(sqlScript, 'utf8', (err, contents) => {
|
|
if (err) return reject(err);
|
|
this.execute(contents, params, options)
|
|
.then(resolve, reject);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Build the SQL WHERE clause for the where object without checking that
|
|
* properties exists in the model.
|
|
*
|
|
* @param {object} where An object for the where conditions
|
|
* @return {ParameterizedSQL} The SQL WHERE clause
|
|
*/
|
|
makeWhere(where) {
|
|
let wrappedConnector = Object.create(this);
|
|
Object.assign(wrappedConnector, {
|
|
getModelDefinition() {
|
|
return {
|
|
properties: new Proxy({}, {
|
|
get: () => true
|
|
})
|
|
};
|
|
},
|
|
toColumnValue(_, val) {
|
|
return val;
|
|
},
|
|
columnEscaped(_, property) {
|
|
return this.escapeName(property);
|
|
}
|
|
});
|
|
|
|
return wrappedConnector.buildWhere(null, where);
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL GROUP BY clause from Loopback filter.
|
|
*
|
|
* @param {Object} group The group by definition
|
|
* @return {String} Built SQL group by
|
|
*/
|
|
makeGroupBy(group) {
|
|
if (!group)
|
|
return '';
|
|
if (typeof group === 'string')
|
|
group = [group];
|
|
|
|
let clauses = [];
|
|
|
|
for (let clause of group) {
|
|
let sqlGroup = '';
|
|
let t = clause.split(/[\s,]+/);
|
|
|
|
sqlGroup += this.escapeName(t[0]);
|
|
|
|
clauses.push(sqlGroup);
|
|
}
|
|
|
|
return `GROUP BY ${clauses.join(', ')}`;
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL order clause from Loopback filter.
|
|
*
|
|
* @param {Object} order The order definition
|
|
* @return {String} Built SQL order
|
|
*/
|
|
makeOrderBy(order) {
|
|
if (!order)
|
|
return '';
|
|
if (typeof order === 'string')
|
|
order = [order];
|
|
|
|
let clauses = [];
|
|
|
|
for (let clause of order) {
|
|
let sqlOrder = '';
|
|
let t = clause.split(/[\s,]+/);
|
|
|
|
sqlOrder += this.escapeName(t[0]);
|
|
|
|
if (t.length > 1)
|
|
sqlOrder += ' ' + (t[1].toUpperCase() == 'ASC' ? 'ASC' : 'DESC');
|
|
|
|
clauses.push(sqlOrder);
|
|
}
|
|
|
|
return `ORDER BY ${clauses.join(', ')}`;
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL limit clause from Loopback filter.
|
|
*
|
|
* @param {Object} filter The loopback filter
|
|
* @return {String} Built SQL limit
|
|
*/
|
|
makeLimit(filter) {
|
|
let limit = parseInt(filter.limit);
|
|
let offset = parseInt(filter.offset || filter.skip);
|
|
return this._buildLimit(null, limit, offset);
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL pagination from Loopback filter.
|
|
*
|
|
* @param {Object} filter The loopback filter
|
|
* @return {String} Built SQL pagination
|
|
*/
|
|
makePagination(filter) {
|
|
return ParameterizedSQL.join([
|
|
this.makeOrderBy(filter.order),
|
|
this.makeLimit(filter)
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL filter including where, order and limit
|
|
* clauses from Loopback filter.
|
|
*
|
|
* @param {Object} filter The loopback filter
|
|
* @return {String} Built SQL filter
|
|
*/
|
|
makeSuffix(filter) {
|
|
return ParameterizedSQL.join([
|
|
this.makeWhere(filter.where),
|
|
this.makePagination(filter)
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL where clause from Loopback filter discarding
|
|
* properties that not pertain to the model. If defined, appends
|
|
* the table alias to each field.
|
|
*
|
|
* @param {String} model The model name
|
|
* @param {Object} where The loopback where filter
|
|
* @param {String} tableAlias Query main table alias
|
|
* @return {String} Built SQL where
|
|
*/
|
|
buildModelWhere(model, where, tableAlias) {
|
|
let parent = this;
|
|
let wrappedConnector = Object.create(this);
|
|
Object.assign(wrappedConnector, {
|
|
columnEscaped(model, property) {
|
|
let sql = tableAlias
|
|
? this.escapeName(tableAlias) + '.'
|
|
: '';
|
|
return sql + parent.columnEscaped(model, property);
|
|
}
|
|
});
|
|
|
|
return wrappedConnector.buildWhere(model, where);
|
|
}
|
|
|
|
/**
|
|
* Constructs SQL where clause from Loopback filter discarding
|
|
* properties that not pertain to the model. If defined, appends
|
|
* the table alias to each field.
|
|
*
|
|
* @param {String} model The model name
|
|
* @param {Object} filter The loopback filter
|
|
* @param {String} tableAlias Query main table alias
|
|
* @return {String} Built SQL suffix
|
|
*/
|
|
buildModelSuffix(model, filter, tableAlias) {
|
|
return ParameterizedSQL.join([
|
|
this.buildModelWhere(model, filter.where, tableAlias),
|
|
this.makePagination(filter)
|
|
]);
|
|
}
|
|
|
|
create(model, data, opts, cb) {
|
|
const ctx = { data };
|
|
this.invokeMethod('create',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
createAll(model, data, opts, cb) {
|
|
const ctx = { data };
|
|
this.invokeMethod('createAll',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
save(model, data, opts, cb) {
|
|
const ctx = { data };
|
|
this.invokeMethod('save',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
updateOrCreate(model, data, opts, cb) {
|
|
const ctx = { data };
|
|
this.invokeMethod('updateOrCreate',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
replaceOrCreate(model, data, opts, cb) {
|
|
const ctx = { data };
|
|
this.invokeMethod('replaceOrCreate',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
destroyAll(model, where, opts, cb) {
|
|
const ctx = { where };
|
|
this.invokeMethod('destroyAll',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
update(model, where, data, opts, cb) {
|
|
const ctx = { where, data };
|
|
this.invokeMethod('update',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
replaceById(model, id, data, opts, cb) {
|
|
const ctx = { id, data };
|
|
this.invokeMethod('replaceById',
|
|
arguments, model, ctx, opts, cb);
|
|
}
|
|
|
|
isLoggable(model) {
|
|
const Model = this.getModelDefinition(model).model;
|
|
const settings = Model.definition.settings;
|
|
return settings.base && settings.base === 'Loggable';
|
|
}
|
|
|
|
invokeMethod(method, args, model, ctx, opts, cb) {
|
|
if (!this.isLoggable(model))
|
|
return super[method].apply(this, args);
|
|
|
|
this.invokeMethodP(method, [...args], model, ctx, opts)
|
|
.then(res => cb(...res), cb);
|
|
}
|
|
|
|
async invokeMethodP(method, args, model, ctx, opts) {
|
|
const Model = this.getModelDefinition(model).model;
|
|
const settings = Model.definition.settings;
|
|
let tx;
|
|
if (!opts.transaction) {
|
|
tx = await Transaction.begin(this, {});
|
|
opts = Object.assign({ transaction: tx, httpCtx: opts.httpCtx }, opts);
|
|
}
|
|
|
|
try {
|
|
// Fetch old values (update|delete) or login
|
|
let where, id, data, idName, limit, op, oldInstances, newInstances;
|
|
const hasGrabUser = settings.log && settings.log.grabUser;
|
|
if (hasGrabUser) {
|
|
const userId = opts.httpCtx && opts.httpCtx.active.accessToken.userId;
|
|
const user = await Model.app.models.Account.findById(userId, { fields: ['name'] }, opts);
|
|
await this.executeP(`CALL account.myUser_loginWithName(?)`, [user.name], opts);
|
|
}
|
|
else {
|
|
where = ctx.where;
|
|
id = ctx.id;
|
|
data = ctx.data;
|
|
idName = this.idName(model);
|
|
|
|
limit = limitSet.has(method);
|
|
|
|
op = opMap.get(method);
|
|
|
|
if (!where) {
|
|
if (id) where = { [idName]: id };
|
|
else where = { [idName]: data[idName] };
|
|
}
|
|
|
|
// Fetch old values
|
|
switch (op) {
|
|
case 'update':
|
|
case 'delete':
|
|
// Single entity operation
|
|
const stmt = this.buildSelectStmt(op, data, idName, model, where, limit);
|
|
stmt.merge(`FOR UPDATE`);
|
|
oldInstances = await this.executeStmt(stmt, opts);
|
|
}
|
|
}
|
|
|
|
const res = await new Promise(resolve => {
|
|
const fnArgs = args.slice(0, -2);
|
|
fnArgs.push(opts, (...args) => resolve(args));
|
|
super[method].apply(this, fnArgs);
|
|
});
|
|
|
|
if (hasGrabUser)
|
|
await this.executeP(`CALL account.myUser_logout()`, null, opts);
|
|
else {
|
|
// Fetch new values
|
|
const ids = [];
|
|
|
|
switch (op) {
|
|
case 'insert':
|
|
case 'update': {
|
|
switch (method) {
|
|
case 'createAll':
|
|
for (const row of res[1])
|
|
ids.push(row[idName]);
|
|
break;
|
|
case 'create':
|
|
ids.push(res[1]);
|
|
break;
|
|
case 'update':
|
|
if (data[idName] != null)
|
|
ids.push(data[idName]);
|
|
break;
|
|
}
|
|
|
|
const newWhere = ids.length ? { [idName]: ids } : where;
|
|
|
|
const stmt = this.buildSelectStmt(op, data, idName, model, newWhere, limit);
|
|
newInstances = await this.executeStmt(stmt, opts);
|
|
}
|
|
}
|
|
|
|
await this.createLogRecord(oldInstances, newInstances, model, opts);
|
|
}
|
|
if (tx) await tx.commit();
|
|
return res;
|
|
} catch (err) {
|
|
if (tx) tx.rollback();
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
buildSelectStmt(op, data, idName, model, where, limit) {
|
|
const Model = this.getModelDefinition(model).model;
|
|
const properties = Object.keys(Model.definition.properties);
|
|
|
|
const fields = data ? Object.keys(data) : [];
|
|
if (op == 'delete')
|
|
properties.forEach(property => fields.push(property));
|
|
else {
|
|
const log = Model.definition.settings.log;
|
|
fields.push(idName);
|
|
if (log.relation) fields.push(Model.relations[log.relation].keyFrom);
|
|
if (log.showField) fields.push(log.showField);
|
|
else {
|
|
const showFieldNames = ['name', 'description', 'code', 'nickname'];
|
|
for (const field of showFieldNames) {
|
|
if (properties.includes(field)) {
|
|
log.showField = field;
|
|
fields.push(field);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
const stmt = new ParameterizedSQL(
|
|
'SELECT ' +
|
|
this.buildColumnNames(model, { fields }) +
|
|
' FROM ' +
|
|
this.tableEscaped(model)
|
|
);
|
|
stmt.merge(this.buildWhere(model, where));
|
|
if (limit) stmt.merge(`LIMIT 1`);
|
|
|
|
return stmt;
|
|
}
|
|
|
|
async createLogRecord(oldInstances, newInstances, model, opts) {
|
|
function setActionType() {
|
|
if (oldInstances && newInstances)
|
|
return 'update';
|
|
else if (!oldInstances && newInstances)
|
|
return 'insert';
|
|
return 'delete';
|
|
}
|
|
|
|
const action = setActionType();
|
|
if (!newInstances && action != 'delete') return;
|
|
|
|
const Model = this.getModelDefinition(model).model;
|
|
const models = Model.app.models;
|
|
const definition = Model.definition;
|
|
const log = definition.settings.log;
|
|
|
|
const primaryKey = this.idName(model);
|
|
const originRelation = log.relation;
|
|
const originFkField = originRelation
|
|
? Model.relations[originRelation].keyFrom
|
|
: primaryKey;
|
|
|
|
// Prevent adding logs when deleting a principal entity (Client, Zone...)
|
|
if (action == 'delete' && !originRelation) return;
|
|
|
|
function map(instances) {
|
|
const map = new Map();
|
|
if (!instances) return;
|
|
for (const instance of instances)
|
|
map.set(instance[primaryKey], instance);
|
|
return map;
|
|
}
|
|
|
|
const changedModel = definition.name;
|
|
const userFk = opts.httpCtx && opts.httpCtx.active.accessToken.userId;
|
|
const oldMap = map(oldInstances);
|
|
const newMap = map(newInstances);
|
|
const ids = (oldMap || newMap).keys();
|
|
|
|
const logEntries = [];
|
|
|
|
function insertValuesLogEntry(logEntry, instance) {
|
|
logEntry.originFk = instance[originFkField];
|
|
logEntry.changedModelId = instance[primaryKey];
|
|
if (log.showField) logEntry.changedModelValue = instance[log.showField];
|
|
}
|
|
|
|
for (const id of ids) {
|
|
const oldI = oldMap && oldMap.get(id);
|
|
const newI = newMap && newMap.get(id);
|
|
|
|
const logEntry = {
|
|
action,
|
|
userFk,
|
|
changedModel,
|
|
};
|
|
|
|
if (newI) {
|
|
insertValuesLogEntry(logEntry, newI);
|
|
// Delete unchanged properties
|
|
if (oldI) {
|
|
Object.keys(oldI).forEach(prop => {
|
|
const hasChanges = oldI[prop] instanceof Date ?
|
|
oldI[prop]?.getTime() != newI[prop]?.getTime() :
|
|
oldI[prop] != newI[prop];
|
|
|
|
if (!hasChanges) {
|
|
delete oldI[prop];
|
|
delete newI[prop];
|
|
}
|
|
});
|
|
}
|
|
} else
|
|
insertValuesLogEntry(logEntry, oldI);
|
|
|
|
logEntry.oldInstance = oldI;
|
|
logEntry.newInstance = newI;
|
|
logEntries.push(logEntry);
|
|
}
|
|
await models[log.model].create(logEntries, opts);
|
|
}
|
|
}
|
|
|
|
exports.VnMySQL = VnMySQL;
|
|
|
|
exports.initialize = function initialize(dataSource, callback) {
|
|
dataSource.driver = mysql;
|
|
dataSource.connector = new VnMySQL(dataSource.settings);
|
|
dataSource.connector.dataSource = dataSource;
|
|
|
|
const modelBuilder = dataSource.modelBuilder;
|
|
const defineType = modelBuilder.defineValueType ?
|
|
modelBuilder.defineValueType.bind(modelBuilder) :
|
|
modelBuilder.constructor.registerType.bind(modelBuilder.constructor);
|
|
|
|
defineType(function Point() { });
|
|
|
|
dataSource.EnumFactory = EnumFactory;
|
|
|
|
if (callback) {
|
|
if (dataSource.settings.lazyConnect) {
|
|
process.nextTick(function () {
|
|
callback();
|
|
});
|
|
} else
|
|
dataSource.connector.connect(callback);
|
|
}
|
|
};
|
|
|
|
MySQL.prototype.connect = function (callback) {
|
|
const self = this;
|
|
const options = generateOptions(this.settings);
|
|
|
|
if (this.client) {
|
|
if (callback) {
|
|
process.nextTick(function () {
|
|
callback(null, self.client);
|
|
});
|
|
}
|
|
} else
|
|
this.client = connectionHandler(options, callback);
|
|
|
|
function connectionHandler(options, callback) {
|
|
const client = mysql.createPool(options);
|
|
client.getConnection(function (err, connection) {
|
|
const conn = connection;
|
|
if (!err) {
|
|
if (self.debug)
|
|
debug('MySQL connection is established: ' + self.settings || {});
|
|
|
|
connection.release();
|
|
} else {
|
|
if (err.code == 'ECONNREFUSED' || err.code == 'PROTOCOL_CONNECTION_LOST') { // PROTOCOL_CONNECTION_LOST
|
|
console.error(`MySQL connection lost (${err.code}). Retrying..`);
|
|
|
|
return setTimeout(() =>
|
|
connectionHandler(options, callback), 5000);
|
|
}
|
|
if (self.debug || !callback)
|
|
console.error('MySQL connection is failed: ' + self.settings || {}, err);
|
|
}
|
|
callback && callback(err, conn);
|
|
});
|
|
|
|
return client;
|
|
}
|
|
};
|
|
|
|
function generateOptions(settings) {
|
|
const s = settings || {};
|
|
if (s.collation) {
|
|
// Charset should be first 'chunk' of collation.
|
|
s.charset = s.collation.substr(0, s.collation.indexOf('_'));
|
|
} else {
|
|
s.collation = 'utf8_general_ci';
|
|
s.charset = 'utf8';
|
|
}
|
|
|
|
s.supportBigNumbers = (s.supportBigNumbers || false);
|
|
s.timezone = (s.timezone || 'local');
|
|
|
|
if (isNaN(s.connectionLimit))
|
|
s.connectionLimit = 10;
|
|
|
|
let options;
|
|
if (s.url) {
|
|
// use url to override other settings if url provided
|
|
options = s.url;
|
|
} else {
|
|
options = {
|
|
host: s.host || s.hostname || 'localhost',
|
|
port: s.port || 3306,
|
|
user: s.username || s.user,
|
|
password: s.password,
|
|
timezone: s.timezone,
|
|
socketPath: s.socketPath,
|
|
charset: s.collation.toUpperCase(), // Correct by docs despite seeming odd.
|
|
supportBigNumbers: s.supportBigNumbers,
|
|
connectionLimit: s.connectionLimit,
|
|
};
|
|
|
|
// Don't configure the DB if the pool can be used for multiple DBs
|
|
if (!s.createDatabase)
|
|
options.database = s.database;
|
|
|
|
// Take other options for mysql driver
|
|
// See https://github.com/strongloop/loopback-connector-mysql/issues/46
|
|
for (const p in s) {
|
|
if (p === 'database' && s.createDatabase)
|
|
continue;
|
|
|
|
if (options[p] === undefined)
|
|
options[p] = s[p];
|
|
}
|
|
// Legacy UTC Date Processing fallback - SHOULD BE TRANSITIONAL
|
|
if (s.legacyUtcDateProcessing === undefined)
|
|
s.legacyUtcDateProcessing = true;
|
|
|
|
if (s.legacyUtcDateProcessing)
|
|
options.timezone = 'Z';
|
|
}
|
|
return options;
|
|
}
|
|
|
|
|
|
SQLConnector.prototype.all = function find(model, filter, options, cb) {
|
|
const self = this;
|
|
// Order by id if no order is specified
|
|
filter = filter || {};
|
|
const stmt = this.buildSelect(model, filter, options);
|
|
this.execute(stmt.sql, stmt.params, options, function (err, data) {
|
|
if (err) {
|
|
return cb(err, []);
|
|
}
|
|
|
|
try {
|
|
const objs = data.map(function (obj) {
|
|
return self.fromRow(model, obj);
|
|
});
|
|
if (filter && filter.include) {
|
|
self.getModelDefinition(model).model.include(
|
|
objs, filter.include, options, cb,
|
|
);
|
|
} else {
|
|
cb(null, objs);
|
|
}
|
|
} catch (error) {
|
|
cb(error, [])
|
|
}
|
|
});
|
|
}; |