page_cache.js 8.37 KB
/**
 *
 * @author: chenfeng<feng.chen@yoho.cn>
 * @date: 16/10/14
 */

'use strict';

import Model from './model';
import ssh from 'ssh2';
import InfluxDB from '../logger/influxdb';
import ws from '../../lib/ws';

class PageCahe extends Model {

    constructor() {
        super('page_cache');
    }
    /**
     * [替换url特殊字符用过influxdb拼接sql]
     */
    replaceUrl(uri) {
        return uri.replace(/\./g, '\\.').replace(/\//g, '\\/');
    }
    /**
     * [根据key清理缓存]
     * @param  {[array]} keys           [key列表]
     * @param  {[string]} storeTableName [pc和wap的标识]
     */
    async removeCache(queryUris, storeTableName) {
        let self = this;
        let queryUriList = queryUris.split('\n');

        self._broadcast(`共${queryUriList.length}条规则清理...`);
        for (let i = 0; i < queryUriList.length; i++) {
            let queryUri = queryUriList[i];

            self._broadcast(`规则${i+1}:${queryUri}`);
            queryUri = self.replaceUrl(queryUri);
            let sql = `select FIRST(cache_status) from ${storeTableName} where full_path =~ /${queryUri}.*/ group by "key"`;
            let result = await InfluxDB.query(sql);
            if (!result.length) {
                self._broadcast(`未查询到匹配的缓存信息`);
                continue;
            }
            let servers = await self.findAll();

            self._broadcast(`共${servers.length}nginx准备清理..`)
            for (let i = 0; i < servers.length; i++) {
                try {
                    self._broadcast(`第${i+1}nginx准备清理:`)
                    await self._removeCache(servers[i], result[0], storeTableName);
                } catch (err) {
                    self._broadcast(`错误信息:${err}`)
                }
            }
        }
        self._broadcast(`执行完毕`);
        
    }
    /**
     * [全量清理缓存]
     * @param  {[string]} storeTableName [pc和wap的标识]
     */
    async removeAllCache(storeTableName) {
        let self = this;
        let servers = await self.findAll();

        self._broadcast(`共${servers.length}nginx准备清理`)
        for (let i = 0; i < servers.length; i++) {
            let server = servers[i];

            try {
                //验证两个变量是否为空避免特殊bug造成为空后拼接的shell执行危险操作
                if (server.cachepath && storeTableName) {
                    self._broadcast(`第${i+1}台准备清理:`)
                    //连接ssh后执行操作
                    await self._connStart(async (conn) => {
                        let script = `rm -rf ${server.cachepath}/${storeTableName}`;
                        self._broadcast(`执行脚本:${script}`)
                        return await self._evalScript(conn, script);
                    }, server);
                }
            } catch (err) {
                console.log(err)
            }
        }
        self._broadcast(`执行完毕`);
    }
    /**
     * [清理单台nginx的缓存]
     * @param  {[object]} server         [nginx服务器配置信息]
     * @param  {[array]} keys           [key列表]
     * @param  {[string]} storeTableName [pc和wap的标识]
     */
    async _removeCache(server, keys, storeTableName) {
        let self = this;
        let limit = 2;
        let block = parseInt(keys.length / limit, 10) + (keys.length % limit ? 1 : 0);
        let blockIndex = 0;
        //连接ssh后执行操作
        await self._connStart(async (conn) => {
            for (var i = blockIndex; i < block; i++) {
                self._broadcast(`正在执行清理,进度:${i+1}/${block}`);
                blockIndex = i;
                let items = keys.slice(i * limit, i * limit + 2);
                let script = self._joinRemoveScript(server, items, storeTableName);
                await self._evalScript(conn, script);
            }
        }, server)
    }
    /**
     * [检查shell脚本中的危险操作]
     */
    _checkDangerScript(script) {
        if (/rm -rf \/($| )/.test(script)) {
            return false;
        }
        return true;
    }
    /**
     * [启用ssh链接后执行自定义行为]
     * @param  {Function} callback [自定义函数]
     * @param  {[object]}   server   [nginx服务器配置信息]
     */
    _connStart(callback, server) {
        let self = this;
        //ssh掉线重连
        return new Promise((resolve, reject) => {
            let connStart = (err) => {
                let conn = new ssh.Client();
                conn.on('ready', async() => {
                    self._broadcast(`连接${server.host}成功`);
                    await callback(conn)
                    resolve();
                }).on('error', (msg) => {
                    self._broadcast(`连接 ${server.host}失败, ${msg}`);
                    if (err < 20) {
                        self._broadcast(`尝试第${err+1}次重连`);
                        connStart(++err);
                    } else {
                        self._broadcast(`超出连接次数,服务器${server.host}执行失败`);
                        reject('limit error');
                    }
                }).connect({
                    host: server.host,
                    username: server.username,
                    password: server.password,
                    port: server.port
                });
                
            }
            connStart(0, 0);
        });
    }
    /**
     * [执行脚本]
     * @param  {[object]} conn   [ssh链接对象]
     * @param  {[string]} script [执行的shell脚本]
     */
   _evalScript(conn, script) {
        let self = this;
        return new Promise((resolve, reject) => {
            //检查shell脚本的危险操作
            if (self._checkDangerScript(script)) {
                    console.log(script)
                    let result = conn.exec(script, (err, stream) => {
                        if (!err) {
                            resolve();
                        } else {
                            self._broadcast('执行错误:${err}');
                        }
                    });
            } else {
                self._broadcast('检测到危险操作');
                reject('检测到危险操作');
            }
        
        })
    }
    /**
     * [根据key拼接shell脚本]
     * @param  {[object]} server         [nginx服务器配置]
     * @param  {[array]} keys           [key列表]
     * @param  {[string]} storeTableName [pc和wap的标识]
     */
    _joinRemoveScript(server, keys, storeTableName) {
        let script = 'rm';
        keys.forEach((key) => {
            let level1 = key.key.substr(key.key.length - 1, 1);
            let level2 = key.key.substr(key.key.length - 3, 2);
            script += ` ${server.cachepath}/${storeTableName}/${level1}/${level2}/${key.key}`;
        });
        return script;
    }
    _broadcast(message) {
        console.log(message)
        ws.broadcast(`/page_cache/log`, {
            message: message
        });
    }
    async init() {
        let count = await this.count({});
        if (count === 0) {
            await this.insert({
                host: '10.66.1.2',
                username: 'node',
                password: 'yoho9646',
                port: 22,
                tag: 'nginx2',
                cachepath: '/usr/local/nginx'
            });
            await this.insert({
                host: '10.66.1.3',
                username: 'node',
                password: 'yoho9646',
                port: 22,
                tag: 'nginx3',
                cachepath: '/usr/local/nginx'
            });
            await this.insert({
                host: '10.66.1.15',
                username: 'node',
                password: 'yoho9646',
                port: 22,
                tag: 'nginx15',
                cachepath: '/usr/local/nginx'
            });
            await this.insert({
                host: '10.66.1.84',
                username: 'node',
                password: 'yoho9646',
                port: 22,
                tag: 'nginx84',
                cachepath: '/usr/local/nginx'
            });
            await this.insert({
                host: '10.66.1.97',
                username: 'node',
                password: 'yoho9646',
                port: 22,
                tag: 'nginx97',
                cachepath: '/usr/local/nginx'
            });
        }
    }
}

export default PageCahe;