const mysql = require('mysql');
const _ = require('lodash');
const md5 = require('yoho-md5');
const qs = require('querystring');

/**
 * Adapt a node-style callback method (`target[method](...args, cb)`) to a Promise.
 *
 * @param {Object} target - object owning the method (e.g. a mysql connection)
 * @param {string} method - name of the method to call
 * @param {...*} args - arguments placed before the callback
 * @returns {Promise<*>} resolves with the callback's second argument, rejects with its first
 */
function invokeAsync(target, method, ...args) {
    return new Promise((resolve, reject) => {
        target[method](...args, (err, result) => {
            if (err) {
                reject(err);
            } else {
                resolve(result);
            }
        });
    });
}

/**
 * Promise-based helper around a `mysql` connection pool.
 *
 * Reads its configuration, logger, cache and monitor from `global.yoho`.
 * Queries use named placeholders (`:name`) that are replaced with escaped
 * values taken from the params object.
 */
class SqlHelper {
    /**
     * @param {string} [database] - database name; falls back to 'mysql' when falsy
     */
    constructor(database) {
        this.config = global.yoho.config.mysql;
        this.logger = global.yoho.logger;
        this.cache = global.yoho.cache;
        this.monitor = global.yoho.monitorSender;
        this.monitorType = _.get(this.monitor, 'type.SQL');

        this.createPool(database || 'mysql');
    }

    /**
     * Create the connection pool for `database` and store it on `this.pool`.
     *
     * @param {string} database - database name the pool connects to
     */
    createPool(database) {
        // Merge into a fresh object: using `this.config.connect` as the assign
        // target would mutate the shared global configuration on every call.
        const poolOptions = Object.assign({}, this.config.connect, {
            database,

            // Must stay a regular function: `this.escape` below relies on the
            // `this` the mysql library supplies when it invokes queryFormat.
            queryFormat: function(query, values) {
                if (!values) {
                    return query;
                }

                // Replace each `:key` that has a matching own property in
                // `values`; unknown placeholders are left untouched.
                return query.replace(/\:(\w+)/g, (txt, key) => {
                    if (Object.prototype.hasOwnProperty.call(values, key)) {
                        return this.escape(values[key]);
                    }
                    return txt;
                });
            },
            connectTimeout: 2000
        });

        this.pool = mysql.createPool(poolOptions);
    }

    /**
     * Acquire a connection from the pool.
     *
     * @returns {Promise<Object>} resolves with the connection; rejects (after
     *     logging) with the pool error
     */
    getConnection() {
        return new Promise((resolve, reject) => {
            this.pool.getConnection((connErr, connection) => {
                if (connErr) {
                    this.logger.error(connErr);
                    reject(connErr);
                    return;
                }
                resolve(connection);
            });
        });
    }

    /**
     * Run a statement and resolve with the raw driver result.
     *
     * @param {string} sql - statement, optionally with `:name` placeholders
     * @param {Object} [params] - placeholder values
     * @param {Object} [options] - see {@link SqlHelper#execute}
     * @returns {Promise<*>}
     */
    query(sql, params, options) {
        return this.execute(sql, params, options);
    }

    /**
     * Run a statement and resolve with `result.affectedRows`.
     *
     * @param {string} sql
     * @param {Object} [params]
     * @returns {Promise<number>}
     */
    delete(sql, params) {
        return this.execute(sql, params).then(result => result.affectedRows);
    }

    /**
     * Run a statement and resolve with `result.changedRows`.
     *
     * @param {string} sql
     * @param {Object} [params]
     * @returns {Promise<number>}
     */
    update(sql, params) {
        return this.execute(sql, params).then(result => result.changedRows);
    }

    /**
     * Run a statement and resolve with `result.insertId`.
     *
     * @param {string} sql
     * @param {Object} [params]
     * @returns {Promise<number>}
     */
    insert(sql, params) {
        return this.execute(sql, params).then(result => result.insertId);
    }

