index.js 3.03 KB
const debug = require('debug')('apm-agent');
const EventEmitter = require('events');
const request = require('request');

/**
 * buffer message and send them in a bulk
 *
 * const options =
 * {
 *   host:'xxx.xxx.xxx',
 *   port:80,
 *   db:'xxxxx',
 *   measurement:'api-duration'
 *   duration:2000 //ms per 200 send message,
 *   records:100 //when message over 100 send them
 *   immediate: true // send message immediate not wart records count if true
 *   path: "/url", // set target path default is /write
 * }
 */

class Sender extends EventEmitter {
  constructor(options) {
    super();
    this.bulks = [];
    this.batchMessages = [];
    this.options = options;
    this.options.duration = this.options.duration || 2000;
    this.options.records = this.options.records || 100;
    this.options.path = this.options.path || '/write';
    this.options.port = this.options.port || 80;

    if (!this.options.db || !this.options.host) {
      console.error('config error: db or host undefined!');
      return;
    }

    if (this.options.immediate) {
      return;
    }

    // pre duration send array
    setInterval(() => {
      this._send();
      debug('batchMessages duration sent!');
    }, this.options.duration);
  }

  /**
     * add message to batchMessage
     *
     * @example
     * {
     *   measurement:'test',
     *   tags:{
     *     type:'api',
     *     preqID:'MdHy21313',
     *     api:'app.brand.newBrandList',
     *     route:'/sada/dsa/test'
     *   },
     *   fields:{
     *     duration:123
     *   }
     * }
     * @param {Object} message
     */
  addMessage(message) {
    message.time = message.time || new Date().getTime();
    debug('add message! %O', message);
    this.batchMessages.push(message);

    if ((this.batchMessages.length > this.options.records) || this.options.immediate) {
      this._send();
      debug('batchMessages over records and sent!');
    }
  }

  /**
     * use tcp send message
     * @private
     */
  _send() {
    if (!this.options.db || !this.options.host) {
      return;
    }

    const len = this.batchMessages.length;

    if (len < 1) {
      debug('batchMessages is empty!');
      return;
    }

    const bulk = this.batchMessages.splice(0, len);

    debug('send bulkMessages! %O', bulk);
    const options = {
      headers: {
        'content-type': 'application/json'
      },
      url: `http://${this.options.host}:${this.options.port}${this.options.path}`,
      qs: {
        db: this.options.db
      },
      method: 'POST',
      body: JSON.stringify(bulk)
    };

    request(options, (error, res, body) => {
      if (error) {
        debug('send error: %O', error);
        console.error(error);
        this.emit('sendError', error);
        return;
      }
      debug('status code: %o', res.statusCode);
      if (res.statusCode !== 204) {
        console.error(`Send failed! statusCode:${res.statusCode}`);
        console.error(body);
        this.emit('failed', res.statusCode);
      } else {
        this.emit('ok', res.statusCode);
      }
    });
  }
}

module.exports = Sender;