// TraceAnalyzeHandler.java
package com.yoho.trace.online.handler;
import com.alibaba.fastjson.JSONObject;
import com.clearspring.analytics.util.Lists;
import com.yoho.trace.anaylzer.ApiStatisticsAnalyzer;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SortedTrace;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.anaylzer.model.SpanIpResult;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.ApiStatisticsResultStore;
import com.yoho.trace.store.HBasePool;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
/**
* Created by markeloff on 2017/7/26.
*/
public class TraceAnalyzeHandler implements TraceHandler, Serializable {
/** Logger used by the handler and by the Spark functions defined in this class. */
private static final Logger logger = LoggerFactory.getLogger(TraceAnalyzeHandler.class);

/**
 * Threshold in milliseconds (3s). {@code handle} compares a span's receive time minus its end
 * time against this value to decide whether the trace is parked in state.
 */
private static final int interval = 3000;

/** Supplies the sort, convert and reduce functions used by the pipeline; assigned once. */
private final ApiStatisticsAnalyzer analyzer;

/** Creates the handler with an analyzer constructed with the flag {@code false}. */
public TraceAnalyzeHandler() {
    analyzer = new ApiStatisticsAnalyzer(false);
}
/**
 * Builds the streaming pipeline that turns raw span batches into per-trace analysis results.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>flatten every {@code Spans} message into {@code SpanInfo} tuples keyed by trace id;</li>
 * <li>group the tuples by trace id and drop groups whose first span ended 120s or more ago;</li>
 * <li>run the groups through {@code mapWithState} (90s timeout): a trace with any span whose
 * receive time is less than {@code interval} ms after its end time is stored in state and
 * emitted as a {@code null} value, other traces are emitted (merged with state if present);</li>
 * <li>union the emitted non-null traces with the state snapshots whose spans were all received
 * at least 60s and less than 120s ago;</li>
 * <li>sort/convert the traces through {@code analyzer} and pass them to {@code handlerSpanIp},
 * {@code handlertraceApiAnalyzeMinutes} and {@code handlerExceptionTrace}.</li>
 * </ol>
 *
 * @param kafkaMsgDStream stream of {@code Spans} messages (named after its Kafka origin)
 */
@Override
public void handle(final JavaDStream<Spans> kafkaMsgDStream) {
// key is the trace id, value is each individual span
JavaPairDStream<String, SpanInfo> spanPairDStream = kafkaMsgDStream.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Spans>, String, SpanInfo>() {
@Override
public Iterator<Tuple2<String, SpanInfo>> call(Iterator<Spans> spansIterator) throws Exception {
// start time, only used for the elapsed-time log line below
long enter = System.currentTimeMillis();
List<Tuple2<String, SpanInfo>> spanInfoList = new ArrayList<>(500);
while (spansIterator.hasNext()) {
Spans spans = spansIterator.next();
// NOTE(review): return value is unused here — looks like a leftover; confirm it can go
spans.getHost();
List<Span> list = spans.getSpans();
Iterator<Span> itor = list.iterator();
while (itor.hasNext()) {
Span span = itor.next();
// endpoint defaults to "none"; otherwise it is the event of the span's first log entry
String logEvent = "none";
if (CollectionUtils.isNotEmpty(span.logs())) {
logEvent = span.logs().get(0).getEvent();
}
SpanInfo spanInfo = new SpanInfo();
spanInfo.setName(span.getName());
spanInfo.setBegin(span.getBegin());
spanInfo.setEnd(span.getEnd());
spanInfo.setDuration(span.getEnd() - span.getBegin());
spanInfo.setTraceid(Span.idToHex(span.getTraceId()));
spanInfo.setSpanid(Span.idToHex(span.getSpanId()));
// when parents exist, the last entry of the parents list is recorded as the parent
if (span.getParents().size() > 0) {
spanInfo.setParent(Span.idToHex(span.getParents().get(span.getParents().size() - 1)));
}
spanInfo.setService(spans.getHost().getServiceName());
spanInfo.setEndpoint(logEvent);
spanInfo.setIp(spans.getHost().getAddress());
spanInfo.setReceive(spans.getReceive());
if(span.tags()!=null){
spanInfo.setHttpHost(span.tags().get("http.host"));
// flag the span as an error span when it carries a non-blank "error" tag
if(StringUtils.isNotBlank(span.tags().get("error"))){
spanInfo.setErrorStatus(true);
}
}
spanInfoList.add(new Tuple2<>(spanInfo.getTraceid(), spanInfo));
}
}
logger.info("kafkaMsgDStream.mapPartitionsToPair elapse {}, size {} ", System.currentTimeMillis() - enter, spanInfoList.size());
return spanInfoList.iterator();
}
});
// group spans by trace id and filter out traces older than 120s; those are not analyzed
JavaPairDStream<String, Iterable<SpanInfo>> tracePairDStream = spanPairDStream.groupByKey().filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Iterable<SpanInfo>> v1) throws Exception {
Iterator<SpanInfo> itr = v1._2().iterator();
long current = System.currentTimeMillis();
// only the first span returned by the iterator is inspected
if (itr.hasNext()) {
SpanInfo span = itr.next();
// filter out call info that is too old; it is not analyzed
if (current - span.getEnd() >= 120000) {
logger.info("filter early trace , don't anaylze it, trace id {}", v1._1());
return false;
}
}
return true;
}
});
// the returned dstream (mapWithStateDStream) holds what is processed immediately; the state keeps
// the last 3 seconds of the previous batch and the last 3 seconds of the current batch
JavaMapWithStateDStream mapWithStateDStream = tracePairDStream.mapWithState(StateSpec.function(new Function3<String, Optional<Iterable<SpanInfo>>,
State<Iterable<SpanInfo>>, Tuple2<String, Iterable<SpanInfo>>>() {
@Override
public Tuple2<String, Iterable<SpanInfo>> call(String traceId, Optional<Iterable<SpanInfo>> current, State<Iterable<SpanInfo>> state) throws Exception {
// becomes true when any span was received less than `interval` ms after it ended
boolean isLast3sRecv = false;
List<SpanInfo> newSpanList = new ArrayList<SpanInfo>();
if (current.isPresent()) {
Iterator itor = current.get().iterator();
while (itor.hasNext()) {
SpanInfo info = (SpanInfo) itor.next();
if ((info.getReceive() - info.getEnd()) < interval) {
isLast3sRecv = true;
}
newSpanList.add(info);
}
}
// 1. if any span falls into the last 3 seconds of this batch, store all of them in the state, do not process, return null
if (isLast3sRecv) {
if (CollectionUtils.isNotEmpty(newSpanList)) {
logger.info("latest 3s trace in current batch, just state, trace id {}, span count {}", traceId, newSpanList.size());
// NOTE(review): newSpanList holds only this batch's spans at this point, so update() replaces
// any spans already kept in state for this trace id instead of merging — confirm this is intended
state.update(newSpanList);
}
// the null value is filtered out downstream; not processed in this batch
return new Tuple2<>(traceId, null);
}
// 2. trace already present in state: merge the span lists, clear the state, return the complete span list
if (state.exists()) {
Iterator itor = state.get().iterator();
while (itor.hasNext()) {
newSpanList.add((SpanInfo) itor.next());
}
if (!state.isTimingOut()) {
// state has not timed out
state.remove();
logger.info("full trace with last batch spans, trace id {}, span count {} ", traceId, newSpanList.size());
return new Tuple2<String, Iterable<SpanInfo>>(traceId, newSpanList);
} else {
// state has timed out
logger.info("state is timeout, can not process, trace id {}, span count {} ", traceId, newSpanList.size());
// the null value is filtered out downstream; not processed
return new Tuple2<>(traceId, null);
}
}
// 3. trace not in state and not in the last 3 seconds: return the complete span list
logger.info("full trace in current batch, trace id {}, span count {} ", traceId, newSpanList.size());
return new Tuple2<String, Iterable<SpanInfo>>(traceId, newSpanList);
}
}).timeout(Durations.seconds(90)));
// filter
JavaPairDStream<String, Iterable<SpanInfo>> currentTraceStream = mapWithStateDStream.filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Iterable<SpanInfo>> v1) throws Exception {
// traces received in the last 3 seconds of this batch wait for the next batch
if (v1._2() == null) {
logger.info("filter null trace (latest 3s will process on next batch) , trace id {}", v1._1());
return false;
}
logger.info("go to analyze current batch trace, trace id {}", v1._1());
return true;
}
}).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Iterable<SpanInfo>>>, String, Iterable<SpanInfo>>() {
@Override
public Iterator<Tuple2<String, Iterable<SpanInfo>>> call(Iterator<Tuple2<String, Iterable<SpanInfo>>> tuple2Iterator) throws Exception {
// identity mapping: elements pass through unchanged; the call yields the JavaPairDStream assigned above
return tuple2Iterator;
}
});
// mapWithStateDStream.stateSnapshots: the state holds 1) the last 3 seconds of this batch,
// 2) the last 3 seconds of the previous batch (traces not updated in this batch)
JavaPairDStream<String, Iterable<SpanInfo>> lastStateTraceStream = mapWithStateDStream.stateSnapshots()
.filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Iterable<SpanInfo>> v1) throws Exception {
long current = System.currentTimeMillis();
Iterator itor = v1._2().iterator();
// a single span outside the [60s, 120s) receive-age window rejects the whole trace
while (itor.hasNext()) {
SpanInfo span = (SpanInfo) itor.next();
// drop entries from the current batch, keep only those from the previous batch
if ((current - span.getReceive()) < 60000) {
logger.info("filter latest 3s trace from state snapshot, trace id {}", v1._1());
// received within this batch: skip, because the state holds 1) the last 3 seconds of this batch,
// 2) the last 3 seconds of the previous batch (traces not updated in this batch)
return false;
}
// drop entries older than the previous batch and keep only the previous batch, because state is
// only cleaned up by timeout at checkpoint time;
// without this filter, data from several batches would be present and analyzed repeatedly
if ((current - span.getReceive()) >= 120000) {
logger.info("filter last 120s trace from state snapshot, trace id {}", v1._1());
return false;
}
}
// keep only trace info that is at least 60s and less than 120s old, i.e. traces left over from
// the previous batch that still need processing
logger.info("go to analyze last batch trace (latest 3s on last batch), trace id {}", v1._1());
return true;
}
});
// verify whether the two streams intersect; they should not
// currentTraceStream.join(lastStateTraceStream).foreachRDD(new VoidFunction<JavaPairRDD<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>>>() {
// @Override
// public void call(JavaPairRDD<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>> stringTuple2JavaPairRDD) throws Exception {
// stringTuple2JavaPairRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>>>() {
// @Override
// public void call(Tuple2<String, Tuple2<Iterable<SpanInfo>, Iterable<SpanInfo>>> stringTuple2Tuple2) throws Exception {
// logger.error("join trace in 2 rdd, trace id {}", stringTuple2Tuple2._1());
// }
// });
// }
// });
// key: trace id
JavaPairDStream<String, Iterable<SpanInfo>> needAnaylzeStream = currentTraceStream.union(lastStateTraceStream);
// key: minuteTime.traceMD5, value: info of one trace id belonging to that traceMD5
JavaPairDStream<String, SortedTrace> sortSpanTrace = needAnaylzeStream.flatMapToPair(analyzer.SortSpanTreeFunc);
// cached because it feeds both handlerSpanIp and the mapToPair below
sortSpanTrace.cache();
// 1. process the duration distribution per span + ip
handlerSpanIp(sortSpanTrace);
JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream = sortSpanTrace.mapToPair(analyzer.ConvertTraceResultFunc) ;
// cached because it feeds the two handlers below
apiResultTraceDStream.cache();
// 2. process the per-tree-md5 call chain overview
handlertraceApiAnalyzeMinutes(apiResultTraceDStream);
// 3. process error call chain info
handlerExceptionTrace(apiResultTraceDStream);
}
/**
 * Aggregates span durations per (traceMD5, span name, minute, ip) and writes the aggregates to the
 * HBase table {@code span_ip}.
 * <p>
 * For every span of every sorted trace an ip is chosen: the span's own ip when its span id equals
 * its trace id, otherwise its http host when that is non-blank. Spans without an ip are skipped.
 * Entries sharing a key are reduced to a summed call count and a count-weighted average duration,
 * then each partition is written as one batch of puts (columns {@code avg_duration},
 * {@code times}, {@code ip} in family {@code data}). Write failures are logged and do not
 * fail the task.
 *
 * @param sortSpanTrace sorted traces keyed by {@code minuteStart.traceMD5}
 */
