// ApiStatisticsResultStore.java 5.91 KB
package com.yoho.trace.store;

import com.alibaba.fastjson.JSONObject;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SpanResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Created by markeloff on 2017/7/26.
 */
public class ApiStatisticsResultStore {

    private static final Logger logger = LoggerFactory.getLogger(ApiStatisticsResultStore.class);


    public static void store(JavaPairRDD<String, ApiTraceResult> apiResultRdd, String tableName) {

        logger.info("store api statistics result to hbase ");

        //调试打印
        if (logger.isDebugEnabled()) {
            apiResultRdd.foreach(
                    new VoidFunction<Tuple2<String, ApiTraceResult>>() {
                        @Override
                        public void call(Tuple2<String, ApiTraceResult> tuple2) throws Exception {

                            ApiTraceResult result = tuple2._2();

                            logger.debug("++++++++++++++++++++++++++++++++++++++++++++");
                            logger.debug("                    ");
                            logger.debug("API:" + result.getApiName());
                            logger.debug("            Avg Response(ms): " + result.getDuration() + " ms");
                            logger.debug("            Times: " + result.getCallTimes());
                            logger.debug("            max latency: " + result.getMaxLatencyTrace() + "  " + result.getMaxLatency() + " ms");
                            logger.debug("            min latency: " + result.getMinLatencyTrace() + "  " + result.getMinLatency() + " ms");
                            logger.debug("            Sample Trace: ");

                            List<SpanResult> list = result.getSpans();
                            for (int i = 0; i < list.size(); i++) {

                                StringBuffer sb = new StringBuffer();
                                for (int n = 0; n < list.get(i).getLevel(); n++) {
                                    sb.append("    ");
                                }
                                logger.debug("                    " + i + ":" + sb.toString() + list.get(i).getSpanName()
                                        + " (" + String.valueOf(list.get(i).getDuration()) + "ms) "
                                        + " id:" + list.get(i).getSpanId()
                                        + " parent:" + list.get(i).getParent());
                            }

                            Thread.sleep(200);

                        }
                    }
            );
        }



        apiResultRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
            @Override
            public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
                HTable resultTable = null;
                try {
                    if (resultTable == null) {
                        resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf(tableName));
                    }

                    if(tuple2Iterator == null){
                        return;
                    }

                    List<Put> putList = new ArrayList<>();
                    while(tuple2Iterator.hasNext()){
                        Tuple2<String, ApiTraceResult> next = tuple2Iterator.next();
                        ApiTraceResult apiTraceResult = next._2;
                        String rowkey = apiTraceResult.getApiName() + ":" + apiTraceResult.getTraceMd5();
                        Put put = new Put(Bytes.toBytes(rowkey));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("api"), Bytes.toBytes(apiTraceResult.getApiName()));
                        String[] md5Tags = StringUtils.split(apiTraceResult.getTraceMd5(), '.');
                        if (null == md5Tags || 2 != md5Tags.length) {
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(StringUtils.EMPTY));
                        } else {
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(md5Tags[1]));
                        }
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("duration"), Bytes.toBytes(apiTraceResult.getDuration()));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("times"), Bytes.toBytes(apiTraceResult.getCallTimes()));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatency"), Bytes.toBytes(apiTraceResult.getMaxLatency()));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatencyTrace"), Bytes.toBytes(apiTraceResult.getMaxLatencyTrace()));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatency"), Bytes.toBytes(apiTraceResult.getMinLatency()));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatencyTrace"), Bytes.toBytes(apiTraceResult.getMinLatencyTrace()));
                        put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
                        putList.add(put);
                    }

                    resultTable.put(putList);

                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                } finally {
                    if (resultTable != null)
                        resultTable.close();
                }
            }
        });

    }

}