Authored by 周奇琪

project init

  1 +node_modules/
  2 +package-lock.json
  1 +# influx-batch-sender
  2 +
  3 +buffer message and send them in a bulk.
  4 +
  5 +## useage
  6 +
  7 +``` javascript
  8 +const Sender = require('../index');
  9 +
  10 +const sender = new Sender({
  11 + host: 'influxd.yoho.cn',
  12 + db: 'web-apm',
  13 + measurement: 'api-duration',
  14 + duration: 2000,
  15 + records:200
  16 +});
  17 +
  18 +setInterval(() => {
  19 + sender.addMessage({
  20 + tags: {
  21 + reqid: 'sdasda'+Math.random(),
  22 + route: 'test'
  23 + },
  24 + fields: {
  25 + dasd: "d123asda"
  26 + }
  27 + });
  28 +}, Math.random() * 100)
  29 +
  30 +```
  1 +/**
  2 + * test addmessages
  3 + */
  4 +const Sender = require('../index');
  5 +
  6 +const sender = new Sender({
  7 + host: 'influxd.yoho.cn',
  8 + db: 'web-apm',
  9 + measurement: 'api-duration',
  10 + duration: 2000,
  11 + records:200
  12 +});
  13 +
  14 +setInterval(() => {
  15 + sender.addMessage({
  16 + tags: {
  17 + reqid: 'sdasda'+Math.random(),
  18 + route: 'test'
  19 + },
  20 + fields: {
  21 + dasd: "d123asda"
  22 + }
  23 + });
  24 +}, Math.random() * 100)
  1 +const http = require('http');
  2 +var _ = require('lodash');
  3 +/**
  4 + *
  5 + * const options =
  6 + * {
  7 + * host:'xxx.xxx.xxx',
  8 + * db:'xxxxx',
  9 + * measurement:'api-duration'
  10 + * duration:2000 //ms per 200 send message,
  11 + * records:100 //when message over 100 send them
  12 + * }
  13 + */
  14 +class Sender {
  15 + constructor(options) {
  16 + this.bulks = [];
  17 + this.batchMessages = [];
  18 + this.options = options;
  19 + this.options.duration = this.options.duration || 2000;
  20 + this.options.records = this.options.records || 100;
  21 +
  22 + let addBulkTime = new Date().getTime();
  23 +
  24 + setInterval(() => {
  25 + const now = new Date().getTime();
  26 + if ((addBulkTime + this.options.duration < now ||
  27 + this.batchMessages.length > this.options.records) &&
  28 + this.batchMessages.length > 0) {
  29 + this.bulks.push(this.batchMessages);
  30 + this.batchMessages = [];
  31 + addBulkTime = new Date().getTime();
  32 + }
  33 +
  34 + }, 100);
  35 +
  36 + setInterval(() => {
  37 + if (this.bulks.length > 0) {
  38 + this._send(this.bulks.shift());
  39 + }
  40 + }, 100);
  41 + }
  42 +
  43 + // add message to batchMessage
  44 + addMessage(message) {
  45 + message.time = message.time || new Date().getTime() * 1000000;
  46 + this.batchMessages.push(message);
  47 + }
  48 +
  49 +
  50 + _makeline(obj, withQuote = false) {
  51 + let arr = []
  52 + for (let key in obj) {
  53 + const value = this._escape(obj[key], withQuote);
  54 + arr.push(`${key}=${value}`);
  55 + }
  56 + return arr.join(',');
  57 + }
  58 +
  59 + //use tcp send message
  60 + _send(bulk) {
  61 + if (!this.options.db) {
  62 + throw new Error('db options must be set when inital Sender!');
  63 + return;
  64 + }
  65 +
  66 + if (!this.options.measurement) {
  67 + throw new Error('measurement options must be set when inital Sender!');
  68 + return;
  69 + }
  70 +
  71 + let bulkMessage = [];
  72 + for (let i = 0; i < bulk.length; i++) {
  73 + const tagStr = this._makeline(bulk[i].tags);
  74 + const fieldStr = this._makeline(bulk[i].fields, true);
  75 + const messageStr = `${this.options.measurement},${tagStr} ${fieldStr} ${bulk[i].time}`;
  76 + bulkMessage.push(messageStr);
  77 + }
  78 +
  79 + const data = bulkMessage.join('\n');
  80 +
  81 + const options = {
  82 + hostname: this.options.host,
  83 + port: this.options.port || 80,
  84 + path: `/write?db=${this.options.db}`,
  85 + method: 'POST',
  86 + headers: {
  87 + 'Content-Length': Buffer.byteLength(data)
  88 + }
  89 + };
  90 +
  91 + const req = http.request(options, (res) => {
  92 + console.log(`status code: ${res.statusCode}`);
  93 + });
  94 +
  95 + req.on('error', (e) => {
  96 + return Promise.reject(e);
  97 + });
  98 +
  99 + req.write(data);
  100 + req.end();
  101 + }
  102 +
  103 + //trans to line string
  104 + _escape(value, withQuote) {
  105 + if (_.isString(value)) {
  106 + if (withQuote) {
  107 + value = '"' + value + '"';
  108 + } else {
  109 + value = _.replace(value, /,/g, '\\,');
  110 + value = _.replace(value, /=/g, '\\=');
  111 + value = _.replace(value, /\s/g, '\\ ');
  112 + }
  113 + } else if (_.isInteger(value)) {
  114 + if (withQuote) {
  115 + value = value + 'i';
  116 + }
  117 + } else if (_.isObject(value)) {
  118 + value = '"' + _.replace(JSON.stringify(value), /"/g, '\\"') + '"';
  119 + } else if (_.isNull(value) || _.isNil(value)) {
  120 + value = '""';
  121 + }
  122 + return value;
  123 + };
  124 +}
  125 +
  126 +module.exports = Sender;
  1 +{
  2 + "name": "influx-batch-sender",
  3 + "version": "0.1.0",
  4 + "dependencies": {
  5 + "lodash": "^4.17.4"
  6 + }
  7 +}