ApiAnalyzeHandler.java
8.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package com.yoho.trace.offline.handler;
import com.yoho.trace.anaylzer.ApiStatisticsAnalyzer;
import com.yoho.trace.offline.AnalyzeHandleFactory;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SortedTrace;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.anaylzer.model.SpanResult;
import com.yoho.trace.store.ApiStatisticsResultStore;
import com.yoho.trace.utils.MD5;
import org.apache.spark.api.java.JavaPairRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
/**
* Created by xjipeng on 2017/10/12.
*/
public class ApiAnalyzeHandler implements IAnalyzeHandler, Serializable {
private static final Logger logger = LoggerFactory.getLogger(ApiAnalyzeHandler.class) ;
public void handle(JavaPairRDD<String, SpanInfo> spanInfoPairRDD ){
ApiStatisticsAnalyzer analyzer = new ApiStatisticsAnalyzer(true) ;
//按照traceid分组,key为traceid,value为 span list
JavaPairRDD<String, Iterable<SpanInfo>> tracePairRdd = spanInfoPairRDD.groupByKey() ;
// <traceid, spanList > 映射为 <traceType, spanList >
JavaPairRDD<String, SortedTrace> pairRdd = tracePairRdd.flatMapToPair(analyzer.SortSpanTreeFunc) ;
//map 映射为结果的对象, key为 链条类型的md5
JavaPairRDD<String, ApiTraceResult> apiTracePairRDD = pairRdd.mapToPair(analyzer.ConvertTraceResultFunc)
.reduceByKey(analyzer.ReduceFunc);
//输出,hbase 或者 打印日志
ApiStatisticsResultStore.store(apiTracePairRDD, "trace_api_analyze");
}
/**
* 为每一种链条生成结果
* @param tuple2 key为链条类型的md5, value为该类链条的明细list
* @return 每种链条类型的统计结果, 次数、平均时间、每一步的平均时间、最快和最慢的traceid、
*/
private Tuple2<String, ApiTraceResult> generateResult(Tuple2<String, Iterable<SortedTrace>> tuple2){
// calc the times for every trace link
long duration = 0;
int times = 0;
//calc the fastest and slowest trace
String maxLatencyTrace = new String();
long maxLatency = 1;
String minLatencyTrace = new String();
long minLatency = 9999999;
//计算出 一种链条中 最快 和 最慢的 traceid,以及 这种链条发生的 总次数、总耗时
Iterator itor = tuple2._2().iterator();
while (itor.hasNext()) {
SortedTrace trace = (SortedTrace) itor.next();
duration = duration + trace.getDuration();
times++;
//slowest
if (trace.getDuration() > maxLatency) {
maxLatency = trace.getDuration();
maxLatencyTrace = trace.getTraceid();
}
//fastest
if (trace.getDuration() < minLatency) {
minLatency = trace.getDuration();
minLatencyTrace = trace.getTraceid();
}
}
//计算调用链路中每一步的总耗时
Map<String, Long> durationPerStep = new HashMap();
Iterator itor3 = tuple2._2().iterator();
while (itor3.hasNext()) {
SortedTrace trace = (SortedTrace) itor3.next();
List<SpanInfo> spanList = (List<SpanInfo>) trace.getSortSpanList();
for (int i = 0; i < spanList.size(); i++) {
String key = spanList.get(i).getName();
long d = spanList.get(i).getEnd() - spanList.get(i).getBegin();
if (!durationPerStep.containsKey(key)) {
durationPerStep.put(key, d);
} else {
durationPerStep.put(key, durationPerStep.get(key) + d);
}
}
}
//计算调用链中每一步的平均耗时
Iterator keyItr = durationPerStep.keySet().iterator();
while (keyItr.hasNext()) {
String key = (String) keyItr.next();
if (durationPerStep.containsKey(key)) {
durationPerStep.put(key, durationPerStep.get(key) / times);
}
}
//生成调用链路
Iterator itor2 = tuple2._2().iterator();
SortedTrace firstSortedTrace = (SortedTrace) itor2.next();
//call list of trace link
List<SpanInfo> spanList = firstSortedTrace.getSortSpanList();
List<SpanResult> list = new ArrayList();
for (int i = 0; i < spanList.size(); i++) {
list.add(new SpanResult(spanList.get(i).getName(), durationPerStep.get(String.valueOf(spanList.get(i).getName())),
spanList.get(i).getLevel(), spanList.get(i).getSpanid() ,spanList.get(i).getParent() , spanList.get(i).getSrcService(), spanList.get(i).getDstService(), null, null,spanList.get(i).getErrorCount(),spanList.get(i).getSpanType()));
}
//设置结果而已
ApiTraceResult result = new ApiTraceResult();
result.setApiName(firstSortedTrace.getApi());
result.setCallTimes(times);
result.setTraceMd5(tuple2._1());
result.setSpans(list);
result.setDuration(duration / times);
result.setMaxLatencyTrace(maxLatencyTrace);
result.setMinLatencyTrace(minLatencyTrace);
result.setMaxLatency((int) maxLatency);
result.setMinLatency((int) minLatency);
return new Tuple2(firstSortedTrace.getApi(), result);
}
/**
* 为每个具体的调用链,生成调用链的树形结构,并生成唯一trace link 标识
* @param spanList
* @return
*/
private Tuple2<String, SortedTrace> generateTrace( ArrayList<SpanInfo> spanList ){
//find root
Iterator itor = spanList.iterator() ;
SpanInfo rootSpan = null ;
while(itor.hasNext() ){
rootSpan = (SpanInfo)itor.next() ;
if(rootSpan.getTraceid().equals(rootSpan.getSpanid())){
break;
}
}
if( rootSpan == null ){
return null ;
}
//sort by time and service name
Collections.sort(spanList, new Comparator<Object>() {
public int compare(Object o1, Object o2) {
if (((SpanInfo) o1).getBegin() < ((SpanInfo) o2).getBegin()) {
return -1;
} else if (((SpanInfo) o1).getBegin() == ((SpanInfo) o2).getBegin()) {
if (((SpanInfo) o1).getName().compareToIgnoreCase(((SpanInfo) o2).getName()) <= 0) {
return -1;
}
}
return 1;
}
});
//build tree
ArrayList<SpanInfo> sortSpanList = new ArrayList<>();
rootSpan.setLevel(0);
rootSpan.setParent(rootSpan.getTraceid());
sortSpanList.add(rootSpan);
recusive(spanList, rootSpan.getSpanid(), sortSpanList , 0);
long duration = rootSpan.getEnd() - rootSpan.getBegin() ;
String api = rootSpan.getName() ;
StringBuilder key = new StringBuilder() ;
Iterator it = sortSpanList.iterator() ;
while(it.hasNext()) {
key.append( ((SpanInfo)it.next()).getName()+"|");
}
String keyMd5 = MD5.md5(key.toString());
SortedTrace trace = new SortedTrace() ;
trace.setApi(api);
trace.setDuration(duration);
trace.setTraceid(rootSpan.getTraceid());
trace.setSortSpanList(sortSpanList);
return new Tuple2<>(keyMd5 , trace );
}
/**
* 递归查找子调用,拼装为树形结构
* @param spanList
* @param parentSpanid
* @param sortSpanList
* @param count
*/
private void recusive(ArrayList<SpanInfo> spanList, String parentSpanid, ArrayList<SpanInfo> sortSpanList, int count ){
if( count > 100 ) return ;
Iterator itor = spanList.iterator() ;
while(itor.hasNext()){
SpanInfo s = (SpanInfo) itor.next() ;
if( parentSpanid.equals(s.getParent()) && s.getEndpoint().equals("cs") ) {
s.setLevel(count+1);
sortSpanList.add(s);
recusive(spanList, s.getSpanid(), sortSpanList, count+1 ) ;
}
}
}
private void output(JavaPairRDD<String,ApiTraceResult> apiResultRdd, boolean debugPrint){
//结果存到hbases中
ApiStatisticsResultStore.store(apiResultRdd, "trace_api_analyze");
}
}