private void handlerSpanIp(JavaPairDStream<String, SortedTrace> sortSpanTrace) {
    // key: traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip
    JavaPairDStream<String, SpanIpResult> spanIpPairs = sortSpanTrace.flatMapToPair(new PairFlatMapFunction<Tuple2<String, SortedTrace>, String, SpanIpResult>() {
        @Override
        public Iterator<Tuple2<String, SpanIpResult>> call(Tuple2<String, SortedTrace> tuple2) throws Exception {
            List<Tuple2<String, SpanIpResult>> resultList = new ArrayList<>();
            // split the key once and validate it, so one malformed key cannot throw out of the function
            String[] keyParts = StringUtils.split(tuple2._1, '.');
            if (keyParts == null || keyParts.length < 2) {
                logger.warn("unexpected sorted trace key {}, skip span ip analysis", tuple2._1);
                return resultList.iterator();
            }
            String minuteStart = keyParts[0];
            String traceMD5 = keyParts[1];
            List<SpanInfo> sortSpanList = tuple2._2.getSortSpanList();
            if (sortSpanList == null) {
                return resultList.iterator();
            }
            for (SpanInfo spanInfo : sortSpanList) {
                String ip = "";
                // only the root span and spans carrying an http host yield an ip
                if (spanInfo.getTraceid().equals(spanInfo.getSpanid())) {
                    ip = spanInfo.getIp();
                } else if (StringUtils.isNotBlank(spanInfo.getHttpHost())) {
                    ip = spanInfo.getHttpHost();
                }
                if (StringUtils.isBlank(ip)) {
                    continue;
                }
                long duration = spanInfo.getEnd() - spanInfo.getBegin();
                SpanIpResult spanIpResult = new SpanIpResult();
                spanIpResult.setAvgDuration(duration);
                spanIpResult.setIp(ip);
                spanIpResult.setTimes(1);
                String key = traceMD5 + ":" + spanInfo.getName() + ":" + minuteStart + ":" + ip;
                resultList.add(new Tuple2<>(key, spanIpResult));
            }
            return resultList.iterator();
        }
    });
    JavaPairDStream<String, SpanIpResult> reducedSpanIp = spanIpPairs.reduceByKey(new Function2<SpanIpResult, SpanIpResult, SpanIpResult>() {
        @Override
        public SpanIpResult call(SpanIpResult v1, SpanIpResult v2) throws Exception {
            // sum the call counts and combine the averages weighted by their counts
            SpanIpResult result = new SpanIpResult();
            result.setIp(v1.getIp());
            result.setTimes(v1.getTimes() + v2.getTimes());
            result.setAvgDuration((v1.getAvgDuration() * v1.getTimes() + v2.getAvgDuration() * v2.getTimes()) / result.getTimes());
            return result;
        }
    });
    reducedSpanIp.foreachRDD(new VoidFunction<JavaPairRDD<String, SpanIpResult>>() {
        @Override
        public void call(JavaPairRDD<String, SpanIpResult> spanIpRdd) throws Exception {
            spanIpRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, SpanIpResult>>>() {
                @Override
                public void call(Iterator<Tuple2<String, SpanIpResult>> tuple2Iterator) throws Exception {
                    HTable resultTable = null;
                    try {
                        // empty partition: nothing to write, so do not open a table at all
                        if (tuple2Iterator == null || !tuple2Iterator.hasNext()) {
                            return;
                        }
                        resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("span_ip"));
                        List<Put> putList = new ArrayList<>();
                        while (tuple2Iterator.hasNext()) {
                            Tuple2<String, SpanIpResult> next = tuple2Iterator.next();
                            SpanIpResult spanIpResult = next._2;
                            Put put = new Put(Bytes.toBytes(next._1));
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("avg_duration"), Bytes.toBytes(spanIpResult.getAvgDuration()));
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(spanIpResult.getTimes()));
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("ip"), Bytes.toBytes(spanIpResult.getIp()));
                            putList.add(put);
                        }
                        resultTable.put(putList);
                    } catch (Exception e) {
                        // best effort: a failed write is logged, the batch goes on
                        logger.error(e.getMessage(), e);
                    } finally {
                        if (resultTable != null) {
                            resultTable.close();
                        }
                    }
                }
            });
        }
    });
}
/**
 * Reduces the per-trace results by key with {@code analyzer.ReduceFunc} and stores every reduced
 * batch under the name {@code trace_api_analyze_minutes}.
 *
 * @param apiResultTraceDStream per-trace results to aggregate
 */
