// TraceAnalyzeHandler.java
package com.yoho.trace.online.handler;

import com.alibaba.fastjson.JSONObject;
import com.clearspring.analytics.util.Lists;
import com.yoho.trace.anaylzer.ApiStatisticsAnalyzer;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SortedTrace;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.anaylzer.model.SpanIpResult;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.ApiStatisticsResultStore;
import com.yoho.trace.store.HBasePool;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;

/**
 * Spark Streaming handler that assembles spans into traces, decides for every batch which
 * traces are ready to be analyzed, and stores the analysis results.
 *
 * <p>Stages wired together by {@link #handle(JavaDStream)}:
 * <ol>
 *   <li>flatten every {@link Spans} message into (trace id, {@link SpanInfo}) pairs;</li>
 *   <li>group the spans by trace id and drop groups whose first span ended 120s or more ago;</li>
 *   <li>pass the groups through {@code mapWithState}: traces containing a span that was received
 *       less than 3s after it ended are parked in state, all other traces are emitted
 *       (merged with whatever the state already holds for the same trace id);</li>
 *   <li>additionally pick up parked traces from the state snapshot once all their spans were
 *       received between 60s and 120s ago;</li>
 *   <li>analyze the union of both streams and write span/ip statistics, per-minute API
 *       statistics and error traces to their stores.</li>
 * </ol>
 *
 * <p>NOTE(review): the 60s/120s windows presumably mirror a 60s batch interval; the interval is
 * configured outside this class — confirm before changing either value.
 *
 * <p>Created by markeloff on 2017/7/26.
 */
public class TraceAnalyzeHandler implements TraceHandler, Serializable {

    private static final Logger logger = LoggerFactory.getLogger(TraceAnalyzeHandler.class);

    /**
     * A span whose receive time is less than this many milliseconds after its end time marks
     * its trace as "received in the last 3s of the batch"; such traces are parked in state.
     */
    private static final int RECENT_SPAN_WINDOW_MS = 3000;  //3s

    /** Spans received less than this many milliseconds ago are treated as current-batch data. */
    private static final long CURRENT_BATCH_AGE_MS = 60000L;

    /** Data that is this many milliseconds old (or older) is never analyzed. */
    private static final long MAX_TRACE_AGE_MS = 120000L;

    /** Idle timeout, in seconds, of the per-trace state kept by {@code mapWithState}. */
    private static final long STATE_TIMEOUT_SECONDS = 90L;

    /** HBase column family shared by every table written from this class. */
    private static final String DATA_FAMILY = "data";

    private final ApiStatisticsAnalyzer analyzer;

    public TraceAnalyzeHandler() {
        analyzer = new ApiStatisticsAnalyzer(false);
    }

    /**
     * Builds the complete trace analysis pipeline on top of the incoming span stream and
     * registers its three output operations (span/ip statistics, per-minute API statistics,
     * error traces).
     *
     * @param kafkaMsgDStream stream of span batches, one {@link Spans} per reporting host message
     */
    @Override
    public void handle(final JavaDStream<Spans> kafkaMsgDStream) {

        // key: trace id, value: one span
        JavaPairDStream<String, SpanInfo> spanPairDStream = extractSpans(kafkaMsgDStream);

        // spans grouped by trace id; traces that ended 120s or more ago are dropped
        JavaPairDStream<String, Iterable<SpanInfo>> tracePairDStream = groupRecentTraces(spanPairDStream);

        // Emits immediately; the state holds the traces parked by the previous batch and by this batch.
        JavaMapWithStateDStream<String, Iterable<SpanInfo>, Iterable<SpanInfo>, Tuple2<String, Iterable<SpanInfo>>> mapWithStateDStream =
                trackTraceState(tracePairDStream);

        JavaPairDStream<String, Iterable<SpanInfo>> currentTraceStream = selectCurrentTraces(mapWithStateDStream);
        JavaPairDStream<String, Iterable<SpanInfo>> lastStateTraceStream = selectLastBatchTraces(mapWithStateDStream);

        // key: trace id. The two streams are expected to be disjoint (no trace id in both).
        JavaPairDStream<String, Iterable<SpanInfo>> needAnaylzeStream = currentTraceStream.union(lastStateTraceStream);

        // key: minuteTime.traceMD5, value: the data of one trace id belonging to that traceMD5
        JavaPairDStream<String, SortedTrace> sortSpanTrace = needAnaylzeStream.flatMapToPair(analyzer.SortSpanTreeFunc);

        // consumed by two downstream branches, so cache it
        sortSpanTrace.cache();

        // 1. duration distribution per span + ip
        handlerSpanIp(sortSpanTrace);

        JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream = sortSpanTrace.mapToPair(analyzer.ConvertTraceResultFunc);
        apiResultTraceDStream.cache();

        // 2. call-chain overview per traceMD5
        handlertraceApiAnalyzeMinutes(apiResultTraceDStream);

        // 3. traces that contain an error
        handlerExceptionTrace(apiResultTraceDStream);
    }

