// api_cache.js 8.24 KB
'use strict';
const Memcached = require('memcached');
const ssh = require('ssh2');
const ws = require('../../lib/ws');
const _ = require('lodash');

/**
 * Administrative wrapper around a single `memcached` client.
 *
 * Every operation reports its progress by broadcasting to websocket
 * subscribers through the project-local `ws` module: log lines go to
 * `/api_cache/log`, statistics go to `/api_cache/monit`.
 */
class ApiCache {

    /**
     * @param {string} host - Server location handed to the `memcached` client and
     *     attached to every broadcast. `restart()` splits it on ':' to derive the
     *     SSH host, so a "host:port" string is expected.
     * @param {Object} [sshConfig] - Optional overrides (username, password, port,
     *     host, ...) for the SSH connection opened by `restart()`.
     */
    constructor(host, sshConfig = {}) {
        this.host = host;
        this.memcached = new Memcached(host);

        // FIXME(security): these defaults embed a credential in source control.
        // They are kept only so existing callers keep working; pass `sshConfig`
        // (or better, switch to key-based auth) and rotate this password.
        this.sshConfig = Object.assign({
            username: 'node',
            password: 'yoho9646',
            port: 22
        }, sshConfig);
    }

    /**
     * Stores `value` under `key` and logs the outcome.
     *
     * @param {string} key
     * @param {*} value
     * @param {number} ttl - Lifetime forwarded unchanged to `memcached.set`.
     */
    setKey(key, value, ttl) {
        this._log(`setting ${key}`);

        this.memcached.set(key, value, ttl, (err) => {
            this._log(err ? `set ${key} fail` : `set ${key} success`);
        });
    }

    /**
     * Deletes `key` and logs the outcome.
     *
     * @param {string} key
     */
    delKey(key) {
        this._log(`deleting ${key}`);

        this.memcached.del(key, (err) => {
            this._log(err ? `deleted ${key} fail` : `deleted ${key} success`);
        });
    }

    /**
     * Flushes the cache, logs the outcome and then ends the client's connections.
     */
    flushAll() {
        this.memcached.flush((err) => {
            this._log(err ? `flush all fail.${err}` : `flush all success`);
            this.memcached.end();
        });
    }

    /**
     * Looks up every cached entry whose key satisfies `condition`.
     *
     * @param {function(string): boolean} condition - Called once per key found in
     *     the cache dumps; return a truthy value to include that key.
     * @returns {Promise<Object|Array>} The `getMulti` result for the matching keys
     *     across all servers, `{}` when entries exist but none match, or `[]` when
     *     no server reports any slab. Rejects when listing the items or fetching
     *     the values fails, or when `condition` throws.
     */
    async find(condition) {
        const matched = [];

        try {
            const { slabs } = await this._scanKeys((cacheKey) => {
                if (condition(cacheKey)) {
                    matched.push(cacheKey);
                }
            });

            if (slabs === 0) {
                return [];
            }

            // NOTE(review): `getMulti` is not called with an empty key list because
            // the client appears never to invoke its callback in that case, which
            // would leave this promise pending forever — confirm against the
            // installed `memcached` version.
            if (matched.length === 0) {
                return {};
            }

            return await new Promise((resolve, reject) => {
                this.memcached.getMulti(matched, (err, data) => {
                    if (err) {
                        reject(err);
                    } else {
                        resolve(data);
                    }
                });
            });
        } finally {
            this.memcached.end();
        }
    }

    /**
     * Fetches server statistics, adds the derived `hits_percent` and `heap_info`
     * fields and broadcasts the result on `/api_cache/monit`.
     */
    stats() {
        this.memcached.stats((err, result) => {
            if (err) {
                this._log(`stats fail. ${err}`);
                return;
            }

            // Only the first entry is reported when the client returns a list.
            const raw = Array.isArray(result) ? result[0] : result;

            if (!raw) {
                this._log('stats empty');
                return;
            }

            const hits = Number(raw.get_hits) || 0;
            const misses = Number(raw.get_misses) || 0;
            const lookups = hits + misses;
            // A server that has not served any lookup yet would otherwise yield "NaN%".
            const hitRate = lookups > 0 ? hits / lookups * 100 : 0;

            const usedMb = (raw.bytes / 1024 / 1024).toFixed(4);
            const limitMb = raw.limit_maxbytes / 1024 / 1024;

            ws.broadcast(`/api_cache/monit`, {
                host: this.host,
                stats: Object.assign({}, raw, {
                    hits_percent: `${hitRate.toFixed(2)}%`,
                    heap_info: `${usedMb}mb / ${limitMb}mb`
                })
            });
        });
    }