    /**
     * Execute a statement on a pooled connection, optionally going through the cache.
     *
     * When `options.cache` parses to a non-zero integer, the cache is consulted
     * first under the md5 of "sql {querystring(params)} cacheTime"; a hit is
     * expected to be a JSON string. On a miss (or unparsable entry) the query
     * runs and the result is written back with that cache time.
     *
     * Every failure (connection or query) is reported to the monitor, every
     * successful query is tallied as a success.
     *
     * @param {string} sql - statement, optionally with `:name` placeholders
     * @param {Object} [params] - placeholder values
     * @param {Object} [options]
     * @param {number|string} [options.cache] - cache time passed to `cache.set`; 0/absent disables caching
     * @returns {Promise<*>} the driver result (or the cached copy)
     */
    async execute(sql, params, options = {}) {
        // `options && …` keeps an explicit `null` from throwing.
        const cacheTime = parseInt(options && options.cache, 10) || 0;
        let cacheKey;

        // Read from cache
        if (cacheTime) {
            const cacheString = `${sql} {${qs.stringify(params)}} ${cacheTime}`;

            cacheKey = md5(cacheString);

            const cached = await this.cache.get(cacheKey);

            if (cached) {
                try {
                    const parsed = JSON.parse(cached);

                    this.logger.debug('get success from cache.' + cacheString);
                    return parsed;
                } catch (e) {
                    // JSON.stringify(error) yields "{}", so log the message instead.
                    this.logger.debug('cache data parse fail.' + e.message);
                }
            }
        }

        let result;

        try {
            // getConnection logs its own failures.
            const connection = await this.getConnection();

            try {
                result = await invokeAsync(connection, 'query', sql, params);
            } catch (queryErr) {
                this.logger.error(queryErr);
                throw queryErr;
            } finally {
                connection.release();
            }
        } catch (e) {
            if (this.monitor) {
                this.monitor.tallyFail(this.monitorType, e);
            }
            throw e;
        }

        // Write to cache
        if (cacheTime) {
            this.cache.set(cacheKey, result, cacheTime);
        }
        if (this.monitor) {
            this.monitor.tallySuccess(this.monitorType);
        }

        return result;
    }

    /**
     * Run several statements inside one transaction on a single connection.
     *
     * Statements run in order; the first failure stops execution, rolls the
     * transaction back and rejects with that error. On success the
     * transaction is committed. The connection is always handed back to the
     * pool, or destroyed when the rollback itself fails, since its
     * transactional state is then unknown.
     *
     * @param {Array<string|Object>} sqls - statements to run, in order
     * @param {Function} [cb] - called with each statement after it succeeds
     * @returns {Promise<Array>} results of the statements, in order
     */
    async transaction(sqls, cb) {
        const connection = await this.getConnection();
        let results;

        try {
            await invokeAsync(connection, 'beginTransaction');

            results = [];
            // _.toArray mirrors the collection handling of the previous
            // _.map (null/undefined become an empty list).
            for (const sql of _.toArray(sqls)) {
                const result = await invokeAsync(connection, 'query', sql);

                cb && cb(sql); // eslint-disable-line
                results.push(result);
            }

            await invokeAsync(connection, 'commit');
        } catch (err) {
            this.logger.error(err);

            try {
                await invokeAsync(connection, 'rollback');
            } catch (rollbackErr) {
                this.logger.error(rollbackErr);
                // Do not return a connection with an unresolved transaction
                // to the pool.
                connection.destroy();
                throw err;
            }

            connection.release();
            throw err;
        }

        connection.release();
        return results;
    }

    /**
     * Close the current pool and open a new one against `database`.
     *
     * @param {string} database - database name for the new pool
     * @returns {Promise<void>} resolves once the new pool has been created
     */
    changeDatabase(database) {
        return new Promise(resolve => {
            this.pool.end(endErr => {
                if (endErr) {
                    // The old pool is being discarded anyway; record the
                    // failure and carry on with the new pool.
                    this.logger.error(endErr);
                }
                this.createPool(database);
                resolve();
            });
        });
    }

    /**
     * Close the pool and log 'end' once it has shut down.
     */
    close() {
        this.pool.end(() => {
            this.logger.log('end');
        });
    }
}

module.exports = SqlHelper;