// trace.js 5.52 KB
/**
 * Tracing/monitoring utility that writes monitoring data into InfluxDB.
 *
 * @usage:
 * <code>
 *     let trace = new Trace({
 *          host: '54.222.219.223',
 *          port: 4444
 *     });
 *
 *     let testTrace = trace.trace('test_key');  // createOrChoose a measurement
 *
 *     // testTrace(someTags, someFields);
 *     testTrace({ foo: 'bar', foobar: 'baz2'}, {value: 123, value2: 'aaa 123', value3: 1.3, value4: false});
 *
 * </code>
 *
 * @author: jiangfeng<jeff.jiang@yoho.cn>
 * @date: 16/8/1
 */

'use strict';

const dgram = require('dgram');
const influx = require('influx');
const _ = require('lodash');

/**
 * Writes monitoring points into InfluxDB, either as line-protocol text over
 * UDP or through the node-influx HTTP client.
 */
class Trace {

    /**
     * Store the influxdb connection config.
     *
     * @param options {object} connection settings. `host` and `port` are read
     *        for UDP sends; the whole object is handed to the node-influx
     *        factory for HTTP writes.
     */
    constructor(options) {
        this.options = options;
    }

    /**
     * Create or choose a measurement to write points in.
     *
     * @param name {string} the measurement name
     * @param options {object} some options. `protocol` selects the transport:
     *        'udp' (default) or 'http'.
     * @returns {function(object, object): Promise} a point write function taking (tags, fields)
     * @throws {Error} when `protocol` is neither 'udp' nor 'http'
     */
    trace(name, options) {
        options = Object.assign({
            protocol: 'udp'
        }, options);

        if (options.protocol === 'udp') {
            return (key, fields) => this.udpTrace(name, options, key, fields);
        }

        if (options.protocol === 'http') {
            return (key, fields) => this.httpTrace(name, options, key, fields);
        }

        // Fail here with a clear message instead of handing back `undefined`,
        // which would only blow up later when the caller tries to invoke it.
        throw new Error(`Trace: unsupported protocol "${options.protocol}", expected "udp" or "http"`);
    }

    /**
     * Write point(s) into influxdb by UDP.
     *
     * When `key` is an array it is treated as a batch of points and `fields`
     * is ignored. Each element is read as a `[fields, tags]` pair, i.e. the
     * same array httpTrace forwards to the HTTP client's writePoints
     * (NOTE(review): confirm this shape against callers). An element that is
     * not an array is treated as a field set without tags.
     *
     * @param name {string} the measurement name
     * @param options {object} unused by the UDP transport
     * @param key {object|Array} the tags of a single point, or an array of points
     * @param fields {object} the fields of a single point
     * @returns {Promise} settles once every datagram has been handed to the socket
     */
    udpTrace(name, options, key, fields) {
        if (Array.isArray(key)) {
            return Promise.all(key.map(point => {
                const isPair = Array.isArray(point);
                const pointFields = isPair ? point[0] : point;
                const pointTags = isPair ? point[1] : undefined;

                return this._updPostLine(this._buildLine(name, pointTags, pointFields));
            }));
        }

        return this._updPostLine(this._buildLine(name, key, fields));
    }

    /**
     * Assemble one line-protocol line: `measurement[,tag_set] field_set`.
     *
     * @param name {string} the measurement name
     * @param tags {object} tag set, may be empty or missing
     * @param fields {object} field set
     * @returns {string}
     * @private
     */
    _buildLine(name, tags, fields) {
        const measurement = this._escape(name);
        const tagSet = this._makeLine(tags, false);
        const fieldSet = this._makeLine(fields, true);

        // The comma separator is only valid when a tag set actually follows it.
        const head = tagSet === '' ? measurement : `${measurement},${tagSet}`;

        return `${head} ${fieldSet}`;
    }

