// ApiStatisticsResultStore.java 5.79 KB
package com.yoho.trace.store;

import com.alibaba.fastjson.JSONObject;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SpanResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import properties.PropertiesFactory;
import scala.Tuple2;

import java.util.List;

/**
 * Created by markeloff on 2017/7/26.
 */
public class ApiStatisticsResultStore {

    private static final Logger logger = LoggerFactory.getLogger(ApiStatisticsResultStore.class);


    public static void store(JavaPairRDD<String, ApiTraceResult> apiResultRdd, String tableName) {

        logger.info("store api statistics result to hbase ");

        //调试打印
        if (logger.isDebugEnabled()) {
            apiResultRdd.foreach(
                    new VoidFunction<Tuple2<String, ApiTraceResult>>() {
                        @Override
                        public void call(Tuple2<String, ApiTraceResult> tuple2) throws Exception {

                            ApiTraceResult result = tuple2._2();

                            logger.debug("++++++++++++++++++++++++++++++++++++++++++++");
                            logger.debug("                    ");
                            logger.debug("API:" + result.getApiName());
                            logger.debug("            Avg Response(ms): " + result.getDuration() + " ms");
                            logger.debug("            Times: " + result.getCallTimes());
                            logger.debug("            max latency: " + result.getMaxLatencyTrace() + "  " + result.getMaxLatency() + " ms");
                            logger.debug("            min latency: " + result.getMinLatencyTrace() + "  " + result.getMinLatency() + " ms");
                            logger.debug("            Sample Trace: ");

                            List<SpanResult> list = result.getSpans();
                            for (int i = 0; i < list.size(); i++) {

                                StringBuffer sb = new StringBuffer();
                                for (int n = 0; n < list.get(i).getLevel(); n++) {
                                    sb.append("    ");
                                }
                                logger.debug("                    " + i + ":" + sb.toString() + list.get(i).getSpanName()
                                        + " (" + String.valueOf(list.get(i).getDuration()) + "ms) "
                                        + " id:" + list.get(i).getSpanId()
                                        + " parent:" + list.get(i).getParent());
                            }

                            Thread.sleep(200);

                        }
                    }
            );
        }

        //insert data
        apiResultRdd.foreach(
                new VoidFunction<Tuple2<String, ApiTraceResult>>() {
                    @Override
                    public void call(Tuple2<String, ApiTraceResult> tuple2) throws Exception {

                        ApiTraceResult result = tuple2._2();
                        HTable resultTable = null;
                        try {
                            if (resultTable == null) {
                                resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf(tableName));
                            }

                            String rowkey = result.getApiName() + ":" + result.getTraceMd5();
                            Put put = new Put(Bytes.toBytes(rowkey));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("api"), Bytes.toBytes(result.getApiName()));
                            String[] md5Tags = StringUtils.split(result.getTraceMd5(), '.');
                            if (null == md5Tags || 2 != md5Tags.length) {
                                put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(StringUtils.EMPTY));
                            } else {
                                put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(md5Tags[1]));
                            }
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("duration"), Bytes.toBytes(result.getDuration()));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("times"), Bytes.toBytes(result.getCallTimes()));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatency"), Bytes.toBytes(result.getMaxLatency()));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatencyTrace"), Bytes.toBytes(result.getMaxLatencyTrace()));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatency"), Bytes.toBytes(result.getMinLatency()));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatencyTrace"), Bytes.toBytes(result.getMinLatencyTrace()));
                            put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(result.getSpans())));

                            logger.info("put data to hbase, {}", put);

                            resultTable.put(put);

                        } catch (Exception e) {
                            logger.error("store ApiTraceResult failed, result {} ", result, e);
                        } finally {
                            if (resultTable != null)
                                resultTable.close();
                        }
                    }
                }
        );
    }

}