index.js
5.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
const http = require('http');
const _ = require('lodash');
const debug = require('debug')('influx-sender');
const EventEmitter = require('events');
const request = require('request');
/**
* buffer message and send them in a bulk
*
* const options =
* {
* host:'xxx.xxx.xxx',
* port:80,
* db:'xxxxx',
* measurement:'api-duration'
* duration:2000 //ms per 200 send message,
* records:100 //when message over 100 send them
* }
*/
class Sender extends EventEmitter {
constructor(options) {
super();
this.bulks = [];
this.batchMessages = [];
this.options = options;
this.options.duration = this.options.duration || 2000;
this.options.records = this.options.records || 100;
if (!this.options.db || !this.options.host) {
console.error('config error: db or host undefined!');
return;
}
if (this.options.immediate) {
return;
}
// pre duration send array
setInterval(() => {
this._send();
debug('batchMessages duration sent!');
}, this.options.duration);
}
/**
* add message to batchMessage
*
* @example
* {
* measurement:'test',
* tags:{
* type:'api',
* preqID:'MdHy21313',
* api:'app.brand.newBrandList',
* route:'/sada/dsa/test'
* },
* fields:{
* duration:123
* }
* }
* @param {Object} message
*/
addMessage(message) {
message.time = message.time || new Date().getTime() * 1000000;
debug('add message! %O', message);
this.batchMessages.push(message);
if ((this.batchMessages.length > this.options.records) || this.options.immediate) {
this._send();
debug('batchMessages over records and sent!');
}
}
/**
* make the data with influxdb's line protocol.
* @see https://docs.influxdata.com/influxdb/v0.13/write_protocols/line/
* @param {Object} obj
* @param {Boolean} withQuote
* @private
*/
_makeline(obj, withQuote = false) {
let arr = [];
_.forEach(obj, (n, key) => {
const value = this._escape(obj[key], withQuote);
arr.push(`${key}=${value}`);
});
return arr.join(',');
}
/**
* use tcp send message
* @private
*/
_send() {
if (!this.options.db || !this.options.host) {
return;
}
const len = this.batchMessages.length;
if (len < 1) {
debug('batchMessages is empty!');
return;
}
const bulk = this.batchMessages.splice(0, len);
let bulkMessage = [];
for (let i = 0; i < bulk.length; i++) {
const measurement = bulk[i].measurement || this.options.measurement;
if (!measurement) {
throw new Error('measurement options must be set when inital Sender!');
}
const tagStr = this._makeline(bulk[i].tags);
const fieldStr = this._makeline(bulk[i].fields, true);
const messageStr = `${measurement},${tagStr} ${fieldStr} ${bulk[i].time}`;
bulkMessage.push(messageStr);
}
const data = bulkMessage.join('\n');
debug('send bulkMessages! %O', data);
const options = {
headers: {
'content-type': 'text/plain'
},
url: `http://${this.options.host}:${this.options.port || 80}/write`,
qs: {
db: this.options.db
},
method: 'POST',
body: data
};
request(options, (error, res, body) => {
if (error) {
debug('send error: %O', error);
console.error(error);
this.emit('sendError', error);
}
debug('status code: %o', res.statusCode);
if (res.statusCode !== 204) {
console.error(`Send failed! statusCode:${res.statusCode}`);
console.error(body);
this.emit('failed', res.statusCode);
} else {
this.emit('ok', res.statusCode);
}
});
}
/**
* trans to line string
* @param {String} value
* @param {Boolean} withQuote
* @private
*/
_escape(value, withQuote) {
if (_.isString(value)) {
if (withQuote) {
value = '"' + value + '"';
} else {
value = _.replace(value, /,/g, '\\,');
value = _.replace(value, /=/g, '\\=');
value = _.replace(value, /\s/g, '\\ ');
}
} else if (_.isInteger(value)) {
if (withQuote) {
value = value + 'i';
}
} else if (_.isObject(value)) {
value = '"' + _.replace(JSON.stringify(value), /"/g, '\\"') + '"';
} else if (_.isNull(value) || _.isNil(value)) {
value = '""';
}
return value;
}
}
module.exports = Sender;