|
|
/**
|
|
|
* 跟踪监控工具, 将监控数据写入influxdb
|
|
|
*
|
|
|
* @usage:
|
|
|
* <code>
|
|
|
* let trace = new Trace({
|
|
|
* host: '54.222.219.223',
|
|
|
* port: 4444
|
|
|
* });
|
|
|
*
|
|
|
* let testTrace = trace.trace('test_key'); // createOrChoose a measurement
|
|
|
*
|
|
|
* // testTrace(someTags, someFields);
|
|
|
* testTrace({ foo: 'bar', foobar: 'baz2'}, {value: 123, value2: 'aaa 123', value3: 1.3, value4: false});
|
|
|
*
|
|
|
* </code>
|
|
|
*
|
|
|
* @author: jiangfeng<jeff.jiang@yoho.cn>
|
|
|
* @date: 16/8/1
|
|
|
*/
|
|
|
|
|
|
'use strict';
|
|
|
|
|
|
const dgram = require('dgram');
|
|
|
const influx = require('influx');
|
|
|
const _ = require('lodash');
|
|
|
|
|
|
class Trace {
|
|
|
|
|
|
/**
|
|
|
* influxdb connection config
|
|
|
*
|
|
|
* @param options
|
|
|
*/
|
|
|
constructor(options) {
|
|
|
this.options = options;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* create or choose a measurement to write point in.
|
|
|
* @param name {string} the measurement name
|
|
|
* @param options {object} some options. the protocol of influxdb connection.
|
|
|
* @returns {function()} a point write function
|
|
|
*/
|
|
|
trace(name, options) {
|
|
|
options = _.assign({
|
|
|
protocol: 'udp'
|
|
|
}, options);
|
|
|
|
|
|
let self = this;
|
|
|
|
|
|
if (options.protocol === 'udp') {
|
|
|
return (key, fields) => {
|
|
|
return self.udpTrace(name, options, key, fields);
|
|
|
};
|
|
|
} else if (options.protocol === 'http') {
|
|
|
return (key, fields) => {
|
|
|
return self.httpTrace(name, options, key, fields);
|
|
|
};
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* write point into influxdb by UDP
|
|
|
*
|
|
|
* @param name {string} the measurement name
|
|
|
* @param options {object}
|
|
|
* @param key {object} some keys of data
|
|
|
* @param fields {object} some fields of data
|
|
|
* @returns {Promise}
|
|
|
*/
|
|
|
udpTrace(name, options, key, fields) {
|
|
|
|
|
|
if (_.isArray(key)) {
|
|
|
key.forEach(p => {
|
|
|
let line = `${this._escape(name)},${this._makeLine(p)}`;
|
|
|
|
|
|
return this._updPostLine(line);
|
|
|
});
|
|
|
} else {
|
|
|
let line = `${this._escape(name)},${this._makeLine(key, false)} ${this._makeLine(fields, true)}`;
|
|
|
|
|
|
return this._updPostLine(line);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* upd send.
|
|
|
* @param line {string} @see infulxdb's line protocol
|
|
|
* @returns {Promise}
|
|
|
* @private
|
|
|
*/
|
|
|
_updPostLine(line) {
|
|
|
let self = this;
|
|
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
let socket = dgram.createSocket("udp4");
|
|
|
let buff = new Buffer(line);
|
|
|
|
|
|
|
|
|
socket.send(buff, 0, buff.length, self.options.port, self.options.host, (err, rp) => {
|
|
|
socket.close();
|
|
|
|
|
|
console.log(rp);
|
|
|
if (err) {
|
|
|
reject(err);
|
|
|
} else {
|
|
|
resolve();
|
|
|
}
|
|
|
});
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* make the data with influxdb's line protocol.
|
|
|
* @see https://docs.influxdata.com/influxdb/v0.13/write_protocols/line/
|
|
|
*
|
|
|
* @param data {object}
|
|
|
* @param withNoQuote {boolean}
|
|
|
* @returns {string}
|
|
|
* @private
|
|
|
*/
|
|
|
_makeLine(data, withNoQuote) {
|
|
|
if (_.isObject(data)) {
|
|
|
return Object.keys(data).map(key => {
|
|
|
return key + '=' + this._escape(data[key], withNoQuote);
|
|
|
}).join(',');
|
|
|
} else if (_.isArray(data)) {
|
|
|
return data.map(d => {
|
|
|
return this._makeLine(d, escape);
|
|
|
}).join(' ');
|
|
|
} else if (!_.isNil(data)) {
|
|
|
return 'value=' + this._escape(data, withNoQuote);
|
|
|
} else {
|
|
|
return '';
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* data escape with influxdb's line protocol.
|
|
|
*
|
|
|
* @param value {*}
|
|
|
* @param withQuote {boolean}
|
|
|
* @returns {*}
|
|
|
* @private
|
|
|
*/
|
|
|
_escape(value, withQuote) {
|
|
|
if (_.isString(value)) {
|
|
|
value = _.replace(value, /,/g, '\\,');
|
|
|
value = _.replace(value, /=/g, '\\=');
|
|
|
|
|
|
if (withQuote) {
|
|
|
value = '"' + value + '"';
|
|
|
} else {
|
|
|
value = _.replace(value, /\s/g, '\\ ');
|
|
|
}
|
|
|
} else if (_.isInteger(value)) {
|
|
|
if (withQuote) {
|
|
|
value = value + 'i';
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return value;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* write point into influxdb by HTTP. use the open source node-influx module.
|
|
|
* @see https://github.com/node-influx/node-influx
|
|
|
*
|
|
|
* @param name {string} the measurement name
|
|
|
* @param options {object}
|
|
|
* @param key {object} some keys of data
|
|
|
* @param fields {object} some fields of data
|
|
|
* @returns {Promise}
|
|
|
*/
|
|
|
httpTrace(name, options, key, fields) {
|
|
|
let client = this.getHttpClient();
|
|
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
if (_.isArray(key)) {
|
|
|
client.writePoints(name, key, options, (err, rp) => {
|
|
|
if (err) {
|
|
|
reject(err);
|
|
|
} else {
|
|
|
resolve(rp)
|
|
|
}
|
|
|
});
|
|
|
} else {
|
|
|
client.writePoint(name, fields, key, options, (err, rp) => {
|
|
|
if (err) {
|
|
|
reject(err);
|
|
|
} else {
|
|
|
resolve(rp)
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* the singleton http client.
|
|
|
*
|
|
|
* @returns {*}
|
|
|
*/
|
|
|
getHttpClient() {
|
|
|
if (!this.httpClient) {
|
|
|
this.httpClient = influx(this.options);
|
|
|
}
|
|
|
|
|
|
return this.httpClient;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
module.exports = Trace; |
...
|
...
|
|