// TraceSpanStore.java 3.66 KB
package com.yoho.trace.store;

import com.alibaba.fastjson.JSONObject;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import properties.PropertiesFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by markeloff on 2017/7/26.
 */
public class TraceSpanStore implements Serializable {

    private static final Logger logger = LoggerFactory.getLogger(TraceSpanStore.class) ;


    public static int store(Spans spans, HTable traceTable) {

        int count = 0 ;

        if (spans != null && CollectionUtils.isNotEmpty(spans.getSpans())) {
            try {

                List<Span> spanList = spans.getSpans() ;
                for(int i=0; i<spanList.size(); i++){

                    Span span = spanList.get(i);
                    StringBuffer sb = new StringBuffer(64);

                    String logEvent = "none" ;
                    if( CollectionUtils.isNotEmpty(span.logs()) ){
                        logEvent = span.logs().get(0).getEvent() ;
                    }

//                    String rowkey = sb.append( (span.getBegin()/1000) + ":" + span.idToHex(span.getTraceId())).append(":").append(span.idToHex(span.getSpanId()))
//                            .append(":").append(logEvent).toString() ;

                    String rowkey = sb.append( span.idToHex(span.getTraceId()) + ":" + (span.getBegin()/1000) ).append(":").append(span.idToHex(span.getSpanId()))
                            .append(":").append(logEvent).toString() ;

                    Put put = new Put(Bytes.toBytes(rowkey)) ;

                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("service"),Bytes.toBytes( spans.getHost().getServiceName() )) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("event"),Bytes.toBytes( logEvent )) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("ip"),Bytes.toBytes( spans.getHost().getAddress() )) ;

                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("name"),Bytes.toBytes(span.getName())) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("traceid"),Bytes.toBytes( Span.idToHex(span.getTraceId()) )) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("spanid"),Bytes.toBytes( Span.idToHex(span.getSpanId()) )) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("begin"),Bytes.toBytes( span.getBegin() )) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("end"),Bytes.toBytes( span.getEnd() )) ;

                    if( CollectionUtils.isNotEmpty( span.getParents())) {
                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("parent"), Bytes.toBytes(Span.idToHex(span.getParents().get(span.getParents().size() - 1)))) ;
                    }
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("logs"),Bytes.toBytes(JSONObject.toJSONString(span.logs()))) ;
                    put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("tags"),Bytes.toBytes(JSONObject.toJSONString(span.tags()))) ;

                    traceTable.put(put);
                    count++ ;
                }

            }catch(Exception e){
                logger.error("store to hbase failed, spand {} ", JSONObject.toJSONString(spans),e);
            }
        }
        return count ;
    }

}