// mysql.js 5.17 KB
const mysql = require('mysql');
const _ = require('lodash');
const md5 = require('yoho-md5');
const qs = require('querystring');

/**
 * Promise-based helper around a `mysql` connection pool.
 *
 * Reads its collaborators from `global.yoho`: `config.mysql` (pool settings),
 * `logger`, `cache` and the optional `monitorSender`.
 */
class SqlHelper {
    /**
     * @param {string} [database='mysql'] Name of the database the pool connects to.
     */
    constructor(database) {
        this.config = global.yoho.config.mysql;
        this.logger = global.yoho.logger;
        this.cache = global.yoho.cache;
        this.monitor = global.yoho.monitorSender;
        this.monitorType = _.get(this.monitor, 'type.SQL');
        this.createPool(database || 'mysql');
    }

    /**
     * (Re)creates `this.pool` for the given database.
     *
     * The pool installs a custom `queryFormat` that replaces `:name`
     * placeholders with the escaped value of `values.name`; placeholders with
     * no matching own property are left untouched.
     *
     * @param {string} database Database name passed to the pool.
     */
    createPool(database) {
        // Merge into a fresh object: `this.config.connect` is shared global
        // configuration and must not pick up per-pool settings.
        this.pool = mysql.createPool(Object.assign({}, this.config.connect, {
            database,

            // Must stay a regular function: it is invoked with the connection
            // as `this`, which provides `escape`.
            queryFormat: function(query, values) {
                if (!values) {
                    return query;
                }
                return query.replace(/\:(\w+)/g, (txt, key) => {
                    if (Object.prototype.hasOwnProperty.call(values, key)) {
                        return this.escape(values[key]);
                    }
                    return txt;
                });
            },
            connectTimeout: 2000
        }));
    }

    /**
     * Borrows a connection from the pool.
     *
     * @returns {Promise<Object>} Resolves with the pooled connection; rejects
     *     (after logging) with the pool's error.
     */
    getConnection() {
        return new Promise((resolve, reject) => {
            this.pool.getConnection((connErr, connection) => {
                if (connErr) {
                    this.logger.error(connErr);
                    reject(connErr);
                    return;
                }
                resolve(connection);
            });
        });
    }

    /**
     * Runs a statement and resolves with the raw driver result.
     *
     * @param {string} sql Statement, optionally with `:name` placeholders.
     * @param {Object} [params] Placeholder values.
     * @param {Object} [options] See {@link SqlHelper#execute}.
     * @returns {Promise<*>}
     */
    query(sql, params, options) {
        return this.execute(sql, params, options);
    }

    /**
     * Runs a DELETE statement.
     *
     * @param {string} sql
     * @param {Object} [params]
     * @returns {Promise<number>} The result's `affectedRows`.
     */
    delete(sql, params) {
        return this.execute(sql, params).then(result => result.affectedRows);
    }

    /**
     * Runs an UPDATE statement.
     *
     * @param {string} sql
     * @param {Object} [params]
     * @returns {Promise<number>} The result's `changedRows`.
     */
    update(sql, params) {
        return this.execute(sql, params).then(result => result.changedRows);
    }

    /**
     * Runs an INSERT statement.
     *
     * @param {string} sql
     * @param {Object} [params]
     * @returns {Promise<number>} The result's `insertId`.
     */
    insert(sql, params) {
        return this.execute(sql, params).then(result => result.insertId);
    }

    /**
     * Executes a single statement on a pooled connection, optionally going
     * through the cache.
     *
     * @param {string} sql Statement, optionally with `:name` placeholders.
     * @param {Object} [params] Placeholder values.
     * @param {Object} [options]
     * @param {number|string} [options.cache] Cache lifetime handed to
     *     `cache.set`; falsy or non-numeric disables caching.
     * @returns {Promise<*>} The driver result (or the parsed cache entry).
     *     Rejects with the connection or query error.
     */
    async execute(sql, params, options = {}) {
        const cacheTime = parseInt(options.cache, 10) || 0;
        let cacheKey;

        // Try the cache first.
        if (cacheTime) {
            const cacheString = `${sql} {${qs.stringify(params)}} ${cacheTime}`;

            cacheKey = md5(cacheString);

            const cached = await this.cache.get(cacheKey);

            if (cached) {
                try {
                    // Cache reads are JSON-decoded; presumably the cache layer
                    // serialises on write - confirm against the cache client.
                    const parsed = JSON.parse(cached);

                    this.logger.debug('get success from cache.' + cacheString);
                    return parsed;
                } catch (e) {
                    // Unreadable entry: fall through to the database.
                    // (JSON.stringify on an Error yields "{}", so log the message.)
                    this.logger.debug('cache data parse fail.' + e.message);
                }
            }
        }

        let connection;

        try {
            connection = await this.getConnection();
        } catch (connErr) {
            if (this.monitor) {
                this.monitor.tallyFail(this.monitorType, connErr);
            }
            throw connErr;
        }

        return new Promise((resolve, reject) => {
            connection.query(sql, params, (queryErr, result) => {
                connection.release();

                if (queryErr) {
                    this.logger.error(queryErr);
                    reject(queryErr);

                    // Failed queries count as failures too, mirroring the
                    // success tally below.
                    if (this.monitor) {
                        this.monitor.tallyFail(this.monitorType, queryErr);
                    }
                    return;
                }

                // Populate the cache.
                if (cacheTime) {
                    this.cache.set(cacheKey, result, cacheTime);
                }
                resolve(result);
                if (this.monitor) {
                    this.monitor.tallySuccess(this.monitorType);
                }
            });
        });
    }

    /**
     * Runs the statements in order inside one transaction on one connection.
     *
     * The transaction is committed when every statement succeeds. On the
     * first failure (statement, callback or commit) it is rolled back, no
     * further statements are run, and the promise rejects with that error.
     * The connection is always returned to the pool.
     *
     * @param {Array<string|Object>} sqls Statements passed to `connection.query`.
     * @param {Function} [cb] Called with each statement after it succeeds.
     * @returns {Promise<Array>} Driver results, in statement order.
     */
    async transaction(sqls, cb) {
        const connection = await this.getConnection();

        // Adapts a callback-style connection method to a promise.
        const run = (method, ...args) => new Promise((res, rej) => {
            connection[method](...args, (err, result) => {
                if (err) {
                    rej(err);
                } else {
                    res(result);
                }
            });
        });

        try {
            // Without an explicit begin every statement would autocommit and
            // a later rollback would undo nothing.
            await run('beginTransaction');

            const results = [];

            for (const sql of _.toArray(sqls)) {
                results.push(await run('query', sql));
                cb && cb(sql); // eslint-disable-line
            }

            await run('commit');
            return results;
        } catch (err) {
            this.logger.error(err);

            // Best-effort rollback; the original failure is what callers see.
            await new Promise(done => connection.rollback(() => done()));
            throw err;
        } finally {
            connection.release();
        }
    }

    /**
     * Ends the current pool and creates a new one for another database.
     *
     * @param {string} database
     * @returns {Promise<void>} Resolves once the new pool has been created.
     */
    changeDatabase(database) {
        return new Promise(resolve => {
            this.pool.end(() => {
                this.createPool(database);
                resolve();
            });
        });
    }

    /**
     * Ends the pool and logs `'end'` when the driver reports completion.
     */
    close() {
        this.pool.end(() => {
            this.logger.log('end');
        });
    }
}

module.exports = SqlHelper;