    /** Flattens every {@link Spans} message of a partition into (trace id, span info) pairs. */
    private static JavaPairDStream<String, SpanInfo> extractSpans(JavaDStream<Spans> kafkaMsgDStream) {
        return kafkaMsgDStream.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Spans>, String, SpanInfo>() {

            @Override
            public Iterator<Tuple2<String, SpanInfo>> call(Iterator<Spans> spansIterator) throws Exception {

                long enter = System.currentTimeMillis();
                List<Tuple2<String, SpanInfo>> spanInfoList = new ArrayList<>(500);
                while (spansIterator.hasNext()) {
                    Spans spans = spansIterator.next();
                    for (Span span : spans.getSpans()) {
                        SpanInfo spanInfo = toSpanInfo(spans, span);
                        spanInfoList.add(new Tuple2<>(spanInfo.getTraceid(), spanInfo));
                    }
                }

                logger.info("kafkaMsgDStream.mapPartitionsToPair elapse {}, size {} ", System.currentTimeMillis() - enter, spanInfoList.size());

                return spanInfoList.iterator();
            }
        });
    }

    /** Copies the fields needed for analysis from one reported span (and its host) into a {@link SpanInfo}. */
    private static SpanInfo toSpanInfo(Spans spans, Span span) {
        // the endpoint is the event of the first log entry, or "none" when the span has no logs
        String logEvent = "none";
        if (CollectionUtils.isNotEmpty(span.logs())) {
            logEvent = span.logs().get(0).getEvent();
        }

        SpanInfo spanInfo = new SpanInfo();
        spanInfo.setName(span.getName());
        spanInfo.setBegin(span.getBegin());
        spanInfo.setEnd(span.getEnd());
        spanInfo.setDuration(span.getEnd() - span.getBegin());
        spanInfo.setTraceid(Span.idToHex(span.getTraceId()));
        spanInfo.setSpanid(Span.idToHex(span.getSpanId()));
        if (span.getParents().size() > 0) {
            // the last entry of the parents list is used as the parent id
            spanInfo.setParent(Span.idToHex(span.getParents().get(span.getParents().size() - 1)));
        }
        spanInfo.setService(spans.getHost().getServiceName());
        spanInfo.setEndpoint(logEvent);
        spanInfo.setIp(spans.getHost().getAddress());
        spanInfo.setReceive(spans.getReceive());
        if (span.tags() != null) {
            spanInfo.setHttpHost(span.tags().get("http.host"));
            // a non-blank "error" tag marks the span as failed
            if (StringUtils.isNotBlank(span.tags().get("error"))) {
                spanInfo.setErrorStatus(true);
            }
        }
        return spanInfo;
    }

    /**
     * Groups spans by trace id and drops groups that are too old to analyze. Only the first span
     * of each group is inspected: the group is dropped when that span ended 120s or more ago.
     */
    private static JavaPairDStream<String, Iterable<SpanInfo>> groupRecentTraces(JavaPairDStream<String, SpanInfo> spanPairDStream) {
        return spanPairDStream.groupByKey().filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, Iterable<SpanInfo>> trace) throws Exception {
                Iterator<SpanInfo> spanIterator = trace._2().iterator();
                long now = System.currentTimeMillis();
                if (spanIterator.hasNext()) {
                    SpanInfo firstSpan = spanIterator.next();
                    // calls that finished too long ago are not analyzed
                    if (now - firstSpan.getEnd() >= MAX_TRACE_AGE_MS) {
                        logger.info("filter early trace , don't anaylze it, trace id {}", trace._1());
                        return false;
                    }
                }
                return true;
            }
        });
    }

    /**
     * Applies the per-trace state machine. For every trace id the mapping function returns either
     * the span list that should be analyzed now, or a tuple with a {@code null} value meaning
     * "nothing to analyze in this batch" (removed later by {@link #selectCurrentTraces}).
     */
    private static JavaMapWithStateDStream<String, Iterable<SpanInfo>, Iterable<SpanInfo>, Tuple2<String, Iterable<SpanInfo>>> trackTraceState(
            JavaPairDStream<String, Iterable<SpanInfo>> tracePairDStream) {

        Function3<String, Optional<Iterable<SpanInfo>>, State<Iterable<SpanInfo>>, Tuple2<String, Iterable<SpanInfo>>> mappingFunction =
                new Function3<String, Optional<Iterable<SpanInfo>>, State<Iterable<SpanInfo>>, Tuple2<String, Iterable<SpanInfo>>>() {
            @Override
            public Tuple2<String, Iterable<SpanInfo>> call(String traceId, Optional<Iterable<SpanInfo>> current, State<Iterable<SpanInfo>> state) throws Exception {
                boolean isLast3sRecv = false;
                List<SpanInfo> newSpanList = new ArrayList<SpanInfo>();
                if (current.isPresent()) {
                    for (SpanInfo info : current.get()) {
                        if ((info.getReceive() - info.getEnd()) < RECENT_SPAN_WINDOW_MS) {
                            isLast3sRecv = true;
                        }
                        newSpanList.add(info);
                    }
                }

                // 1. At least one span falls into the last-3s window: park this batch's spans in
                //    the state (replacing whatever was stored) and emit nothing. The list cannot
                //    be empty here because the flag is only set while adding a span.
                if (isLast3sRecv) {
                    logger.info("latest 3s trace in current batch, just state, trace id {}, span count {}", traceId, newSpanList.size());
                    state.update(newSpanList);
                    // a null value is filtered out downstream: not processed in this batch
                    return new Tuple2<String, Iterable<SpanInfo>>(traceId, null);
                }

                // 2. The state already holds spans of this trace: merge them with the new ones.
                if (state.exists()) {
                    for (SpanInfo parked : state.get()) {
                        newSpanList.add(parked);
                    }

                    if (!state.isTimingOut()) {
                        // state still alive: clear it and emit the complete span list
                        state.remove();
                        logger.info("full trace with last batch spans, trace id {}, span count {} ", traceId, newSpanList.size());
                        return new Tuple2<String, Iterable<SpanInfo>>(traceId, newSpanList);
                    } else {
                        // state is being timed out (it must not be removed in that case): emit nothing
                        logger.info("state is timeout, can not process, trace id {}, span count {} ", traceId, newSpanList.size());
                        return new Tuple2<String, Iterable<SpanInfo>>(traceId, null);
                    }
                }

                // 3. Nothing in state and no span in the last-3s window: emit this batch's spans.
                logger.info("full trace in current batch, trace id {}, span count {} ", traceId, newSpanList.size());
                return new Tuple2<String, Iterable<SpanInfo>>(traceId, newSpanList);
            }
        };

        return tracePairDStream.mapWithState(StateSpec.function(mappingFunction).timeout(Durations.seconds(STATE_TIMEOUT_SECONDS)));
    }

    /** Keeps the traces emitted by the state function in this batch, i.e. tuples whose value is not {@code null}. */
    private static JavaPairDStream<String, Iterable<SpanInfo>> selectCurrentTraces(
            JavaMapWithStateDStream<String, Iterable<SpanInfo>, Iterable<SpanInfo>, Tuple2<String, Iterable<SpanInfo>>> mapWithStateDStream) {

        return mapWithStateDStream.filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, Iterable<SpanInfo>> trace) throws Exception {
                // traces parked (or timed out) in this batch carry a null value
                if (trace._2() == null) {
                    logger.info("filter null trace (latest 3s will process on next batch) , trace id {}", trace._1());
                    return false;
                }
                logger.info("go to analyze current batch trace, trace id {}", trace._1());
                return true;
            }
        }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Iterable<SpanInfo>>>, String, Iterable<SpanInfo>>() {
            // identity mapping; it only turns the stream of tuples into a pair stream
            @Override
            public Iterator<Tuple2<String, Iterable<SpanInfo>>> call(Iterator<Tuple2<String, Iterable<SpanInfo>>> tuples) throws Exception {
                return tuples;
            }
        });
    }

    /**
     * Selects, from the state snapshot, the traces parked by the previous batch. The snapshot
     * contains both traces parked in this batch and traces parked earlier that were not updated
     * since; an entry is kept only when every one of its spans was received at least 60s and
     * less than 120s ago.
     */
    private static JavaPairDStream<String, Iterable<SpanInfo>> selectLastBatchTraces(
            JavaMapWithStateDStream<String, Iterable<SpanInfo>, Iterable<SpanInfo>, Tuple2<String, Iterable<SpanInfo>>> mapWithStateDStream) {

        return mapWithStateDStream.stateSnapshots()
                .filter(new Function<Tuple2<String, Iterable<SpanInfo>>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Iterable<SpanInfo>> trace) throws Exception {

                        long now = System.currentTimeMillis();
                        for (SpanInfo span : trace._2()) {
                            // received within this batch: leave it parked, it is handled next time
                            if ((now - span.getReceive()) < CURRENT_BATCH_AGE_MS) {
                                logger.info("filter latest 3s trace from state snapshot, trace id {}", trace._1());
                                return false;
                            }
                            // Older than the previous batch: skip it. Per the original author's note,
                            // timed-out state is only cleaned up at checkpoint time, so without this
                            // check entries from several batches would be analyzed repeatedly.
                            if ((now - span.getReceive()) >= MAX_TRACE_AGE_MS) {
                                logger.info("filter last 120s trace from state snapshot, trace id {}", trace._1());
                                return false;
                            }
                        }
                        // only traces between 60s and 120s old remain: leftovers of the previous batch
                        logger.info("go to analyze last batch trace (latest 3s on last batch), trace id {}", trace._1());
                        return true;
                    }
                });
    }

    /**
     * Aggregates span durations per (traceMD5, span name, minute, ip) and writes the averages to
     * the HBase table {@code span_ip}.
     *
     * @param sortSpanTrace sorted traces keyed by {@code minuteTime.traceMD5}
     */
    private static void handlerSpanIp(JavaPairDStream<String, SortedTrace> sortSpanTrace) {
        // key: traceMD5 + ":" + spanName + ":" + minuteStart + ":" + ip
        JavaPairDStream<String, SpanIpResult> spanIpSamples = sortSpanTrace.flatMapToPair(new PairFlatMapFunction<Tuple2<String, SortedTrace>, String, SpanIpResult>() {

            @Override
            public Iterator<Tuple2<String, SpanIpResult>> call(Tuple2<String, SortedTrace> tuple2) throws Exception {

                List<Tuple2<String, SpanIpResult>> resultList = new ArrayList<>();
                String[] keyParts = StringUtils.split(tuple2._1, '.');
                String minuteStart = keyParts[0];
                String traceMD5 = keyParts[1];

                for (SpanInfo spanInfo : tuple2._2.getSortSpanList()) {

                    // Only root spans (span id equals trace id) and spans carrying an
                    // "http.host" value contribute an ip; everything else is skipped.
                    String ip = "";
                    if (spanInfo.getTraceid().equals(spanInfo.getSpanid())) {
                        ip = spanInfo.getIp();
                    } else if (StringUtils.isNotBlank(spanInfo.getHttpHost())) {
                        ip = spanInfo.getHttpHost();
                    }

                    if (StringUtils.isBlank(ip)) {
                        continue;
                    }

                    // a single sample: times = 1 and the "average" is the span's own duration
                    SpanIpResult spanIpResult = new SpanIpResult();
                    long duration = spanInfo.getEnd() - spanInfo.getBegin();
                    spanIpResult.setAvgDuration(duration);
                    spanIpResult.setIp(ip);
                    spanIpResult.setTimes(1);

                    String key = traceMD5 + ":" + spanInfo.getName() + ":" + minuteStart + ":" + ip;
                    resultList.add(new Tuple2<>(key, spanIpResult));
                }

                return resultList.iterator();
            }
        });

        // merge samples of the same key into a call count and a weighted average duration
        JavaPairDStream<String, SpanIpResult> spanIpResults = spanIpSamples.reduceByKey(new Function2<SpanIpResult, SpanIpResult, SpanIpResult>() {
            @Override
            public SpanIpResult call(SpanIpResult v1, SpanIpResult v2) throws Exception {
                SpanIpResult result = new SpanIpResult();
                result.setIp(v1.getIp());
                result.setTimes(v1.getTimes() + v2.getTimes());
                result.setAvgDuration((v1.getAvgDuration() * v1.getTimes() + v2.getAvgDuration() * v2.getTimes()) / result.getTimes());
                return result;
            }
        });

        spanIpResults.foreachRDD(new VoidFunction<JavaPairRDD<String, SpanIpResult>>() {
            @Override
            public void call(JavaPairRDD<String, SpanIpResult> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, SpanIpResult>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, SpanIpResult>> tuple2Iterator) throws Exception {
                        if (tuple2Iterator == null) {
                            return;
                        }
                        try {
                            List<Put> putList = new ArrayList<>();
                            while (tuple2Iterator.hasNext()) {
                                Tuple2<String, SpanIpResult> next = tuple2Iterator.next();
                                SpanIpResult spanIpResult = next._2;

                                Put put = new Put(Bytes.toBytes(next._1));
                                put.addColumn(Bytes.toBytes(DATA_FAMILY), Bytes.toBytes("avg_duration"), Bytes.toBytes(spanIpResult.getAvgDuration()));
                                put.addColumn(Bytes.toBytes(DATA_FAMILY), Bytes.toBytes("times"), Bytes.toBytes(spanIpResult.getTimes()));
                                put.addColumn(Bytes.toBytes(DATA_FAMILY), Bytes.toBytes("ip"), Bytes.toBytes(spanIpResult.getIp()));

                                putList.add(put);
                            }

                            writeToHBase("span_ip", putList);
                        } catch (Exception e) {
                            // best effort: a failed write is logged and must not fail the batch
                            logger.error(e.getMessage(), e);
                        }
                    }
                });
            }
        });
    }

    /**
     * Reduces the per-trace results by key with the analyzer's reduce function and stores them
     * in {@code trace_api_analyze_minutes}.
     *
     * @param apiResultTraceDStream per-trace results produced by the analyzer
     */
    private void handlertraceApiAnalyzeMinutes(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream) {
        // aggregate the values per key (grouped by traceMD5)
        JavaPairDStream<String, ApiTraceResult> resultDStream = apiResultTraceDStream.reduceByKey(analyzer.ReduceFunc);

        resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
            @Override
            public void call(JavaPairRDD<String, ApiTraceResult> apiResultRdd) throws Exception {
                ApiStatisticsResultStore.store(apiResultRdd, "trace_api_analyze_minutes");
            }
        });
    }

    /**
     * Writes every trace flagged with an error status to the HBase table {@code exception_trace}.
     * The row key is {@code <traceStartTime / 1000>:<traceId>}.
     *
     * @param apiResultTraceDStream per-trace results produced by the analyzer
     */
    private static void handlerExceptionTrace(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream) {
        JavaPairDStream<String, ApiTraceResult> errorTraces = apiResultTraceDStream.filter(new Function<Tuple2<String, ApiTraceResult>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, ApiTraceResult> trace) throws Exception {
                return trace._2.isErrorStatus();
            }
        });

        errorTraces.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
            @Override
            public void call(JavaPairRDD<String, ApiTraceResult> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
                        if (tuple2Iterator == null) {
                            return;
                        }
                        try {
                            List<Put> putList = new ArrayList<>();
                            while (tuple2Iterator.hasNext()) {
                                ApiTraceResult apiTraceResult = tuple2Iterator.next()._2;

                                Put put = new Put(Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
                                put.addColumn(Bytes.toBytes(DATA_FAMILY), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
                                put.addColumn(Bytes.toBytes(DATA_FAMILY), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
                                put.addColumn(Bytes.toBytes(DATA_FAMILY), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));

                                putList.add(put);
                            }

                            writeToHBase("exception_trace", putList);
                        } catch (Exception e) {
                            // best effort: a failed write is logged and must not fail the batch
                            logger.error(e.getMessage(), e);
                        }
                    }
                });
            }
        });
    }

    /**
     * Writes the given puts to an HBase table in one call and always closes the table handle.
     * Nothing is opened when there is nothing to write (e.g. for empty partitions).
     *
     * @param tableName name of the target HBase table
     * @param putList   rows to write; may be empty
     * @throws Exception if the table cannot be obtained, written or closed
     */
    private static void writeToHBase(String tableName, List<Put> putList) throws Exception {
        if (putList.isEmpty()) {
            return;
        }
        HTable resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf(tableName));
        try {
            resultTable.put(putList);
        } finally {
            resultTable.close();
        }
    }
}