// index.js 3.55 KB
const http = require('http');
var _ = require('lodash');
/**
 * Collects data points in memory and periodically POSTs them, as
 * line-protocol text, to `http://<host>:<port>/write?db=<db>`.
 * (The payload layout matches InfluxDB line protocol; that is presumably the
 * intended target - confirm against the deployment.)
 *
 * Example:
 *
 * const options = {
 *   host: 'xxx.xxx.xxx',
 *   port: 80,                    // optional, defaults to 80
 *   db: 'xxxxx',                 // required before the first send
 *   measurement: 'api-duration', // required before the first send
 *   duration: 2000,              // ms a batch may wait before being queued (default 2000)
 *   records: 100,                // queue the batch once it holds more than this many messages (default 100)
 *   onError: (err) => {}         // optional; receives request/response errors
 * };
 * const sender = new Sender(options);
 * sender.addMessage({ tags: { host: 'web1' }, fields: { ms: 12 } });
 * sender.close(); // stop the timers and send whatever is still pending
 */
class Sender {
    /**
     * @param {Object} options - see the class comment. The object is copied, so
     *   the defaults applied here never leak into the caller's object.
     */
    constructor(options) {
        this.options = Object.assign({}, options);
        this.options.duration = this.options.duration || 2000;
        this.options.records = this.options.records || 100;

        // Batches that are complete and waiting to be sent.
        this.bulks = [];
        // Messages collected since the last rotation.
        this.batchMessages = [];
        this._lastRotation = Date.now();

        // Both timers tick every 100 ms: the first decides whether the current
        // batch is due, the second sends at most one queued batch per tick.
        // The handles are kept so that close() can stop them.
        this._rotateTimer = setInterval(() => this._rotateBatch(), 100);
        this._sendTimer = setInterval(() => {
            if (this.bulks.length > 0) {
                this._send(this.bulks.shift());
            }
        }, 100);
    }

    /**
     * Adds a message to the current batch.
     *
     * @param {Object} message - `{ tags, fields, time }`. When `time` is falsy
     *   it is set (on the passed object) to the current time in milliseconds
     *   multiplied by 1e6, i.e. nanoseconds.
     */
    addMessage(message) {
        message.time = message.time || Date.now() * 1000000;
        this.batchMessages.push(message);
    }

    /**
     * Immediately queues the current batch (if any) and sends every queued
     * batch without waiting for the timers.
     *
     * @throws {Error} if there is something to send and `db` or `measurement`
     *   is not configured.
     */
    flush() {
        if (this.batchMessages.length > 0) {
            this.bulks.push(this.batchMessages);
            this.batchMessages = [];
            this._lastRotation = Date.now();
        }
        while (this.bulks.length > 0) {
            this._send(this.bulks.shift());
        }
    }

    /**
     * Stops both timers and flushes pending messages. Without this call the
     * timers keep running for the lifetime of the process.
     */
    close() {
        clearInterval(this._rotateTimer);
        clearInterval(this._sendTimer);
        this.flush();
    }

    // Moves the current batch to the send queue when it is non-empty and either
    // older than `duration` ms or larger than `records` messages.
    _rotateBatch() {
        if (this.batchMessages.length === 0) {
            return;
        }
        const now = Date.now();
        const isExpired = this._lastRotation + this.options.duration < now;
        const isFull = this.batchMessages.length > this.options.records;
        if (isExpired || isFull) {
            this.bulks.push(this.batchMessages);
            this.batchMessages = [];
            this._lastRotation = now;
        }
    }

    /**
     * Serializes an object as comma-separated `key=value` pairs.
     *
     * @param {Object} [obj] - tag or field map; null/undefined yields ''.
     * @param {boolean} [withQuote=false] - true for fields (strings are quoted,
     *   integers get an `i` suffix), false for tags.
     * @returns {string}
     */
    _makeline(obj, withQuote = false) {
        if (obj === null || obj === undefined) {
            return '';
        }
        return Object.keys(obj)
            .map((key) => `${this._escapeToken(key)}=${this._escape(obj[key], withQuote)}`)
            .join(',');
    }