    /**
     * Reads the server's pid from its statistics, opens an SSH session to the
     * cache host and runs `sudo kill <pid>` there.
     */
    restart() {
        this.memcached.stats((err, result) => {
            if (err) {
                console.log(err);
                return;
            }

            const raw = Array.isArray(result) ? result[0] : result;
            const pid = raw ? Number(raw.pid) : NaN;

            // The pid comes from a network response and ends up in a shell command,
            // so anything that is not a positive integer is refused.
            if (!Number.isInteger(pid) || pid <= 0) {
                console.log(`invalid pid: ${raw ? raw.pid : raw}`);
                return;
            }

            const host = this.host.split(':')[0];

            console.log(`host:  ${host}   pid: ${pid}`);

            const conn = new ssh.Client();

            conn.on('ready', () => {
                try {
                    conn.exec(`sudo kill ${pid}`, (execErr, stream) => {
                        if (execErr) {
                            console.log(execErr);
                            conn.end(); // do not leave the session open on failure
                            return;
                        }

                        stream.on('close', () => {
                            console.log(`sudo kill ${pid} --> success`);
                            conn.end();
                        }).on('error', (streamErr) => {
                            console.log(streamErr);
                            conn.end();
                        });
                    });
                } catch (e) {
                    console.log(e);
                    conn.end();
                }
            }).on('error', (connErr) => {
                console.log(connErr);
                conn.end();
            }).connect(Object.assign({ host }, this.sshConfig));
        });
    }

    /**
     * Deletes every cached entry whose key contains `key`, then logs how long the
     * scan took and how many keys were inspected.
     *
     * Keys of the form `apiCache:<key>` and `apiCache2:<key>` are covered as well,
     * since both contain `key` as a substring.
     *
     * @param {string} key - Substring to look for in the cached keys.
     */
    clean(key) {
        const startedAt = Date.now();
        const needle = String(key);

        this._scanKeys((cacheKey) => {
            if (cacheKey && cacheKey.includes(needle)) {
                this.delKey(cacheKey);
            }
        }).then(({ scanned }) => {
            this.memcached.end();
            this._log(`>>>>>>  clean success in ${Date.now() - startedAt} , all keys: ${scanned}`);
        }).catch((err) => {
            console.error(err);
            this.memcached.end();
            this._log(`clean fail. ${err}`);
        });
    }

    /**
     * Dumps the keys of every slab on every server and passes each one to `visit`.
     *
     * The `items` result is treated as one object per server whose `server`
     * property names the server and whose remaining properties are slab ids, each
     * carrying a `number` that is forwarded to `cachedump`.
     *
     * @param {function(string): void} visit - Called once per dumped key.
     * @returns {Promise<{scanned: number, slabs: number}>} Resolves once every
     *     dump has answered, with the number of keys visited and of slabs dumped.
     *     Rejects when `items` fails or `visit` throws.
     */
    _scanKeys(visit) {
        return new Promise((resolve, reject) => {
            this.memcached.items((err, result) => {
                if (err) {
                    reject(err);
                    return;
                }

                const itemSets = Array.isArray(result) ? result : [];

                if (itemSets.length === 0) {
                    this._log('empty items');
                }

                // Collect every dump first so that a single countdown spans all
                // servers; completion is then reported exactly once.
                const dumps = [];

                itemSets.forEach((itemSet) => {
                    const slabIds = Object.keys(itemSet).filter((name) => name !== 'server');

                    if (slabIds.length === 0) {
                        this._log('empty item set');
                    }

                    slabIds.forEach((slabId) => {
                        dumps.push({
                            server: itemSet.server,
                            slabId: parseInt(slabId, 10),
                            number: itemSet[slabId].number
                        });
                    });
                });

                const slabs = dumps.length;
                let pending = slabs;
                let scanned = 0;

                if (pending === 0) {
                    resolve({ scanned, slabs });
                    return;
                }

                dumps.forEach((dump) => {
                    this.memcached.cachedump(dump.server, dump.slabId, dump.number, (dumpErr, response) => {
                        if (dumpErr) {
                            this._log(`cachedump ${dump.server} slab ${dump.slabId} fail. ${dumpErr}`);
                        }

                        try {
                            if (response) {
                                // A dump answers with either one entry or a list of them.
                                const entries = Array.isArray(response) ? response : [response];

                                entries.forEach((entry) => {
                                    scanned++;
                                    visit(entry.key);
                                });
                            }
                        } catch (visitErr) {
                            reject(visitErr);
                        }

                        pending--;

                        if (pending === 0) {
                            resolve({ scanned, slabs });
                        }
                    });
                });
            });
        });
    }

    /**
     * Broadcasts `message`, tagged with this cache's host, on `/api_cache/log`.
     *
     * @param {string} message
     */
    _log(message) {
        ws.broadcast(`/api_cache/log`, {
            host: this.host,
            msg: message
        });
    }
}

// The class itself is the module's only export; callers create one instance per cache host.
module.exports = ApiCache;