private void handlertraceApiAnalyzeMinutes(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream) {
    // callback that persists one reduced batch
    VoidFunction<JavaPairRDD<String, ApiTraceResult>> storeReducedBatch = new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
        @Override
        public void call(JavaPairRDD<String, ApiTraceResult> reducedRdd) throws Exception {
            ApiStatisticsResultStore.store(reducedRdd, "trace_api_analyze_minutes");
        }
    };
    // merge the values that share a key (per the original note: grouped by traceMD5), then store
    apiResultTraceDStream.reduceByKey(analyzer.ReduceFunc).foreachRDD(storeReducedBatch);
}
/**
 * Writes every trace flagged as erroneous to the HBase table {@code exception_trace}.
 * <p>
 * The stream is filtered on {@code isErrorStatus()}; each non-empty partition is then written as
 * one batch of puts. The row key is {@code traceStartTime/1000 + ":" + traceId}; the columns
 * {@code spans} (JSON), {@code traceid} and {@code starttime} live in family {@code data}.
 * Write failures are logged and do not fail the task.
 *
 * @param apiResultTraceDStream per-trace results, error and non-error alike
 */
private void handlerExceptionTrace(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream) {
    JavaPairDStream<String, ApiTraceResult> errorTraces = apiResultTraceDStream.filter(new Function<Tuple2<String, ApiTraceResult>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, ApiTraceResult> traceTuple) throws Exception {
            return traceTuple._2.isErrorStatus();
        }
    });
    errorTraces.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
        @Override
        public void call(JavaPairRDD<String, ApiTraceResult> errorTraceRdd) throws Exception {
            errorTraceRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
                @Override
                public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
                    HTable resultTable = null;
                    try {
                        // after the error filter most partitions are empty: skip them without
                        // opening a table or issuing an empty put
                        if (tuple2Iterator == null || !tuple2Iterator.hasNext()) {
                            return;
                        }
                        resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("exception_trace"));
                        List<Put> putList = new ArrayList<>();
                        while (tuple2Iterator.hasNext()) {
                            ApiTraceResult apiTraceResult = tuple2Iterator.next()._2;
                            Put put = new Put(Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));
                            putList.add(put);
                        }
                        resultTable.put(putList);
                    } catch (Exception e) {
                        // best effort: a failed write is logged, the batch goes on
                        logger.error(e.getMessage(), e);
                    } finally {
                        if (resultTable != null) {
                            resultTable.close();
                        }
                    }
                }
            });
        }
    });
}
}