Authored by wangning

批量入库

@@ -464,15 +464,11 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { @@ -464,15 +464,11 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
464 apiResultTraceDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() { 464 apiResultTraceDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
465 @Override 465 @Override
466 public void call(JavaPairRDD<String, ApiTraceResult> stringSortedTraceJavaPairRDD) throws Exception { 466 public void call(JavaPairRDD<String, ApiTraceResult> stringSortedTraceJavaPairRDD) throws Exception {
467 - System.out.println("handlerAllTrace begin1");  
468 - logger.info("handlerAllTrace begin1");  
469 467
470 stringSortedTraceJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() { 468 stringSortedTraceJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
471 @Override 469 @Override
472 public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception { 470 public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
473 471
474 - System.out.println("handlerAllTrace begin2");  
475 - logger.info("handlerAllTrace begin2");  
476 HTable resultTable = null; 472 HTable resultTable = null;
477 try { 473 try {
478 if(tuple2Iterator == null){ 474 if(tuple2Iterator == null){
@@ -7,15 +7,15 @@ import org.apache.commons.lang3.StringUtils; @@ -7,15 +7,15 @@ import org.apache.commons.lang3.StringUtils;
7 import org.apache.hadoop.hbase.TableName; 7 import org.apache.hadoop.hbase.TableName;
8 import org.apache.hadoop.hbase.client.HTable; 8 import org.apache.hadoop.hbase.client.HTable;
9 import org.apache.hadoop.hbase.client.Put; 9 import org.apache.hadoop.hbase.client.Put;
10 -import org.apache.hadoop.hbase.client.Table;  
11 import org.apache.hadoop.hbase.util.Bytes; 10 import org.apache.hadoop.hbase.util.Bytes;
12 import org.apache.spark.api.java.JavaPairRDD; 11 import org.apache.spark.api.java.JavaPairRDD;
13 import org.apache.spark.api.java.function.VoidFunction; 12 import org.apache.spark.api.java.function.VoidFunction;
14 import org.slf4j.Logger; 13 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory; 14 import org.slf4j.LoggerFactory;
16 -import properties.PropertiesFactory;  
17 import scala.Tuple2; 15 import scala.Tuple2;
18 16
  17 +import java.util.ArrayList;
  18 +import java.util.Iterator;
19 import java.util.List; 19 import java.util.List;
20 20
21 /** 21 /**
@@ -68,49 +68,55 @@ public class ApiStatisticsResultStore { @@ -68,49 +68,55 @@ public class ApiStatisticsResultStore {
68 ); 68 );
69 } 69 }
70 70
71 - //insert data  
72 - apiResultRdd.foreach(  
73 - new VoidFunction<Tuple2<String, ApiTraceResult>>() {  
74 - @Override  
75 - public void call(Tuple2<String, ApiTraceResult> tuple2) throws Exception {  
76 -  
77 - ApiTraceResult result = tuple2._2();  
78 - HTable resultTable = null;  
79 - try {  
80 - if (resultTable == null) {  
81 - resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf(tableName));  
82 - }  
83 71
84 - String rowkey = result.getApiName() + ":" + result.getTraceMd5();  
85 - Put put = new Put(Bytes.toBytes(rowkey));  
86 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("api"), Bytes.toBytes(result.getApiName()));  
87 - String[] md5Tags = StringUtils.split(result.getTraceMd5(), '.');  
88 - if (null == md5Tags || 2 != md5Tags.length) {  
89 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(StringUtils.EMPTY));  
90 - } else {  
91 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(md5Tags[1]));  
92 - }  
93 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("duration"), Bytes.toBytes(result.getDuration()));  
94 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("times"), Bytes.toBytes(result.getCallTimes()));  
95 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatency"), Bytes.toBytes(result.getMaxLatency()));  
96 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatencyTrace"), Bytes.toBytes(result.getMaxLatencyTrace()));  
97 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatency"), Bytes.toBytes(result.getMinLatency()));  
98 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatencyTrace"), Bytes.toBytes(result.getMinLatencyTrace()));  
99 - put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(result.getSpans())));  
100 -  
101 - logger.info("put data to hbase, {}", put);  
102 -  
103 - resultTable.put(put);  
104 -  
105 - } catch (Exception e) {  
106 - logger.error("store ApiTraceResult failed, result {} ", result, e);  
107 - } finally {  
108 - if (resultTable != null)  
109 - resultTable.close(); 72 +
  73 + apiResultRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
  74 + @Override
  75 + public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
  76 + HTable resultTable = null;
  77 + try {
  78 + if (resultTable == null) {
  79 + resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf(tableName));
  80 + }
  81 +
  82 + if(tuple2Iterator == null){
  83 + return;
  84 + }
  85 +
  86 + List<Put> putList = new ArrayList<>();
  87 + while(tuple2Iterator.hasNext()){
  88 + Tuple2<String, ApiTraceResult> next = tuple2Iterator.next();
  89 + ApiTraceResult apiTraceResult = next._2;
  90 + String rowkey = apiTraceResult.getApiName() + ":" + apiTraceResult.getTraceMd5();
  91 + Put put = new Put(Bytes.toBytes(rowkey));
  92 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("api"), Bytes.toBytes(apiTraceResult.getApiName()));
  93 + String[] md5Tags = StringUtils.split(apiTraceResult.getTraceMd5(), '.');
  94 + if (null == md5Tags || 2 != md5Tags.length) {
  95 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(StringUtils.EMPTY));
  96 + } else {
  97 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("traceMd5"), Bytes.toBytes(md5Tags[1]));
110 } 98 }
  99 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("duration"), Bytes.toBytes(apiTraceResult.getDuration()));
  100 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("times"), Bytes.toBytes(apiTraceResult.getCallTimes()));
  101 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatency"), Bytes.toBytes(apiTraceResult.getMaxLatency()));
  102 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("maxLatencyTrace"), Bytes.toBytes(apiTraceResult.getMaxLatencyTrace()));
  103 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatency"), Bytes.toBytes(apiTraceResult.getMinLatency()));
  104 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("minLatencyTrace"), Bytes.toBytes(apiTraceResult.getMinLatencyTrace()));
  105 + put.addColumn(Bytes.toBytes("trace"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
  106 + putList.add(put);
111 } 107 }
  108 +
  109 + resultTable.put(putList);
  110 +
  111 + } catch (Exception e) {
  112 + logger.error(e.getMessage(),e);
  113 + } finally {
  114 + if (resultTable != null)
  115 + resultTable.close();
112 } 116 }
113 - ); 117 + }
  118 + });
  119 +
114 } 120 }
115 121
116 } 122 }