    /**
     * Send one line over UDP to `options.host`:`options.port`, using a fresh
     * socket that is closed as soon as the send callback fires.
     *
     * @param line {string} @see influxdb's line protocol
     * @returns {Promise} resolves with no value, rejects with the send error
     * @private
     */
    _updPostLine(line) {
        return new Promise((resolve, reject) => {
            const socket = dgram.createSocket('udp4');
            const buff = Buffer.from(line);

            socket.send(buff, 0, buff.length, this.options.port, this.options.host, err => {
                socket.close();

                if (err) {
                    reject(err);
                } else {
                    resolve();
                }
            });
        });
    }

    /**
     * Make the data with influxdb's line protocol.
     * @see https://docs.influxdata.com/influxdb/v0.13/write_protocols/line/
     *
     * - array: every element is serialized on its own, results joined by a space
     * - object: `key=value` pairs joined by commas; null/undefined values are skipped
     * - any other non-nil value: `value=<data>`
     * - null/undefined: empty string
     *
     * @param data {*}
     * @param withQuote {boolean} true to serialize values as fields (quoted
     *        strings, `i`-suffixed integers), false to serialize them as tags
     * @returns {string}
     * @private
     */
    _makeLine(data, withQuote) {
        // Arrays are objects too, so they have to be recognised first.
        if (Array.isArray(data)) {
            return data.map(d => this._makeLine(d, withQuote)).join(' ');
        }

        const type = typeof data;

        if (data !== null && (type === 'object' || type === 'function')) {
            return Object.keys(data)
                .filter(key => data[key] !== undefined && data[key] !== null)
                .map(key => `${this._escape(key)}=${this._escape(data[key], withQuote)}`)
                .join(',');
        }

        if (data !== undefined && data !== null) {
            return `value=${this._escape(data, withQuote)}`;
        }

        return '';
    }

    /**
     * Data escape with influxdb's line protocol.
     *
     * Strings: with `withQuote` the value is wrapped in double quotes and only
     * `"` and `\` are backslash-escaped; without it, commas, equal signs and
     * whitespace are backslash-escaped (every whitespace character becomes an
     * escaped space). Integers get the `i` suffix when `withQuote` is set.
     * Everything else is returned untouched.
     *
     * @param value {*}
     * @param withQuote {boolean} true for field values, falsy for
     *        measurement names, keys and tag values
     * @returns {*}
     * @private
     */
    _escape(value, withQuote) {
        if (typeof value === 'string') {
            if (withQuote) {
                return '"' + value.replace(/["\\]/g, '\\$&') + '"';
            }

            return value
                .replace(/,/g, '\\,')
                .replace(/=/g, '\\=')
                .replace(/\s/g, '\\ ');
        }

        if (withQuote && Number.isInteger(value)) {
            return value + 'i';
        }

        return value;
    }

    /**
     * Write point(s) into influxdb by HTTP. Use the open source node-influx module.
     * @see https://github.com/node-influx/node-influx
     *
     * When `key` is an array it is forwarded unchanged to the client's
     * writePoints and `fields` is ignored; otherwise a single point is written
     * with writePoint(name, fields, key, options).
     *
     * @param name {string} the measurement name
     * @param options {object} passed through to the client call
     * @param key {object|Array} the tags of a single point, or an array of points
     * @param fields {object} the fields of a single point
     * @returns {Promise} resolves with the client's response, rejects with its error
     */
    httpTrace(name, options, key, fields) {
        const client = this.getHttpClient();

        return new Promise((resolve, reject) => {
            const done = (err, rp) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(rp);
                }
            };

            if (Array.isArray(key)) {
                client.writePoints(name, key, options, done);
            } else {
                client.writePoint(name, fields, key, options, done);
            }
        });
    }

    /**
     * The lazily created http client, cached on the instance after first use.
     *
     * @returns {*}
     */
    getHttpClient() {
        if (!this.httpClient) {
            this.httpClient = influx(this.options);
        }

        return this.httpClient;
    }
}


// Expose the Trace class as this module's export.
module.exports = Trace;