Authored by htoooth

Revert "Revert "fix""

This reverts commit dd76bfac.
const express = require('express');
const bodyParser = require('body-parser');
const requestIp = require('request-ip');
const app = express();
const config = require('./common/config');
const logger = require('yoho-node-lib/lib/logger').init(config);
const crypto = require('yoho-node-lib/lib/crypto');
global.yoho = {
logger,
config,
crypto
};
require('yoho-node-lib').global(config);
const {logger} = global.yoho;
app.use(bodyParser.text({
limit: '1024kb'
}));
app.use(function(req, res, next) {
req.clientIp = requestIp.getClientIp(req);
next();
});
try {
const middleware = require('./middleware');
require('./dispatch')(app);
app.get('/', (req, res) => {
res.send('首页');
// error handle
app.use('*', (req, res) => {
res.statusCode = 404;
res.send();
});
app.get('/apm/yas.gif', middleware.apm);
// docker验证项目是否正常发布
app.use('/node/status.html', (req, res) => {
return res.status(204).end();
app.use((err, req, res, next) => { // eslint-disable-line
logger.error('error:', err);
res.send();
});
// 新方法
app.get('/apm/yas2.gif', middleware.clientApm);
app.post('/write', middleware.serverApm);
} catch (e) {
logger.error(e);
}
app.use('*', (req, res) => {
res.statusCode = 404;
res.send();
});
app.use((err, req, res) => {
logger.error('error:', err);
res.send();
});
app.listen(config.port, () => {
logger.info(`yoho apm start at ${config.port}`);
});
module.exports = app;
... ...
... ... @@ -72,6 +72,42 @@ module.exports = {
// min: 0,
// max: 10 * 1000
},
zookeeperServer: '127.0.0.1:2181',
redis: {
connect: {
// host: '127.0.0.1',
host: '192.168.102.49',
port: '6379',
enable_offline_queue: false,
retry_strategy(options) {
if (options.error && options.error.code === 'ECONNREFUSED') {
console.log('connect redis server fail');
}
if (options.attempt < 10) {
return Math.min(options.attempt * 100, 1000);
} else if (options.attempt > 10 && options.attempt < 100) {
return 1000;
} else {
return 1000 * 10;
}
}
}
},
REQUEST_LIMIT: {
// 10s 最多访问5次
10: 5,
// 30s 最多访问10次
30: 10,
// 60s 最多访问15次
60: 15,
// 100s 最多访问100次
600: 100
}
};
... ... @@ -102,6 +138,28 @@ if (isProduction) {
db: 'webapm',
userName: 'root',
password: 'yB877Jy7tV6juIYk'
},
zookeeperServer: 'web.zookeeper.yohoops.org:2181',
redis: {
connect: {
host: 'redis.web.yohoops.org',
port: '6379',
password: 'redis9646',
enable_offline_queue: false,
retry_strategy(options) {
if (options.error && options.error.code === 'ECONNREFUSED') {
console.log('connect redis server fail');
}
if (options.attempt < 10) {
return Math.min(options.attempt * 100, 1000);
} else if (options.attempt > 10 && options.attempt < 100) {
return 1000;
} else {
return 1000 * 10;
}
}
}
}
});
}
... ...
const middleware = require('./middleware');
module.exports = (app) => {
app.get('/', (req, res) => {
res.send('首页');
});
// old yas
app.get('/apm/yas.gif', middleware.apm);
// docker验证项目是否正常发布
app.use('/node/status.html', (req, res) => {
return res.status(204).end();
});
// new yas
app.get('/apm/yas2.gif', middleware.getip, middleware.clientApm);
app.post('/write', middleware.serverApm);
};
... ...
const requestIp = require('request-ip');
module.exports = function(req, res, next) {
req.clientIp = requestIp.getClientIp(req);
next();
};
\ No newline at end of file
... ...
const clientApm = require('./clientapm');
const apm = require('./apm');
const serverApm = require('./serverapm');
const getip = require('./getip');
module.exports = {
apm,
clientApm,
serverApm
serverApm,
getip
};
... ...
const compose = require('koa-compose');
const _ = require('lodash');
const qpsPath = require('./qps-path');
const qps = require('./qps');
const APP_NAME = {
'yohobuy-node': 'pc',
'yohobuywap-node': 'wap',
UNKNOWN: ''
};
module.exports = () => {
const handlers = compose([qpsPath, qps]);
return async(m) => {
const user = {
uid: _.get(m, 'fields.uid', ''),
ip: _.get(m, 'fields.ip', ''),
app: APP_NAME[_.get(m, 'tags.app', 'UNKNOWN')],
path: _.get(m, 'fields.path', '')
};
await handlers({user});
};
};
... ...
/**
* 控制路由请求次数
* @date: 2018/03/05
*/
'use strict';
const _ = require('lodash');
const cache = global.yoho.cache.master;
const config = global.yoho.config;
const logger = global.yoho.logger;
const md5 = require('md5');
const pathToRegexp = require('path-to-regexp');
const Promise = require('bluebird');
const zk = {};
if (config.zookeeperServer) {
require('yoho-zookeeper')(config.zookeeperServer, 'pc', zk.pc = {}, global.yoho.cache);
require('yoho-zookeeper')(config.zookeeperServer, 'wap', zk.wap = {}, global.yoho.cache);
}
const INVALIDTIME = 3600 * 24; // 24h
const IP_WHITE_LIST = [
'106.38.38.146',
'106.38.38.147',
'106.39.86.227',
'218.94.75.58',
'218.94.75.50',
'218.94.77.166'
];
const key = 'risk1';
module.exports = async({user}, next) => {
if (!user.app || !user.path || !user.ip) {
return next();
}
const app = user.app;
if (_.get(zk, `${app}.close.risk`, false)) {
return next();
}
const ip = user.ip;
const path = user.path;
const risks = _.get(zk, `${app}.json.risk`, []);
let router = {};
logger.debug(`risk => risks: ${JSON.stringify(risks)}, path: ${path}, ip: ${ip}`); // eslint-disable-line
if (_.isEmpty(path) || _.isEmpty(risks) || IP_WHITE_LIST.indexOf(ip) > -1) {
return next();
}
_.isArray(risks) && risks.some(item => {
if (item.state === 'off') {
return false;
}
if (!item.regRoute) {
item.regRoute = pathToRegexp(item.route);
item.interval = parseInt(item.interval, 10);
item.requests = parseInt(item.requests, 10);
}
if (item.regRoute.test(path)) {
router = item;
return true;
}
return false;
});
logger.debug(`risk => router: ${JSON.stringify(router)}, path: ${path}`); // eslint-disable-line
if (_.isEmpty(router)) {
return next();
}
let keyPath = md5(`${router.regRoute}`);
let limitKey = `${app}:${key}:limit:${keyPath}:${ip}`; // 查询这个key是否生效
let configKey = `${app}:${key}:${keyPath}:${ip}`;
await Promise.all([
cache.getAsync(limitKey),
cache.getAsync(configKey),
]).then(inters => {
logger.debug(`risk => getCache: ${JSON.stringify(inters)}, path: ${path}`); // eslint-disable-line
if (inters[0]) {
logger.info('[qps:route] this user[%o] has rejected', user);
return;
}
if (typeof inters[1] === 'undefined') {
cache.setAsync(configKey, 1, router.interval || 300);
return;
}
inters[1] = parseInt(`0${inters[1]}`, 10);
if (inters[1] <= router.requests) {
router = [];
cache.incrAsync(configKey, 1);
return;
}
logger.warn('[qps:route] this user[%o] is being marked as rejected', user);
return Promise.all([
cache.setAsync(limitKey, 1, INVALIDTIME),
cache.delAsync(configKey)
]);
}).then(result => {
logger.debug('[qps:route] user[%o] result[%o]', user, result); // eslint-disable-line
}).catch(e => {
logger.error(`risk => path: ${path}, err: ${e.message}`);
}).finally(() => {
return next();
});
};
... ...
'use strict';
const logger = global.yoho.logger;
const cache = global.yoho.cache.master;
const config = global.yoho.config;
const Promise = require('bluebird');
const _ = require('lodash');
// 超出访问限制ip限制访问1小时
const limiterIpTime = 3600;
// 页面访问限制
const MAX_TIMES = config.REQUEST_LIMIT;
const limiterKey = 'limiter2';
module.exports = async({user}, next) => {
if (!user.app || !user.ip) {
return next();
}
// 存储规则的cache keys
let ruleKeys = {};
let getOp = {};
_.forEach(MAX_TIMES, (val, key) => {
ruleKeys[key] = `${user.app}:${limiterKey}:${key}:max:${user.ip}`;
getOp[key] = cache.getAsync(ruleKeys[key]);
});
getOp.human = cache.getAsync(`${user.app}:${limiterKey}:ishuman:${user.ip}`);
return Promise.props(getOp).then((results) => {
if (results.human) { // 经过验证码之后1小时有效期内不再验证qps
logger.warn('[qps] this user[%o] is being marked as human', user);
return {};
}
// 遍历限制规则,若满足返回相应处理策略, 否则页面访问次数加1
let operation = [];
_.forEach(MAX_TIMES, (val, key) => {
let cacheKey = ruleKeys[key];
if (!results[key]) {
operation.push(cache.setAsync(cacheKey, 1, +key));
} else if (+results[key] > +val) {
logger.warn('[qps] this user[%o] is being marked as rejected', user);
// ip限制1小时
operation.push(cache.setAsync(`${user.app}:${limiterKey}:${user.ip}`, 1, limiterIpTime));
} else {
operation.push(cache.incrAsync(cacheKey, 1));
}
});
return Promise.all(operation);
}).then((result) => {
logger.debug('[qps] user[%o] result[%o]', user, result); // eslint-disable-line
}).catch(err=>{
logger.error(err);
}).finally(() => {
next();
});
};
... ...
const compose = require('koa-compose');
class User {
constructor(uid, userAgent, path, ip) {
this._uid = uid;
this._userAgent = userAgent;
this._path = path;
this._ip = ip;
}
ip(opts) {
if (opts.excludes.includes(this._ip)) {
return null;
}
return this._ip;
}
}
class Resource {
all() {
return this;
}
includes(opts) {
}
excludes(opts) {
}
}
class Metrics {
qps(id, resource) {
}
}
function makeContext(req, res) {
const ctx = {req, res};
return ctx;
}
const _rules = [];
function runRule() {
const middleWare = compose(_rules);
return (req, res, next) => {
const context = makeContext(req, res);
middleWare(context).then(() => {
res.json({
code: 200,
message: '成功'
});
}).catch(next);
};
}
function addRule(name, opts, action) {
_rules.push(action.bind(opts));
}
module.exports = {
rule: addRule,
runRule
};
... ...
const {rule} = require('./model');
const whiteList = [];
const blackList = [];
rule('user.ip.router', {}, async({user, resource, metrics}, next) => {
const qps = await metrics.qps(user.ip({
excludes: whiteList
}), resource.all({
includes: blackList,
excludes: whiteList
}));
if (qps > 200) {
} else {
}
return next();
});
rule('user.ip.', {}, async({user, resource, metrics}, next) => {
const qps = await metrics.qps(user.ip({
excludes: whiteList
}), resource.all({
excludes: whiteList
}));
if (qps > 2000) {
} else {
}
return next();
});
... ...
const lineparse = require('../lib/line-parse');
const bb = require('bluebird');
const logger = global.yoho.logger;
... ... @@ -8,13 +9,16 @@ const {
handleWebServerDuration
} = require('./serverapm-service');
const riskService = require('./risk-service');
const handleRisk = riskService();
const server = {
async handle(data) {
try {
let msgs = lineparse.parse(data);
msgs.forEach((m) => {
await bb.map(msgs, async(m) => {
if (!m.measurement) {
return;
}
... ... @@ -22,6 +26,7 @@ const server = {
switch (m.measurement) {
case 'web-server-duration': {
handleWebServerDuration(m);
await handleRisk(m);
break;
}
case 'error-report': {
... ... @@ -37,8 +42,9 @@ const server = {
break;
}
}
});
logger.info('[server] handle OK [%s]', data);
logger.info('[server] handle OK [%s]', data);
}, {concurrency: 2});
} catch (e) {
logger.error('[server] handle ERROR [%s]', e);
}
... ...
... ... @@ -2,32 +2,37 @@
"name": "yoho-apm",
"description": "",
"version": "1.0.0",
"main": "app.js",
"main": "server.js",
"directories": {
"lib": "lib"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start:dev": "nodemon --ignore ./examples app.js"
"dev": "nodemon --ignore ./examples server.js"
},
"author": "",
"license": "ISC",
"dependencies": {
"bluebird": "^3.5.1",
"body-parser": "^1.18.2",
"debug": "^3.1.0",
"express": "^4.15.4",
"influx-batch-sender": "^0.1.5",
"ip2region": "^1.1.0",
"knex": "^0.14.1",
"koa-compose": "^4.1.0",
"lodash": "^4.17.4",
"lru-cache": "^4.1.1",
"md5": "^2.2.1",
"mysql": "^2.15.0",
"path-to-regexp": "^2.4.0",
"request-ip": "^2.0.2",
"request-promise": "^4.2.1",
"source-map": "^0.5.7",
"url": "^0.11.0",
"useragent": "^2.3.0",
"yoho-node-lib": "^0.6.13"
"yoho-node-lib": "^0.6.26",
"yoho-zookeeper": "^1.0.10"
},
"devDependencies": {
"babel-core": "^6.26.0",
... ...
... ... @@ -2,7 +2,7 @@
"apps": [
{
"name": "yoho-apm",
"script": "app.js",
"script": "server.js",
"instances": "1",
"exec_mode": "cluster",
"merge_logs": true,
... ...
const app = require('./app');
const {
config,
logger
} = global.yoho;
app.listen(config.port, () => {
logger.info(`yoho apm start at ${config.port}`);
});
... ...