Authored by 谢勇

性能优化

package com.yoho.trace.online.handler;
import com.alibaba.fastjson.JSONObject;
import com.clearspring.analytics.util.Lists;
import com.yoho.trace.anaylzer.ApiStatisticsAnalyzer;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SortedTrace;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.anaylzer.model.SpanIpResult;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.ApiStatisticsResultStore;
import com.yoho.trace.store.HBasePool;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
... ... @@ -31,10 +26,20 @@ import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
import com.alibaba.fastjson.JSONObject;
import com.clearspring.analytics.util.Lists;
import com.yoho.trace.anaylzer.ApiStatisticsAnalyzer;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SortedTrace;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.anaylzer.model.SpanIpResult;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.ApiStatisticsResultStore;
import com.yoho.trace.store.HBasePool;
import scala.Tuple2;
/**
* Created by markeloff on 2017/7/26.
... ... @@ -231,20 +236,6 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
});
// 验证 是否产生交集 , 不应该产生交集
// currentTraceStream.join(lastStateTraceStream).foreachRDD(new VoidFunction<JavaPairRDD<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>>>() {
// @Override
// public void call(JavaPairRDD<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>> stringTuple2JavaPairRDD) throws Exception {
// stringTuple2JavaPairRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>>>() {
// @Override
// public void call(Tuple2<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>> stringTuple2Tuple2) throws Exception {
// logger.error("join trace in 2 rdd, trace id {}", stringTuple2Tuple2._1());
// }
// });
// }
// });
//key traceid
JavaPairDStream<String, Iterable<SpanInfo>> needAnaylzeStream = currentTraceStream.union(lastStateTraceStream);
... ... @@ -253,66 +244,68 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
sortSpanTrace.cache();
//1处理span+ip的耗时分布
handlerSpanIp(sortSpanTrace);
JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream = sortSpanTrace.mapToPair(analyzer.ConvertTraceResultFunc) ;
apiResultTraceDStream.cache();
//1处理异常调用链信息
handlerExceptionTrace(apiResultTraceDStream);
//2处理treemd5调用链总览
handlertraceApiAnalyzeMinutes(apiResultTraceDStream);
//3处理异常调用链信息
handlerExceptionTrace(apiResultTraceDStream);
//4处理所有traceid
//3处理所有traceid
handlerAllTrace(apiResultTraceDStream);
//4处理span+ip的耗时分布
handlerSpanIp(sortSpanTrace);
}
private void handlerSpanIp(JavaPairDStream<String, SortedTrace> sortSpanTrace){
//key traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip
JavaPairDStream<String, SpanIpResult> stringSpanInfoJavaPairDStream = sortSpanTrace.flatMapToPair(new PairFlatMapFunction<Tuple2<String, SortedTrace>, String, SpanIpResult>() {
@Override
public Iterator<Tuple2<String, SpanIpResult>> call(Tuple2<String, SortedTrace> tuple2) throws Exception {
List<Tuple2<String, SpanIpResult>> resultList = Lists.newArrayList();
String minuteStart = StringUtils.split(tuple2._1, '.')[0];
String traceMD5 = StringUtils.split(tuple2._1, '.')[1];
SortedTrace sortTrace = tuple2._2;
List<SpanInfo> sortSpanList = sortTrace.getSortSpanList();
for (SpanInfo spanInfo : sortSpanList) {
String ip = "";
//只有root和resttemplate才会生成ip
if(spanInfo.getTraceid().equals(spanInfo.getSpanid())){
ip = spanInfo.getIp();
}else{
if(StringUtils.isNotBlank(spanInfo.getHttpHost())){
ip = spanInfo.getHttpHost();
}
}
if(StringUtils.isBlank(ip)){
continue;
}
SpanIpResult spanIpResult = new SpanIpResult();
String spanName = spanInfo.getName();
long duration = spanInfo.getEnd() - spanInfo.getBegin();
spanIpResult.setAvgDuration(duration);
spanIpResult.setIp(ip);
spanIpResult.setTimes(1);
String key = traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip;
Tuple2<String, SpanIpResult> resultTuple2 = new Tuple2<>(key, spanIpResult);
resultList.add(resultTuple2);
}
return resultList.iterator();
}
});
private void handlerSpanIp(JavaPairDStream<String, SortedTrace> sortSpanTrace) {
// key traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip
JavaPairDStream<String, SpanIpResult> stringSpanInfoJavaPairDStream = sortSpanTrace.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, SortedTrace>>, String, SpanIpResult>() {
@Override
public Iterator<Tuple2<String, SpanIpResult>> call(Iterator<Tuple2<String, SortedTrace>> tuple2List)
throws Exception {
if (null == tuple2List) {
return null;
}
List<Tuple2<String, SpanIpResult>> resultList = Lists.newArrayList();
while (tuple2List.hasNext()) {
Tuple2<String, SortedTrace> tuple2 = tuple2List.next();
String minuteStart = StringUtils.split(tuple2._1, '.')[0];
String traceMD5 = StringUtils.split(tuple2._1, '.')[1];
SortedTrace sortTrace = tuple2._2;
List<SpanInfo> sortSpanList = sortTrace.getSortSpanList();
for (SpanInfo spanInfo : sortSpanList) {
String ip = "";
// 只有root和resttemplate才会生成ip
if (spanInfo.getTraceid().equals(spanInfo.getSpanid())) {
ip = spanInfo.getIp();
} else {
if (StringUtils.isNotBlank(spanInfo.getHttpHost())) {
ip = spanInfo.getHttpHost();
}
}
if (StringUtils.isBlank(ip)) {
continue;
}
SpanIpResult spanIpResult = new SpanIpResult();
String spanName = spanInfo.getName();
long duration = spanInfo.getEnd() - spanInfo.getBegin();
spanIpResult.setAvgDuration(duration);
spanIpResult.setIp(ip);
spanIpResult.setTimes(1);
String key = traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip;
Tuple2<String, SpanIpResult> resultTuple2 = new Tuple2<>(key, spanIpResult);
resultList.add(resultTuple2);
}
}
return resultList.iterator();
}
});
JavaPairDStream<String, SpanIpResult> stringSpanIpResultJavaPairDStream = stringSpanInfoJavaPairDStream.reduceByKey(new Function2<SpanIpResult, SpanIpResult, SpanIpResult>() {
@Override
... ... @@ -355,10 +348,8 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
putList.add(put);
}
resultTable.put(putList);
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally {
... ... @@ -370,11 +361,9 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
});
}
});
}
private void handlertraceApiAnalyzeMinutes(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream){
//根据traceMD5分组计算value
JavaPairDStream<String, ApiTraceResult> resultDStream = apiResultTraceDStream.reduceByKey(analyzer.ReduceFunc) ;
... ...
package com.yoho.trace.online.handler;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.HBasePool;
import com.yoho.trace.store.TraceSpanStore;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.spark.api.java.JavaRDD;
... ... @@ -11,9 +12,9 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.HBasePool;
import com.yoho.trace.store.TraceSpanStore;
/**
* Created by markeloff on 2017/7/26.
... ... @@ -34,21 +35,14 @@ public class TraceHbaseHandler implements TraceHandler , Serializable {
public void call(Iterator<Spans> spansIterator) throws Exception {
HTable traceTable = null ;
int count = 0 ;
long begin = System.currentTimeMillis() ;
try {
traceTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("trace"));
traceTable.setWriteBufferSize(1024 * 1024 * 6);
traceTable.setAutoFlush(false, true);
while(spansIterator.hasNext()){
count = count + TraceSpanStore.store(spansIterator.next(), traceTable) ;
}
traceTable.flushCommits();
//traceTable.setAutoFlush(false, true);
int count = TraceSpanStore.store(spansIterator, traceTable) ;
//traceTable.flushCommits();
logger.info("flush spans to hbase, count {}, elapse {}", count, System.currentTimeMillis()-begin );
}finally {
try {
if(traceTable != null ) traceTable.close();
... ...
package com.yoho.trace.store;
import com.alibaba.fastjson.JSONObject;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import properties.PropertiesFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
/**
 * Writes the spans carried by a {@link Spans} report into an HBase table, one row per span.
 *
 * Created by markeloff on 2017/7/26.
 */
