Authored by 沈志敏

add server data into influxdb

/**
* 采集数据
*
* @class Collect
* @author shenzm<zhimin.shen@yoho.cn>
* @date 2016/10/12
*/
import ssh from 'ssh2';
import Rp from 'request-promise';
import Trace from '../logger/trace.js';
import config from '../../config/config.js';
import {
Server
} from '../models';
var tracer = new Trace({
host: config.influxdb.host,
port: config.influxdb.port
});
class Collect {
constructor(host, projectname) {
this.host = host;
this.projectname = projectname;
this.scriptRunning = false;
this.retry = {};
}
async collect() {
let self = this;
let server = await Server.findByHost(self.host);
self.server = {
host: server.host,
username: server.username,
password: server.password,
port: server.port
}
let obj = {
'total': 0,
'status': {}
};
Rp({
uri: `http://${self.host}:9615`,
json: true
}).then(function(data) {
var processes = data.processes || [];
processes.forEach(function(p) {
if (p.name === 'pm2-server-monit') {
var cpuUsg = p.pm2_env.axm_monitor['CPU usage'].value;
var freeMen = p.pm2_env.axm_monitor['Free memory'].value;
obj.cpuUsg = cpuUsg ? parseFloat(cpuUsg.replace('%', '')).toFixed(2) : '';
obj.freeMen = freeMen ? parseFloat(freeMen.replace('%', '')).toFixed(2) : '';
}
if (p.name === self.projectname) {
obj.total++;
if (!obj.status[p.pm2_env.status]) {
obj.status[p.pm2_env.status] = 1;
} else {
obj.status[p.pm2_env.status]++;
}
}
});
if (obj.cpuUsg === undefined) { // install server monit
const script = 'pm2 install pm2-server-monit';
self.execScript(script);
}
// add into influxDB todo
tracer.report('server_data', {
host: self.host
}, obj);
}).catch(function(err) {
const script = 'pm2 web';
self.execScript(script);
});
}
execScript(script) {
let self = this;
if (self.scriptRunning || self.retry[`${self.host}_${script}`] > 5) {
// 脚本执行中,或者重试次数大于5次以上时,不执行脚本
return;
}
let retryCount = self.retry[`${self.host}_${script}`] || 0;
self.retry[`${self.host}_${script}`] = ++retryCount;
self.scriptRunning = true;
let conn = new ssh.Client();
conn.on('ready', () => {
self._log(`>>>>host:[${self.host}] script[${script}]`);
conn.exec(`${script}`, (err, stream) => {
if (err) {
conn.end();
self._log(`host:[${self.host}] script:[${script}] exec fail error: ${err}`);
self.scriptRunning = false;
} else {
stream.stdout.on('data', (data) => {
//self._log(data.toString());
});
stream.stderr.on('data', (data) => {
//self._log(data.toString());
});
stream.on('exit', (code) => {
conn.end();
if (code === 0) {
self._log(`host:[${self.host}] script[${script}] exec success`);
} else {
self._log(`host:[${self.host}] script[${script}] exec fail`);
}
self.scriptRunning = false;
});
}
});
}).on('error', (err) => {
self._log(`connect error ${self.host} ${err}`);
self.scriptRunning = false;
}).connect(Object.assign(self.server, {
readyTimeout: 5000
}));
}
_log(msg) {
console.log(msg);
}
}
export default Collect;
\ No newline at end of file
... ...
... ... @@ -170,7 +170,6 @@ class Deploy {
}
_log(msg) {
console.log(msg)
ws.broadcast(`/deploy/${this.project._id}/log`, {
host: this.info.host,
msg: msg
... ...
/**
* 跟踪监控工具, 将监控数据写入influxdb
*
* @usage:
* <code>
* let trace = new Trace({
* host: '54.222.219.223',
* port: 4444
* });
*
* let testTrace = trace.trace('test_key'); // createOrChoose a measurement
*
* // testTrace(someTags, someFields);
* testTrace({ foo: 'bar', foobar: 'baz2'}, {value: 123, value2: 'aaa 123', value3: 1.3, value4: false});
*
* </code>
*
* @author: jiangfeng<jeff.jiang@yoho.cn>
* @date: 16/8/1
*/
... ... @@ -36,79 +22,13 @@ class Trace {
this.options = options;
}
/**
* create or choose a measurement to write point in.
* @param name {string} the measurement name
* @param options {object} some options. the protocol of influxdb connection.
* @returns {function()} a point write function
*/
trace(name, options) {
options = _.assign({
protocol: 'udp'
}, options);
let self = this;
if (options.protocol === 'udp') {
return (key, fields) => {
return self.udpTrace(name, options, key, fields);
};
} else if (options.protocol === 'http') {
return (key, fields) => {
return self.httpTrace(name, options, key, fields);
};
report(name, keys, data) {
if (this.options.appName) {
keys.appName = this.options.appName;
}
}
/**
* write point into influxdb by UDP
*
* @param name {string} the measurement name
* @param options {object}
* @param key {object} some keys of data
* @param fields {object} some fields of data
* @returns {Promise}
*/
udpTrace(name, options, key, fields) {
if (_.isArray(key)) {
key.forEach(p => {
let line = `${this._escape(name)},${this._makeLine(p)}`;
return this._updPostLine(line);
});
} else {
let line = `${this._escape(name)},${this._makeLine(key, false)} ${this._makeLine(fields, true)}`;
return this._updPostLine(line);
}
}
/**
* upd send.
* @param line {string} @see infulxdb's line protocol
* @returns {Promise}
* @private
*/
_updPostLine(line) {
let self = this;
return new Promise((resolve, reject) => {
let socket = dgram.createSocket("udp4");
let buff = new Buffer(line);
socket.send(buff, 0, buff.length, self.options.port, self.options.host, (err, rp) => {
socket.close();
console.log(rp);
if (err) {
reject(err);
} else {
resolve();
}
});
});
let line = `${this._escape(name)},${this._makeLine(keys, false)} ${this._makeLine(data, true)}`;
return this._send(line);
}
/**
... ... @@ -137,79 +57,51 @@ class Trace {
}
/**
* data escape with influxdb's line protocol.
*
* @param value {*}
* @param withQuote {boolean}
* @returns {*}
* upd send.
* @param line {string} @see infulxdb's line protocol
* @returns {Promise}
* @private
*/
_send(line) {
let self = this;
return new Promise((resolve, reject) => {
let socket = dgram.createSocket('udp4');
let buff = new Buffer(line);
socket.send(buff, 0, buff.length, self.options.port, self.options.host, (err) => {
socket.close();
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
_escape(value, withQuote) {
if (_.isString(value)) {
value = _.replace(value, /,/g, '\\,');
value = _.replace(value, /=/g, '\\=');
if (withQuote) {
value = '"' + value + '"';
} else {
value = _.replace(value, /,/g, '\\,');
value = _.replace(value, /=/g, '\\=');
value = _.replace(value, /\s/g, '\\ ');
}
} else if (_.isInteger(value)) {
if (withQuote) {
value = value + 'i';
}
} else if (_.isObject(value)) {
value = '"' + _.replace(JSON.stringify(value), /"/g, '\\"') + '"';
} else if (_.isNull(value) || _.isNil(value)) {
value = '""';
}
return value;
}
/**
* write point into influxdb by HTTP. use the open source node-influx module.
* @see https://github.com/node-influx/node-influx
*
* @param name {string} the measurement name
* @param options {object}
* @param key {object} some keys of data
* @param fields {object} some fields of data
* @returns {Promise}
*/
httpTrace(name, options, key, fields) {
let client = this.getHttpClient();
return new Promise((resolve, reject) => {
if (_.isArray(key)) {
client.writePoints(name, key, options, (err, rp) => {
if (err) {
reject(err);
} else {
resolve(rp)
}
});
} else {
client.writePoint(name, fields, key, options, (err, rp) => {
if (err) {
reject(err);
} else {
resolve(rp)
}
});
}
});
}
/**
* the singleton http client.
*
* @returns {*}
*/
getHttpClient() {
if (!this.httpClient) {
this.httpClient = influx(this.options);
}
return this.httpClient;
}
}
module.exports = Trace;
module.exports = Trace;
\ No newline at end of file
... ...
'use strict';
import Collect from '../../ci/collect_data';
import {
Project
} from '../../models';
export default {
async collect(ctx) {
let projects,
servers = {};
setInterval(async() => {
if (!projects) projects = await Project.findAll();
if (!projects || !projects.length) {
return;
}
projects.forEach(async(p) => {
const hosts = p.deploy['production'].target;
hosts.forEach((host) => {
if (!servers[host]) {
servers[host] = new Collect(host, p.name);
}
servers[host].collect();
});
});
}, 5000);
}
};
\ No newline at end of file
... ...
... ... @@ -10,6 +10,7 @@ import Koa from 'koa';
import hbs from '../../middleware/yoho-koa-hbs';
import helpers from '../../lib/helpers';
import routers from './routers'
import collectData from './actions/collect_data';
const app = new Koa();
... ... @@ -29,6 +30,9 @@ const mastersUrl = [
'/users'
];
// 服务器监控数据采集
collectData.collect();
app.use(async(ctx, next) => {
ctx.locals = {
title: 'Yoho Node.js 持续集成平台'
... ... @@ -38,7 +42,7 @@ app.use(async(ctx, next) => {
ctx.locals.layout = null;
}
if (ctx.session && ctx.session.user ) {
if (ctx.session && ctx.session.user) {
ctx.locals.is_master = ctx.session.user.role === '1000';
ctx.locals.current_user = ctx.session.user;
}
... ...
... ... @@ -5,15 +5,19 @@ import path from 'path';
const env = process.env.NODE_ENV || 'development';
const defaults = {
port: 9000,
buildDir: path.normalize(__dirname + '/../packages/'),
dbDir: path.normalize(__dirname + '/../db')
port: 9000,
buildDir: path.normalize(__dirname + '/../packages/'),
dbDir: path.normalize(__dirname + '/../db'),
influxdb: {
host: '54.222.219.223',
port: 4444
}
};
const specific = {
development: {},
test: {},
production: {}
development: {},
test: {},
production: {},
};
export default Object.assign(defaults, specific[env]);
\ No newline at end of file
... ...