Authored by wangning

Merge branch 'dev-wn'

... ... @@ -227,7 +227,8 @@ public class ApiStatisticsAnalyzer implements Serializable {
Iterator itor = spanList.iterator() ;
while(itor.hasNext()){
SpanInfo s = (SpanInfo) itor.next() ;
if( parentSpanid.equals(s.getParent()) && s.getEndpoint().equals("cs") ) {
//如果spanid==parent 表示这是root,不做处理
if( parentSpanid.equals(s.getParent()) && s.getEndpoint().equals("cs") && !s.getSpanid().equals(s.getParent())) {
s.setLevel(count+1);
sortSpanList.add(s);
recusive(spanList, s.getSpanid(), sortSpanList, count+1 ) ;
... ...
... ... @@ -3,6 +3,7 @@ package com.yoho.trace.anaylzer.model;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
/**
* Created by xjipeng on 2017/10/12.
... ... @@ -30,4 +31,6 @@ public class SpanInfo implements Serializable {
* 来源页面ID
*/
private String pageId;
private Map<String, String> tags;
}
\ No newline at end of file
... ...
package com.yoho.trace.anaylzer.model;
import java.io.Serializable;
/**
* Created by wangning on 2017/11/15.
*/
public class SpanIpResult implements Serializable {
private long avgDuration;
private String ip;
private int times;
public long getAvgDuration() {
return avgDuration;
}
public void setAvgDuration(long avgDuration) {
this.avgDuration = avgDuration;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
... ...
package com.yoho.trace.online.handler;
import com.alibaba.fastjson.JSONObject;
import com.clearspring.analytics.util.Lists;
import com.yoho.trace.anaylzer.ApiStatisticsAnalyzer;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SortedTrace;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.anaylzer.model.SpanIpResult;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.ApiStatisticsResultStore;
import com.yoho.trace.store.HBasePool;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
... ... @@ -28,25 +34,22 @@ import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Currency;
import java.util.Iterator;
import java.util.List;
import java.util.*;
/**
* Created by markeloff on 2017/7/26.
*/
public class TraceAnalyzeHandler implements TraceHandler, Serializable {
private ApiStatisticsAnalyzer analyzer ;
private ApiStatisticsAnalyzer analyzer;
private static Logger logger = LoggerFactory.getLogger(TraceAnalyzeHandler.class) ;
private static Logger logger = LoggerFactory.getLogger(TraceAnalyzeHandler.class);
public TraceAnalyzeHandler(){
public TraceAnalyzeHandler() {
analyzer = new ApiStatisticsAnalyzer(false);
}
private static final int interval = 3000 ; //3s
private static final int interval = 3000; //3s
@Override
public void handle(final JavaDStream<Spans> kafkaMsgDStream) {
... ... @@ -57,62 +60,63 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
@Override
public Iterator<Tuple2<String, SpanInfo>> call(Iterator<Spans> spansIterator) throws Exception {
long enter = System.currentTimeMillis() ;
List<Tuple2<String, SpanInfo>> spanInfoList = new ArrayList<>(500) ;
while(spansIterator.hasNext()){
Spans spans = spansIterator.next() ;
spans.getHost() ;
List<Span> list = spans.getSpans() ;
Iterator<Span> itor = list.iterator() ;
while(itor.hasNext()){
Span span = itor.next() ;
String logEvent = "none" ;
if( CollectionUtils.isNotEmpty(span.logs()) ){
logEvent = span.logs().get(0).getEvent() ;
long enter = System.currentTimeMillis();
List<Tuple2<String, SpanInfo>> spanInfoList = new ArrayList<>(500);
while (spansIterator.hasNext()) {
Spans spans = spansIterator.next();
spans.getHost();
List<Span> list = spans.getSpans();
Iterator<Span> itor = list.iterator();
while (itor.hasNext()) {
Span span = itor.next();
String logEvent = "none";
if (CollectionUtils.isNotEmpty(span.logs())) {
logEvent = span.logs().get(0).getEvent();
}
SpanInfo spanInfo = new SpanInfo() ;
SpanInfo spanInfo = new SpanInfo();
spanInfo.setName(span.getName());
spanInfo.setBegin(span.getBegin());
spanInfo.setEnd(span.getEnd());
spanInfo.setTraceid(Span.idToHex(span.getTraceId()));
spanInfo.setSpanid(Span.idToHex(span.getSpanId()));
if( span.getParents().size() > 0 ) {
if (span.getParents().size() > 0) {
spanInfo.setParent(Span.idToHex(span.getParents().get(span.getParents().size() - 1)));
}
spanInfo.setService(spans.getHost().getServiceName());
spanInfo.setEndpoint(logEvent);
spanInfo.setIp(spans.getHost().getAddress());
spanInfo.setReceive(spans.getReceive());
spanInfoList.add(new Tuple2<>(spanInfo.getTraceid(),spanInfo));
spanInfo.setTags(span.tags());
spanInfoList.add(new Tuple2<>(spanInfo.getTraceid(), spanInfo));
}
}
logger.info("kafkaMsgDStream.mapPartitionsToPair elapse {}, size {} ", System.currentTimeMillis()-enter, spanInfoList.size() );
logger.info("kafkaMsgDStream.mapPartitionsToPair elapse {}, size {} ", System.currentTimeMillis() - enter, spanInfoList.size());
return spanInfoList.iterator();
}
}) ;
});
//把 span 按照 traceid 进行分组, 并过滤 120s 以前的 trace,不进行分析
JavaPairDStream<String,Iterable<SpanInfo>> tracePairDStream = spanPairDStream.groupByKey().filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
JavaPairDStream<String, Iterable<SpanInfo>> tracePairDStream = spanPairDStream.groupByKey().filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Iterable<SpanInfo>> v1) throws Exception {
Iterator<SpanInfo> itr = v1._2().iterator() ;
long current = System.currentTimeMillis() ;
if( itr.hasNext() ){
SpanInfo span = itr.next() ;
Iterator<SpanInfo> itr = v1._2().iterator();
long current = System.currentTimeMillis();
if (itr.hasNext()) {
SpanInfo span = itr.next();
//过滤过早的调用信息,不进行分析
if( current - span.getEnd() >= 120000 ){
if (current - span.getEnd() >= 120000) {
logger.info("filter early trace , don't anaylze it, trace id {}", v1._1());
return false ;
return false;
}
}
return true;
}
}) ;
});
//返回的 dstream (mapWithStateDStream)中,是立即进行处理的, state 中保存的是 上个周期的最后3秒和本周期的最后3秒
... ... @@ -135,11 +139,11 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
// 1 只要有span出现在本批次最后3秒,全部更新的state中,不处理, return 空
if (isLast3sRecv) {
if (CollectionUtils.isNotEmpty(newSpanList)) {
logger.info("latest 3s trace in current batch, just state, trace id {}, span count {}", traceId, newSpanList.size() );
logger.info("latest 3s trace in current batch, just state, trace id {}, span count {}", traceId, newSpanList.size());
state.update(newSpanList);
}
//null会被过滤掉,本批次不处理
return new Tuple2<>(traceId,null);
return new Tuple2<>(traceId, null);
}
// 2 state 中有的trace,合并spanlist, 清理state, return完整的trace spanlist
if (state.exists()) {
... ... @@ -148,16 +152,16 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
newSpanList.add((SpanInfo) itor.next());
}
if( !state.isTimingOut() ) {
if (!state.isTimingOut()) {
//state未超时
state.remove();
logger.info("full trace with last batch spans, trace id {}, span count {} ", traceId, newSpanList.size() );
logger.info("full trace with last batch spans, trace id {}, span count {} ", traceId, newSpanList.size());
return new Tuple2<String, Iterable<SpanInfo>>(traceId, newSpanList);
} else {
//state已超时
logger.info("state is timeout, can not process, trace id {}, span count {} ", traceId, newSpanList.size() );
logger.info("state is timeout, can not process, trace id {}, span count {} ", traceId, newSpanList.size());
//null会被过滤掉,不处理
return new Tuple2<>(traceId,null);
return new Tuple2<>(traceId, null);
}
}
... ... @@ -167,18 +171,18 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
}
}).timeout(Durations.seconds(90))) ;
}).timeout(Durations.seconds(90)));
//过滤
JavaPairDStream<String, Iterable<SpanInfo>> currentTraceStream = mapWithStateDStream.filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Iterable<SpanInfo>> v1) throws Exception {
//本周期最后3秒收到的trace,等到下个周期再处理
if( v1._2() == null ){
if (v1._2() == null) {
logger.info("filter null trace (latest 3s will process on next batch) , trace id {}", v1._1());
return false;
}
logger.info("go to analyze current batch trace, trace id {}", v1._1() );
logger.info("go to analyze current batch trace, trace id {}", v1._1());
return true;
}
}).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Iterable<SpanInfo>>>, String, Iterable<SpanInfo>>() {
... ... @@ -186,7 +190,7 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
public Iterator<Tuple2<String, Iterable<SpanInfo>>> call(Iterator<Tuple2<String, Iterable<SpanInfo>>> tuple2Iterator) throws Exception {
return tuple2Iterator;
}
}) ;
});
//mapWithStateDStream.stateSnapshots state 中 保存了 1、本批次最后3秒, 2、 上个批次最后3秒(本批次未更新的trace)
... ... @@ -233,25 +237,138 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
// });
JavaPairDStream<String, Iterable<SpanInfo>> needAnaylzeStream = currentTraceStream.union(lastStateTraceStream);
//key traceid
JavaPairDStream<String, Iterable<SpanInfo>> needAnaylzeStream = currentTraceStream.union(lastStateTraceStream);
//key minuteTime.traceMD5 value 该traceMD5的一个traceid对应的信息
JavaPairDStream<String, SortedTrace> sortSpanTrace = needAnaylzeStream.flatMapToPair(analyzer.SortSpanTreeFunc);
sortSpanTrace.cache();
//处理span+ip的耗时分布
handlerSpanIp(sortSpanTrace);
JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream = sortSpanTrace.mapToPair(analyzer.ConvertTraceResultFunc) ;
//根据traceMD5分组计算value
JavaPairDStream<String, ApiTraceResult> resultDStream = apiResultTraceDStream.reduceByKey(analyzer.ReduceFunc) ;
resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
@Override
public void call(JavaPairRDD<String, ApiTraceResult> apiResultRdd) throws Exception {
ApiStatisticsResultStore.store(apiResultRdd, "trace_api_analyze_minutes");
ApiStatisticsResultStore.store(apiResultRdd, "trace_api_analyze_minutes_wn");
}
});
}
private void handlerSpanIp(JavaPairDStream<String, SortedTrace> sortSpanTrace){
//key traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip
JavaPairDStream<String, SpanIpResult> stringSpanInfoJavaPairDStream = sortSpanTrace.flatMapToPair(new PairFlatMapFunction<Tuple2<String, SortedTrace>, String, SpanIpResult>() {
@Override
public Iterator<Tuple2<String, SpanIpResult>> call(Tuple2<String, SortedTrace> tuple2) throws Exception {
List<Tuple2<String, SpanIpResult>> resultList = Lists.newArrayList();
String minuteStart = tuple2._1.split("\\.")[0];
String traceMD5 = tuple2._1.split("\\.")[1];
SortedTrace sortTrace = tuple2._2;
List<SpanInfo> sortSpanList = sortTrace.getSortSpanList();
for (SpanInfo spanInfo : sortSpanList) {
String ip = "";
//只有root和resttemplate才会生成ip
if(spanInfo.getTraceid().equals(spanInfo.getSpanid())){
ip = spanInfo.getIp();
}else{
if(spanInfo.getTags()!=null){
String httpHost = spanInfo.getTags().get("http.host");
if(StringUtils.isNoneBlank(httpHost)){
ip = httpHost;
}
}
}
if(StringUtils.isBlank(ip)){
continue;
}
SpanIpResult spanIpResult = new SpanIpResult();
String spanName = spanInfo.getName();
long duration = spanInfo.getEnd() - spanInfo.getBegin();
spanIpResult.setAvgDuration(duration);
spanIpResult.setIp(ip);
spanIpResult.setTimes(1);
String key = traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip;
Tuple2<String, SpanIpResult> resultTuple2 = new Tuple2<>(key, spanIpResult);
resultList.add(resultTuple2);
}
return resultList.iterator();
}
});
JavaPairDStream<String, SpanIpResult> stringSpanIpResultJavaPairDStream = stringSpanInfoJavaPairDStream.reduceByKey(new Function2<SpanIpResult, SpanIpResult, SpanIpResult>() {
@Override
public SpanIpResult call(SpanIpResult v1, SpanIpResult v2) throws Exception {
SpanIpResult result = new SpanIpResult();
result.setIp(v1.getIp());
result.setTimes(v1.getTimes() + v2.getTimes());
result.setAvgDuration((v1.getAvgDuration() * v1.getTimes() + v2.getAvgDuration() * v2.getTimes()) / result.getTimes());
return result;
}
});
stringSpanIpResultJavaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, SpanIpResult>>() {
@Override
public void call(JavaPairRDD<String, SpanIpResult> stringSpanIpResultJavaPairRDD) throws Exception {
stringSpanIpResultJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, SpanIpResult>>>() {
@Override
public void call(Iterator<Tuple2<String, SpanIpResult>> tuple2Iterator) throws Exception {
HTable resultTable = null;
try {
if (resultTable == null) {
resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("span_ip"));
}
if(tuple2Iterator == null){
return;
}
List<Put> putList = new ArrayList<>();
while(tuple2Iterator.hasNext()){
Tuple2<String, SpanIpResult> next = tuple2Iterator.next();
String key = next._1;
SpanIpResult spanIpResult = next._2;
Put put = new Put(Bytes.toBytes(key));
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("avg_duration"), Bytes.toBytes(spanIpResult.getAvgDuration()));
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(spanIpResult.getTimes()));
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("ip"), Bytes.toBytes(spanIpResult.getIp()));
putList.add(put);
}
resultTable.put(putList);
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally {
if (resultTable != null)
resultTable.close();
}
}
});
}
});
}
}
... ...