TraceSpanStore.java
3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.yoho.trace.store;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
/**
* Created by markeloff on 2017/7/26.
*/
public class TraceSpanStore implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TraceSpanStore.class) ;
public static int store(Spans spans, HTable traceTable) {
int count = 0 ;
if (spans != null && CollectionUtils.isNotEmpty(spans.getSpans())) {
try {
List<Span> spanList = spans.getSpans() ;
for(int i=0; i<spanList.size(); i++){
Span span = spanList.get(i);
StringBuffer sb = new StringBuffer(128);
String logEvent = "none" ;
if( CollectionUtils.isNotEmpty(span.logs()) ){
logEvent = span.logs().get(0).getEvent() ;
}
String rowkey = sb.append( span.idToHex(span.getTraceId()) + ":" + (span.getBegin()/1000) ).append(":").append(span.idToHex(span.getSpanId()))
.append(":").append(logEvent).toString() ;
Put put = new Put(Bytes.toBytes(rowkey)) ;
//其实不推荐关闭WAL,不过关了的确可以提升性能...因为HBase在写数据前会先写WAL,以保证在异常情况下,HBase可以按照WAL的记录来恢复还未持久化的数据。
put.setDurability(Durability.SKIP_WAL);
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("service"),Bytes.toBytes( spans.getHost().getServiceName() )) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("event"),Bytes.toBytes( logEvent )) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("ip"),Bytes.toBytes( spans.getHost().getAddress() )) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("name"),Bytes.toBytes(span.getName())) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("traceid"),Bytes.toBytes( Span.idToHex(span.getTraceId()) )) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("spanid"),Bytes.toBytes( Span.idToHex(span.getSpanId()) )) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("begin"),Bytes.toBytes( span.getBegin() )) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("end"),Bytes.toBytes( span.getEnd() )) ;
if( CollectionUtils.isNotEmpty( span.getParents())) {
put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("parent"), Bytes.toBytes(Span.idToHex(span.getParents().get(span.getParents().size() - 1)))) ;
}
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("logs"),Bytes.toBytes(JSONObject.toJSONString(span.logs()))) ;
put.addColumn(Bytes.toBytes("span"),Bytes.toBytes("tags"),Bytes.toBytes(JSONObject.toJSONString(span.tags()))) ;
traceTable.put(put);
count++ ;
}
}catch(Exception e){
logger.error("store to hbase failed, spand {} ", JSONObject.toJSONString(spans),e);
}
}
return count ;
}
}