Authored by htoooth

init

  1 +{
  2 + "extends": "yoho",
  3 + "parserOptions": {
  4 + "sourceType": "module",
  5 + "ecmaVersion": 2017
  6 + }
  7 +}
  1 +node_modules/
  2 +package-lock.json
  3 +coverage/
  4 +.nyc_output/
  1 +# influx-batch-sender
  2 +
  3 +buffer message and send them in a bulk.
  4 +
  5 +## useage
  6 +
  7 +``` javascript
  8 +const Sender = require('../index');
  9 +
  10 +const sender = new Sender({
  11 + host: 'influxd.yoho.cn',
  12 + db: 'web-apm',
  13 + measurement: 'api-duration',
  14 + duration: 2000, // 多长时间发送一次,默认值是 2000ms
  15 + records: 200, // 累积多少条消息发送一次,默认值是100
  16 + immediate: true // 是否立刻发送消息,设置为 true 会忽略 records 设置,默认值是 false
  17 + path: "/url", // 重新设置发送消息的路径,默认值是 /write
  18 +});
  19 +
  20 +setInterval(() => {
  21 + sender.addMessage({
  22 + tags: {
  23 + reqid: 'sdasda'+Math.random(),
  24 + route: 'test'
  25 + },
  26 + fields: {
  27 + dasd: "d123asda"
  28 + }
  29 + });
  30 +}, Math.random() * 100)
  31 +
  32 +```
  1 +/**
  2 + * test addmessages
  3 + */
  4 +const Sender = require('../index');
  5 +
  6 +const sender = new Sender({
  7 + host: 'influxd.yoho.cn',
  8 + db: 'web-apm',
  9 + measurement: 'api-duration',
  10 + duration: 2000,
  11 + records: 200
  12 +});
  13 +
  14 +let num = 1;
  15 +
  16 +let num2 = 1;
  17 +
  18 +setInterval(() => {
  19 + if (num > 10000) {
  20 + console.log('1 done!');
  21 + return;
  22 + }
  23 + sender.addMessage({
  24 + tags: {
  25 + reqid: num + '',
  26 + route: 'test'
  27 + },
  28 + fields: {
  29 + dasd: 'd123asda'
  30 + }
  31 + });
  32 + num++;
  33 +}, Math.random() * 10);
  34 +
  35 +
  36 +setInterval(() => {
  37 + if (num2 > 10000) {
  38 + console.log('2 done!');
  39 + return;
  40 + }
  41 + sender.addMessage({
  42 + tags: {
  43 + reqid: num2 + '',
  44 + route: 'test2'
  45 + },
  46 + fields: {
  47 + dasd: '12131'
  48 + }
  49 + });
  50 + num2++;
  51 +}, Math.random() * 10);
  1 +const debug = require('debug')('apm-agent');
  2 +const EventEmitter = require('events');
  3 +const request = require('request');
  4 +
  5 +/**
  6 + * buffer message and send them in a bulk
  7 + *
  8 + * const options =
  9 + * {
  10 + * host:'xxx.xxx.xxx',
  11 + * port:80,
  12 + * db:'xxxxx',
  13 + * measurement:'api-duration'
  14 + * duration:2000 //ms per 200 send message,
  15 + * records:100 //when message over 100 send them
  16 + * immediate: true // send message immediate not wart records count if true
  17 + * path: "/url", // set target path default is /write
  18 + * }
  19 + */
  20 +
  21 +class Sender extends EventEmitter {
  22 + constructor(options) {
  23 + super();
  24 + this.bulks = [];
  25 + this.batchMessages = [];
  26 + this.options = options;
  27 + this.options.duration = this.options.duration || 2000;
  28 + this.options.records = this.options.records || 100;
  29 + this.options.path = this.options.path || '/write';
  30 + this.options.port = this.options.port || 80;
  31 +
  32 + if (!this.options.db || !this.options.host) {
  33 + console.error('config error: db or host undefined!');
  34 + return;
  35 + }
  36 +
  37 + if (this.options.immediate) {
  38 + return;
  39 + }
  40 +
  41 + // pre duration send array
  42 + setInterval(() => {
  43 + this._send();
  44 + debug('batchMessages duration sent!');
  45 + }, this.options.duration);
  46 + }
  47 +
  48 + /**
  49 + * add message to batchMessage
  50 + *
  51 + * @example
  52 + * {
  53 + * measurement:'test',
  54 + * tags:{
  55 + * type:'api',
  56 + * preqID:'MdHy21313',
  57 + * api:'app.brand.newBrandList',
  58 + * route:'/sada/dsa/test'
  59 + * },
  60 + * fields:{
  61 + * duration:123
  62 + * }
  63 + * }
  64 + * @param {Object} message
  65 + */
  66 + addMessage(message) {
  67 + message.time = message.time || new Date().getTime() * 1000000;
  68 + debug('add message! %O', message);
  69 + this.batchMessages.push(message);
  70 +
  71 + if ((this.batchMessages.length > this.options.records) || this.options.immediate) {
  72 + this._send();
  73 + debug('batchMessages over records and sent!');
  74 + }
  75 + }
  76 +
  77 + /**
  78 + * use tcp send message
  79 + * @private
  80 + */
  81 + _send() {
  82 + if (!this.options.db || !this.options.host) {
  83 + return;
  84 + }
  85 +
  86 + const len = this.batchMessages.length;
  87 +
  88 + if (len < 1) {
  89 + debug('batchMessages is empty!');
  90 + return;
  91 + }
  92 +
  93 + const bulk = this.batchMessages.splice(0, len);
  94 +
  95 + debug('send bulkMessages! %O', bulk);
  96 + const options = {
  97 + headers: {
  98 + 'content-type': 'application/json'
  99 + },
  100 + url: `http://${this.options.host}:${this.options.port}${this.options.path}`,
  101 + qs: {
  102 + db: this.options.db
  103 + },
  104 + method: 'POST',
  105 + body: JSON.stringify(bulk)
  106 + };
  107 +
  108 + request(options, (error, res, body) => {
  109 + if (error) {
  110 + debug('send error: %O', error);
  111 + console.error(error);
  112 + this.emit('sendError', error);
  113 + return;
  114 + }
  115 + debug('status code: %o', res.statusCode);
  116 + if (res.statusCode !== 204) {
  117 + console.error(`Send failed! statusCode:${res.statusCode}`);
  118 + console.error(body);
  119 + this.emit('failed', res.statusCode);
  120 + } else {
  121 + this.emit('ok', res.statusCode);
  122 + }
  123 + });
  124 + }
  125 +}
  126 +
  127 +module.exports = Sender;
  1 +{
  2 + "name": "yoho-apm-agent",
  3 + "version": "0.1.12",
  4 + "main": "index.js",
  5 + "scripts": {
  6 + "test": "node_modules/.bin/nyc node_modules/.bin/ava",
  7 + "posttest": "node_modules/.bin/nyc report --reporter=html",
  8 + "lint": "node node_modules/eslint\bin/eslint ./*.js ./example/*.js"
  9 + },
  10 + "dependencies": {
  11 + "debug": "^3.0.1",
  12 + "lodash": "^4.17.4",
  13 + "request": "^2.83.0"
  14 + },
  15 + "devDependencies": {
  16 + "ava": "^0.15.2",
  17 + "eslint": "^4.4.1",
  18 + "eslint-config-yoho": "^1.0.1",
  19 + "nyc": "^11.1.0"
  20 + }
  21 +}
  1 +const test = require('ava');
  2 +const Sender = require('../index');
  3 +
  4 +test.before(() => {
  5 + const http = require('http');
  6 +
  7 + const server = http.createServer((req, res) => {
  8 + res.statusCode = 204;
  9 + res.end('');
  10 + });
  11 +
  12 + server.listen(3003);
  13 +});
  14 +
  15 +test('test normal send message', t => {
  16 + const sender = new Sender({
  17 + host: 'localhost',
  18 + port: 3003,
  19 + db: 'test',
  20 + measurement: 'test2'
  21 + });
  22 +
  23 + sender.addMessage({
  24 + tags: {
  25 + a: 'a',
  26 + b: 'b'
  27 + },
  28 + fields: {
  29 + a: 'a',
  30 + b: 'b'
  31 + }
  32 + });
  33 +
  34 + sender.on('ok', code => {
  35 + t.is(code, 204);
  36 + });
  37 +
  38 +});
  39 +
  40 +
  41 +test('test normal send message immediately', t => {
  42 + const sender = new Sender({
  43 + host: 'localhost',
  44 + port: 3003,
  45 + db: 'test',
  46 + measurement: 'test2',
  47 + immediate: true
  48 + });
  49 +
  50 + sender.addMessage({
  51 + tags: {
  52 + a: 'a',
  53 + b: 'b'
  54 + },
  55 + fields: {
  56 + a: 'a',
  57 + b: 'b'
  58 + }
  59 + });
  60 +
  61 + sender.on('ok', code => {
  62 + t.is(code, 204);
  63 + });
  64 +
  65 +});
  66 +
  67 +
  68 +test('test send json message immediately', t => {
  69 + const sender = new Sender({
  70 + host: 'localhost',
  71 + port: 3003,
  72 + db: 'test',
  73 + measurement: 'test2',
  74 + immediate: true,
  75 + path: '/alert'
  76 + });
  77 +
  78 + sender.addMessage({
  79 + tags: {
  80 + a: 'a',
  81 + b: 'b'
  82 + },
  83 + fields: {
  84 + a: 'a',
  85 + b: 'b'
  86 + }
  87 + });
  88 +
  89 + sender.on('ok', code => {
  90 + t.is(code, 204);
  91 + });
  92 +
  93 +});
This diff could not be displayed because it is too large.