Authored by 谢勇

Performance Optimization

Two changes speed up the HBase write path: every Put now carries Durability.SKIP_WAL so the write-ahead log is skipped, and the span writer re-enables the client-side write buffer (setWriteBufferSize / setAutoFlush(false) / flushCommits()) so TraceSpanStore can write Puts one at a time instead of accumulating a large List<Put> per partition. The diffs below show the changes.

@@ -8,6 +8,7 @@ import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -342,6 +343,8 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
         SpanIpResult spanIpResult = next._2;

         Put put = new Put(Bytes.toBytes(key));
+        // Disabling the WAL is not generally recommended, but it does improve write performance: HBase writes each mutation to the WAL before the data itself, so that after a failure it can replay the WAL to recover edits that were not yet persisted. SKIP_WAL gives up that recovery.
+        put.setDurability(Durability.SKIP_WAL);
         put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("avg_duration"), Bytes.toBytes(spanIpResult.getAvgDuration()));
         put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(spanIpResult.getTimes()));
         put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("ip"), Bytes.toBytes(spanIpResult.getIp()));
@@ -418,12 +421,16 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
         ApiTraceResult apiTraceResult = next._2;

         Put put = new Put(Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
+        // Same trade-off as above: skip the WAL for faster writes.
+        put.setDurability(Durability.SKIP_WAL);
         put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
         put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
         put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));


         Put put2 = new Put(Bytes.toBytes(apiTraceResult.getApiName() + ":" + apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
+        // Same trade-off as above: skip the WAL for faster writes.
+        put2.setDurability(Durability.SKIP_WAL);
         put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
         put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
         put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));
@@ -474,6 +481,8 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
         String[] md5Tags = StringUtils.split(apiTraceResult.getTraceMd5(), '.');

         Put put1 = new Put(Bytes.toBytes(md5Tags[1] + ":" + apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
+        // Same trade-off as above: skip the WAL for faster writes.
+        put1.setDurability(Durability.SKIP_WAL);
         put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
         put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
         put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));
@@ -482,6 +491,8 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {


         Put put2 = new Put(Bytes.toBytes(apiTraceResult.getApiName() + ":" + apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
+        // Same trade-off as above: skip the WAL for faster writes.
+        put2.setDurability(Durability.SKIP_WAL);
         put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
         put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
         put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));
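What SKIP_WAL actually trades away: HBase appends every mutation to a region server's write-ahead log before updating the in-memory store, so edits that have not yet been flushed to HFiles survive a crash. With SKIP_WAL there is no log entry, and unflushed edits are simply gone if the region server dies. Below is a minimal standalone sketch of the per-mutation durability knob, not code from this project; the connection setup and row values are assumptions.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public final class DurabilityDemo {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml from the classpath; assumes a reachable cluster.
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("trace"))) {

            Put put = new Put(Bytes.toBytes("demo-row")); // made-up row for illustration
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("q"), Bytes.toBytes("v"));

            // The main per-mutation durability levels:
            //   SYNC_WAL    - append to the WAL and sync it before acking (the usual default)
            //   ASYNC_WAL   - append to the WAL, sync it in the background
            //   SKIP_WAL    - no WAL entry at all: fastest, but edits still only in the
            //                 memstore are lost if the region server crashes before a flush
            //   USE_DEFAULT - defer to the table/column-family setting
            put.setDurability(Durability.SKIP_WAL);
            table.put(put);
        }
    }
}

Since every row written here is derived from the Kafka span stream, a lost memstore can in principle be recomputed by replaying the stream, which is presumably why the author accepts the trade.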
@@ -19,42 +19,48 @@ import com.yoho.trace.store.TraceSpanStore;
 /**
  * Created by markeloff on 2017/7/26.
  */
-public class TraceHbaseHandler implements TraceHandler , Serializable {
-
-    Logger logger = LoggerFactory.getLogger(TraceHbaseHandler.class) ;
-
-    @Override
-    public void handle(final JavaDStream<Spans> kafkaMsgDStream) {
-
-        kafkaMsgDStream.foreachRDD(new VoidFunction<JavaRDD<Spans>>() {
-            @Override
-            public void call(JavaRDD<Spans> spansJavaRDD) throws Exception {
-
-                spansJavaRDD.foreachPartition(new VoidFunction<Iterator<Spans>>() {
-                    @Override
-                    public void call(Iterator<Spans> spansIterator) throws Exception {
-
-                        HTable traceTable = null ;
-                        long begin = System.currentTimeMillis() ;
-                        try {
-                            traceTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("trace"));
-                            //traceTable.setWriteBufferSize(1024 * 1024 * 6);
-                            //traceTable.setAutoFlush(false, true);
-                            int count = TraceSpanStore.store(spansIterator, traceTable) ;
-                            //traceTable.flushCommits();
-                            logger.info("flush spans to hbase, count {}, elapse {}", count, System.currentTimeMillis()-begin );
-                        }finally {
-                            try {
-                                if(traceTable != null ) traceTable.close();
-                            } catch (IOException e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    }
-                });
-            }
-        });
-
-
-    }
+public class TraceHbaseHandler implements TraceHandler, Serializable {
+
+    Logger logger = LoggerFactory.getLogger(TraceHbaseHandler.class);
+
+    @Override
+    public void handle(final JavaDStream<Spans> kafkaMsgDStream) {
+
+        kafkaMsgDStream.foreachRDD(new VoidFunction<JavaRDD<Spans>>() {
+            @Override
+            public void call(JavaRDD<Spans> spansJavaRDD) throws Exception {
+                spansJavaRDD.foreachPartition(new VoidFunction<Iterator<Spans>>() {
+                    @Override
+                    public void call(Iterator<Spans> spansIterator) throws Exception {
+                        // HTable traceTable = null ;
+                        HTable[] tables = new HTable[3];
+                        int count = 0;
+                        long begin = System.currentTimeMillis();
+                        try {
+                            for (int i = 0; i < 3; i++) {
+                                tables[i] = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("trace"));
+                                tables[i].setWriteBufferSize(1024 * 1024 * 6);
+                                tables[i].setAutoFlush(false, true);
+                                while (spansIterator.hasNext()) {
+                                    count = count + TraceSpanStore.store(spansIterator.next(), tables[i]);
+                                }
+                                tables[i].flushCommits();
+                                logger.info("flush spans to hbase, count {}, elapse {}", count, System.currentTimeMillis() - begin);
+                            }
+
+                        } finally {
+                            try {
+                                for (HTable hTable : tables) {
+                                    if (hTable != null)
+                                        hTable.close();
+                                }
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }
+                });
+            }
+        });
+    }
 }
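The re-enabled setWriteBufferSize/setAutoFlush(false) pair makes the client accumulate Puts locally and ship them in bulk when flushCommits() runs, instead of issuing one RPC per Put. (One caveat in the diff above: spansIterator is exhausted on the first pass of the for loop, so tables[1] and tables[2] never receive any spans.) On HBase 1.x these HTable methods are deprecated in favor of BufferedMutator; here is a hedged sketch of the same partition writer on that API, where toPuts() is a hypothetical stand-in for the Put-building logic in TraceSpanStore and HBasePool is the project's own connection holder.

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;

public final class BufferedSpanWriter {

    // Writes one Spark partition of spans through a client-side write buffer.
    static int writePartition(Iterator<Spans> spansIterator) throws IOException {
        Connection connection = HBasePool.getConnection(); // project helper, assumed shared per JVM
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("trace"))
                .writeBufferSize(6 * 1024 * 1024); // same 6 MB buffer as the diff above
        int count = 0;
        try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
            while (spansIterator.hasNext()) {
                List<Put> puts = toPuts(spansIterator.next()); // hypothetical helper
                mutator.mutate(puts); // buffered locally, flushed when the buffer fills
                count += puts.size();
            }
        } // close() flushes anything still buffered
        return count;
    }
}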
@@ -1,11 +1,10 @@
 package com.yoho.trace.store;

 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;

 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -21,51 +20,58 @@ import com.yoho.trace.sleuth.Spans;
  */
 public class TraceSpanStore implements Serializable {

-    private static final Logger logger = LoggerFactory.getLogger(TraceSpanStore.class);
-
-    public static int store(Iterator<Spans> spansIterator, HTable traceTable) {
-        List<Put> spanPutList = new ArrayList<>(10000);
-        try {
-            while (spansIterator.hasNext()) {
-                Spans spans = spansIterator.next();
-                if (spans != null && CollectionUtils.isNotEmpty(spans.getSpans())) {
-                    List<Span> spanList = spans.getSpans();
-                    for (int i = 0; i < spanList.size(); i++) {
-                        Span span = spanList.get(i);
-                        StringBuffer sb = new StringBuffer(128);
-
-                        String logEvent = "none";
-                        if (CollectionUtils.isNotEmpty(span.logs())) {
-                            logEvent = span.logs().get(0).getEvent();
-                        }
-                        String rowkey = sb.append(span.idToHex(span.getTraceId()) + ":" + (span.getBegin() / 1000))
-                                .append(":").append(span.idToHex(span.getSpanId())).append(":").append(logEvent)
-                                .toString();
-                        Put put = new Put(Bytes.toBytes(rowkey));
-
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("service"), Bytes.toBytes(spans.getHost().getServiceName()));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("event"), Bytes.toBytes(logEvent));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("ip"), Bytes.toBytes(spans.getHost().getAddress()));
-
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("name"), Bytes.toBytes(span.getName()));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("traceid"), Bytes.toBytes(Span.idToHex(span.getTraceId())));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("spanid"), Bytes.toBytes(Span.idToHex(span.getSpanId())));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("begin"), Bytes.toBytes(span.getBegin()));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("end"), Bytes.toBytes(span.getEnd()));
-
-                        if (CollectionUtils.isNotEmpty(span.getParents())) {
-                            put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("parent"), Bytes.toBytes(Span.idToHex(span.getParents().get(span.getParents().size() - 1))));
-                        }
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("logs"), Bytes.toBytes(JSONObject.toJSONString(span.logs())));
-                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("tags"), Bytes.toBytes(JSONObject.toJSONString(span.tags())));
-                        spanPutList.add(put);
-                    }
-                }
-            }
-            traceTable.put(spanPutList);
-        } catch (Exception e) {
-
-        }
-        return spanPutList.size();
-    }
-}
+    private static final Logger logger = LoggerFactory.getLogger(TraceSpanStore.class);
+
+
+    public static int store(Spans spans, HTable traceTable) {
+
+        int count = 0;
+
+        if (spans != null && CollectionUtils.isNotEmpty(spans.getSpans())) {
+            try {
+
+                List<Span> spanList = spans.getSpans();
+                for (int i = 0; i < spanList.size(); i++) {
+
+                    Span span = spanList.get(i);
+                    StringBuffer sb = new StringBuffer(128);
+
+                    String logEvent = "none";
+                    if (CollectionUtils.isNotEmpty(span.logs())) {
+                        logEvent = span.logs().get(0).getEvent();
+                    }
+
+                    String rowkey = sb.append(span.idToHex(span.getTraceId()) + ":" + (span.getBegin()/1000)).append(":").append(span.idToHex(span.getSpanId()))
+                            .append(":").append(logEvent).toString();
+
+                    Put put = new Put(Bytes.toBytes(rowkey));
+                    // Disabling the WAL is not generally recommended, but it does speed up writes (see the note above).
+                    put.setDurability(Durability.SKIP_WAL);
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("service"), Bytes.toBytes(spans.getHost().getServiceName()));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("event"), Bytes.toBytes(logEvent));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("ip"), Bytes.toBytes(spans.getHost().getAddress()));
+
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("name"), Bytes.toBytes(span.getName()));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("traceid"), Bytes.toBytes(Span.idToHex(span.getTraceId())));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("spanid"), Bytes.toBytes(Span.idToHex(span.getSpanId())));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("begin"), Bytes.toBytes(span.getBegin()));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("end"), Bytes.toBytes(span.getEnd()));
+
+                    if (CollectionUtils.isNotEmpty(span.getParents())) {
+                        put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("parent"), Bytes.toBytes(Span.idToHex(span.getParents().get(span.getParents().size() - 1))));
+                    }
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("logs"), Bytes.toBytes(JSONObject.toJSONString(span.logs())));
+                    put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("tags"), Bytes.toBytes(JSONObject.toJSONString(span.tags())));
+
+                    traceTable.put(put);
+                    count++;
+                }
+
+            } catch (Exception e) {
+                logger.error("store to hbase failed, spans {} ", JSONObject.toJSONString(spans), e);
+            }
+        }
+        return count;
+    }
+
+}
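For reference, the rowkey assembled by store() is traceId(hex) : begin-seconds : spanId(hex) : first-log-event. Because HBase sorts rows lexicographically, leading with the trace id keeps every span of one trace in a contiguous range, so reading a full trace is a single short scan. A tiny illustration with made-up values:

// Made-up values, just to show the rowkey shape produced by store() above.
String traceIdHex = "4bf92f3577b34da6";  // Span.idToHex(span.getTraceId())
long beginMillis  = 1501034400123L;      // span.getBegin(), epoch millis
String spanIdHex  = "00f067aa0ba902b7";  // Span.idToHex(span.getSpanId())
String logEvent   = "sr";                // first log event, or "none" if absent

String rowkey = traceIdHex + ":" + (beginMillis / 1000) + ":" + spanIdHex + ":" + logEvent;
// -> "4bf92f3577b34da6:1501034400:00f067aa0ba902b7:sr"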