    // Builds one line: `measurement[,tags] fields time`. The comma is only
    // emitted when there is at least one tag, otherwise the line would start
    // with `measurement, ` and be rejected.
    _buildLine(message) {
        const measurement = String(this.options.measurement)
            .replace(/,/g, '\\,')
            .replace(/\s/g, '\\ ');
        const tagStr = this._makeline(message.tags);
        const fieldStr = this._makeline(message.fields, true);
        const series = tagStr ? `${measurement},${tagStr}` : measurement;
        return `${series} ${fieldStr} ${message.time}`;
    }

    // Joins the lines of every message in the batch with '\n'.
    _buildPayload(bulk) {
        return bulk.map((message) => this._buildLine(message)).join('\n');
    }

    /**
     * Sends one batch with a single HTTP POST.
     *
     * @param {Object[]} bulk - messages as passed to addMessage.
     * @throws {Error} if `db` or `measurement` is not configured.
     */
    _send(bulk) {
        if (!this.options.db) {
            throw new Error('db options must be set when inital Sender!');
        }

        if (!this.options.measurement) {
            throw new Error('measurement options must be set when inital Sender!');
        }

        const data = this._buildPayload(bulk);

        const requestOptions = {
            hostname: this.options.host,
            port: this.options.port || 80,
            // Encode so a db name with reserved characters cannot break the URL.
            path: `/write?db=${encodeURIComponent(this.options.db)}`,
            method: 'POST',
            headers: {
                'Content-Length': Buffer.byteLength(data)
            }
        };

        const req = http.request(requestOptions, (res) => {
            console.log(`status code: ${res.statusCode}`);
            // The body must be consumed, otherwise the socket is never released.
            res.resume();
            if (res.statusCode >= 400) {
                this._reportError(new Error(`write failed with status code ${res.statusCode}`));
            }
        });

        // An event listener's return value is ignored, so returning a rejected
        // promise here would only produce an unhandled rejection.
        req.on('error', (e) => this._reportError(e));

        req.end(data);
    }

    // Hands an error to `options.onError` when provided, else logs it.
    _reportError(err) {
        if (typeof this.options.onError === 'function') {
            this.options.onError(err);
        } else {
            console.error(err);
        }
    }

    // Escapes the characters that delimit keys and tag values: comma, equals
    // sign and whitespace (any whitespace becomes an escaped space).
    _escapeToken(text) {
        return String(text)
            .replace(/,/g, '\\,')
            .replace(/=/g, '\\=')
            .replace(/\s/g, '\\ ');
    }

    // Wraps text in double quotes, escaping backslashes first and then quotes
    // so the escapes added for quotes are not escaped a second time.
    _quote(text) {
        const escaped = String(text).replace(/\\/g, '\\\\').replace(/"/g, '\\"');
        return `"${escaped}"`;
    }

    /**
     * Converts a single tag or field value to its line representation.
     *
     * @param {*} value
     * @param {boolean} withQuote - true for field values, false for tag values.
     * @returns {string|number|boolean|*} strings are quoted (fields) or escaped
     *   (tags); integers get an `i` suffix for fields; objects/arrays become a
     *   quoted JSON string; null/undefined become `""`; anything else (floats,
     *   booleans, ...) is returned unchanged.
     */
    _escape(value, withQuote) {
        if (typeof value === 'string' || value instanceof String) {
            return withQuote ? this._quote(value) : this._escapeToken(value);
        }
        if (Number.isInteger(value)) {
            return withQuote ? `${value}i` : value;
        }
        if (value !== null && (typeof value === 'object' || typeof value === 'function')) {
            // JSON.stringify returns undefined for functions; treat that as ''.
            const json = JSON.stringify(value);
            const quoted = this._quote(json === undefined ? '' : json);
            // As a tag value the quoted JSON still needs its commas, equals
            // signs and whitespace escaped.
            return withQuote ? quoted : this._escapeToken(quoted);
        }
        if (value === null || value === undefined) {
            return '""';
        }
        return value;
    }
}

// The Sender class is this module's only export (CommonJS).
module.exports = Sender;