public class TraceSpanStore implements Serializable {

    private static final Logger logger = LoggerFactory.getLogger(TraceSpanStore.class);

    /**
     * Stores every span of {@code spans} into {@code traceTable}, issuing one put per span.
     *
     * <p>Any exception raised while building or writing a row is logged and stops the
     * processing of the remaining spans of this report; rows written before the failure
     * are still counted.
     *
     * @param spans      the report to persist; {@code null} or a report without spans is a no-op
     * @param traceTable the table that receives the rows
     * @return the number of puts handed to {@code traceTable} before returning
     */
    public static int store(Spans spans, HTable traceTable) {
        int written = 0;
        if (spans == null || !CollectionUtils.isNotEmpty(spans.getSpans())) {
            return written;
        }

        try {
            final byte[] family = Bytes.toBytes("span");
            for (Span current : spans.getSpans()) {
                // The event of the first log entry (or "none") is part of the row key.
                String firstEvent = "none";
                if (CollectionUtils.isNotEmpty(current.logs())) {
                    firstEvent = current.logs().get(0).getEvent();
                }

                // Row key layout: traceId : beginSeconds : spanId : firstEvent
                StringBuilder keyBuilder = new StringBuilder(64);
                keyBuilder.append(Span.idToHex(current.getTraceId()) + ":" + (current.getBegin() / 1000));
                keyBuilder.append(":");
                keyBuilder.append(Span.idToHex(current.getSpanId()));
                keyBuilder.append(":");
                keyBuilder.append(firstEvent);

                Put row = new Put(Bytes.toBytes(keyBuilder.toString()));
                row.addColumn(family, Bytes.toBytes("service"), Bytes.toBytes(spans.getHost().getServiceName()));
                row.addColumn(family, Bytes.toBytes("event"), Bytes.toBytes(firstEvent));
                row.addColumn(family, Bytes.toBytes("ip"), Bytes.toBytes(spans.getHost().getAddress()));
                row.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(current.getName()));
                row.addColumn(family, Bytes.toBytes("traceid"), Bytes.toBytes(Span.idToHex(current.getTraceId())));
                row.addColumn(family, Bytes.toBytes("spanid"), Bytes.toBytes(Span.idToHex(current.getSpanId())));
                row.addColumn(family, Bytes.toBytes("begin"), Bytes.toBytes(current.getBegin()));
                row.addColumn(family, Bytes.toBytes("end"), Bytes.toBytes(current.getEnd()));

                // Only the last entry of the parent list is recorded.
                if (CollectionUtils.isNotEmpty(current.getParents())) {
                    row.addColumn(family, Bytes.toBytes("parent"),
                            Bytes.toBytes(Span.idToHex(current.getParents().get(current.getParents().size() - 1))));
                }

                row.addColumn(family, Bytes.toBytes("logs"), Bytes.toBytes(JSONObject.toJSONString(current.logs())));
                row.addColumn(family, Bytes.toBytes("tags"), Bytes.toBytes(JSONObject.toJSONString(current.tags())));

                traceTable.put(row);
                written++;
            }
        } catch (Exception e) {
            logger.error("store to hbase failed, spand {} ", JSONObject.toJSONString(spans), e);
        }
        return written;
    }
}
private static final Logger logger = LoggerFactory.getLogger(TraceSpanStore.class);
/**
 * Converts every span delivered by {@code spansIterator} into an HBase {@link Put} and
 * writes the puts to {@code traceTable} in bounded batches.
 *
 * <p>Each put is submitted exactly once: the pending list is cleared after every flush,
 * so earlier puts are not re-sent when later reports are processed and the list does not
 * grow with the size of the partition.
 *
 * <p>Failures are best effort. If a report cannot be converted, the error is logged, the
 * puts already built from it stay queued, and processing continues with the next report.
 * If a batch write fails, the error is logged and that batch is dropped.
 *
 * @param spansIterator source of span reports; {@code null} is treated as empty
 * @param traceTable    the table that receives the rows
 * @return the number of puts that were handed to {@code traceTable} without an exception
 */
