...
|
...
|
@@ -79,6 +79,7 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { |
|
|
spanInfo.setName(span.getName());
|
|
|
spanInfo.setBegin(span.getBegin());
|
|
|
spanInfo.setEnd(span.getEnd());
|
|
|
spanInfo.setDuration(span.getEnd() - span.getBegin());
|
|
|
spanInfo.setTraceid(Span.idToHex(span.getTraceId()));
|
|
|
spanInfo.setSpanid(Span.idToHex(span.getSpanId()));
|
|
|
if (span.getParents().size() > 0) {
|
...
|
...
|
@@ -90,7 +91,12 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { |
|
|
spanInfo.setReceive(spans.getReceive());
|
|
|
if(span.tags()!=null){
|
|
|
spanInfo.setHttpHost(span.tags().get("http.host"));
|
|
|
//标记span是否是是异常span
|
|
|
if(StringUtils.isNotBlank(span.tags().get("error"))){
|
|
|
spanInfo.setErrorStatus(true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
spanInfoList.add(new Tuple2<>(spanInfo.getTraceid(), spanInfo));
|
|
|
}
|
|
|
}
|
...
|
...
|
@@ -247,23 +253,17 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { |
|
|
|
|
|
sortSpanTrace.cache();
|
|
|
|
|
|
//处理span+ip的耗时分布
|
|
|
//1处理span+ip的耗时分布
|
|
|
handlerSpanIp(sortSpanTrace);
|
|
|
|
|
|
JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream = sortSpanTrace.mapToPair(analyzer.ConvertTraceResultFunc) ;
|
|
|
apiResultTraceDStream.cache();
|
|
|
|
|
|
//根据traceMD5分组计算value
|
|
|
JavaPairDStream<String, ApiTraceResult> resultDStream = apiResultTraceDStream.reduceByKey(analyzer.ReduceFunc) ;
|
|
|
|
|
|
resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
|
|
|
@Override
|
|
|
public void call(JavaPairRDD<String, ApiTraceResult> apiResultRdd) throws Exception {
|
|
|
|
|
|
ApiStatisticsResultStore.store(apiResultRdd, "trace_api_analyze_minutes");
|
|
|
}
|
|
|
});
|
|
|
|
|
|
//2处理treemd5调用链总览
|
|
|
handlertraceApiAnalyzeMinutes(apiResultTraceDStream);
|
|
|
|
|
|
//3处理异常调用链信息
|
|
|
handlerExceptionTrace(apiResultTraceDStream);
|
|
|
}
|
|
|
|
|
|
|
...
|
...
|
@@ -369,4 +369,78 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { |
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void handlertraceApiAnalyzeMinutes(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream){
|
|
|
//根据traceMD5分组计算value
|
|
|
JavaPairDStream<String, ApiTraceResult> resultDStream = apiResultTraceDStream.reduceByKey(analyzer.ReduceFunc) ;
|
|
|
|
|
|
resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
|
|
|
@Override
|
|
|
public void call(JavaPairRDD<String, ApiTraceResult> apiResultRdd) throws Exception {
|
|
|
|
|
|
ApiStatisticsResultStore.store(apiResultRdd, "trace_api_analyze_minutes");
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
private void handlerExceptionTrace(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream){
|
|
|
JavaPairDStream<String, ApiTraceResult> filter = apiResultTraceDStream.filter(new Function<Tuple2<String, ApiTraceResult>, Boolean>() {
|
|
|
@Override
|
|
|
public Boolean call(Tuple2<String, ApiTraceResult> stringSortedTraceTuple2) throws Exception {
|
|
|
if(stringSortedTraceTuple2._2.isErrorStatus()){
|
|
|
return true;
|
|
|
}else{
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
filter.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
|
|
|
@Override
|
|
|
public void call(JavaPairRDD<String, ApiTraceResult> stringSortedTraceJavaPairRDD) throws Exception {
|
|
|
stringSortedTraceJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
|
|
|
@Override
|
|
|
public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
|
|
|
HTable resultTable = null;
|
|
|
try {
|
|
|
if(tuple2Iterator == null){
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
if (resultTable == null) {
|
|
|
resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("exception_trace"));
|
|
|
}
|
|
|
|
|
|
List<Put> putList = new ArrayList<>();
|
|
|
while(tuple2Iterator.hasNext()){
|
|
|
Tuple2<String, ApiTraceResult> next = tuple2Iterator.next();
|
|
|
ApiTraceResult apiTraceResult = next._2;
|
|
|
|
|
|
Put put = new Put(Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
|
|
|
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
|
|
|
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
|
|
|
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));
|
|
|
|
|
|
putList.add(put);
|
|
|
}
|
|
|
|
|
|
resultTable.put(putList);
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(),e);
|
|
|
} finally {
|
|
|
if (resultTable != null)
|
|
|
resultTable.close();
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
} |
...
|
...
|
|