public static int store(Iterator<Spans> spansIterator, HTable traceTable) {
    if (spansIterator == null) {
        return 0;
    }
    // Upper bound (approximate, checked after each report) on puts buffered before a write.
    final int batchSize = 10000;
    int stored = 0;
    List<Put> pending = new ArrayList<>(batchSize);
    while (spansIterator.hasNext()) {
        Spans spans = spansIterator.next();
        if (spans == null || CollectionUtils.isEmpty(spans.getSpans())) {
            continue;
        }
        try {
            for (Span span : spans.getSpans()) {
                pending.add(buildPut(spans, span));
            }
        } catch (Exception e) {
            // A malformed report must not abort the whole partition.
            logger.error("store to hbase failed, spand {} ", JSONObject.toJSONString(spans), e);
        }
        if (pending.size() >= batchSize) {
            stored += flushPuts(pending, traceTable);
        }
    }
    stored += flushPuts(pending, traceTable);
    return stored;
}

/** Builds the row for one span; the row key is traceId:beginSeconds:spanId:firstEvent. */
private static Put buildPut(Spans spans, Span span) {
    String logEvent = "none";
    if (CollectionUtils.isNotEmpty(span.logs())) {
        logEvent = span.logs().get(0).getEvent();
    }
    String rowkey = new StringBuilder(128)
            .append(Span.idToHex(span.getTraceId())).append(":")
            .append(span.getBegin() / 1000).append(":")
            .append(Span.idToHex(span.getSpanId())).append(":")
            .append(logEvent)
            .toString();

    final byte[] family = Bytes.toBytes("span");
    Put put = new Put(Bytes.toBytes(rowkey));
    put.addColumn(family, Bytes.toBytes("service"), Bytes.toBytes(spans.getHost().getServiceName()));
    put.addColumn(family, Bytes.toBytes("event"), Bytes.toBytes(logEvent));
    put.addColumn(family, Bytes.toBytes("ip"), Bytes.toBytes(spans.getHost().getAddress()));
    put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(span.getName()));
    put.addColumn(family, Bytes.toBytes("traceid"), Bytes.toBytes(Span.idToHex(span.getTraceId())));
    put.addColumn(family, Bytes.toBytes("spanid"), Bytes.toBytes(Span.idToHex(span.getSpanId())));
    put.addColumn(family, Bytes.toBytes("begin"), Bytes.toBytes(span.getBegin()));
    put.addColumn(family, Bytes.toBytes("end"), Bytes.toBytes(span.getEnd()));
    // Only the last entry of the parent list is recorded.
    if (CollectionUtils.isNotEmpty(span.getParents())) {
        put.addColumn(family, Bytes.toBytes("parent"),
                Bytes.toBytes(Span.idToHex(span.getParents().get(span.getParents().size() - 1))));
    }
    put.addColumn(family, Bytes.toBytes("logs"), Bytes.toBytes(JSONObject.toJSONString(span.logs())));
    put.addColumn(family, Bytes.toBytes("tags"), Bytes.toBytes(JSONObject.toJSONString(span.tags())));
    return put;
}

/** Writes {@code pending} to the table, always empties it, and returns how many puts were accepted. */
private static int flushPuts(List<Put> pending, HTable traceTable) {
    if (pending.isEmpty()) {
        return 0;
    }
    int size = pending.size();
    try {
        traceTable.put(pending);
        return size;
    } catch (Exception e) {
        logger.error("flush spans to hbase failed, batch size {}", size, e);
        return 0;
    } finally {
        // Clear even on failure so a bad batch is not retried forever or re-sent with later puts.
        pending.clear();
    }
}
}
\ No newline at end of file
... ...