Authored by wangning

update

@@ -264,6 +264,9 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { @@ -264,6 +264,9 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
264 264
265 //3处理异常调用链信息 265 //3处理异常调用链信息
266 handlerExceptionTrace(apiResultTraceDStream); 266 handlerExceptionTrace(apiResultTraceDStream);
  267 +
  268 + //4处理所有traceid
  269 + handlerAllTrace(apiResultTraceDStream);
267 } 270 }
268 271
269 272
@@ -449,4 +452,51 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable { @@ -449,4 +452,51 @@ public class TraceAnalyzeHandler implements TraceHandler, Serializable {
449 } 452 }
450 }); 453 });
451 } 454 }
  455 +
  456 +
  457 +
  458 + private void handlerAllTrace(JavaPairDStream<String, ApiTraceResult> apiResultTraceDStream){
  459 + apiResultTraceDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
  460 + @Override
  461 + public void call(JavaPairRDD<String, ApiTraceResult> stringSortedTraceJavaPairRDD) throws Exception {
  462 + stringSortedTraceJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
  463 + @Override
  464 + public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2Iterator) throws Exception {
  465 + HTable resultTable = null;
  466 + try {
  467 + if(tuple2Iterator == null){
  468 + return;
  469 + }
  470 +
  471 + if (resultTable == null) {
  472 + resultTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("all_trace"));
  473 + }
  474 +
  475 + List<Put> putList = new ArrayList<>();
  476 + while(tuple2Iterator.hasNext()){
  477 + Tuple2<String, ApiTraceResult> next = tuple2Iterator.next();
  478 + ApiTraceResult apiTraceResult = next._2;
  479 + String[] md5Tags = StringUtils.split(apiTraceResult.getTraceMd5(), '.');
  480 +
  481 + Put put = new Put(Bytes.toBytes( md5Tags[1] + ":" + apiTraceResult.getTraceStartTime()/1000 + ":" + apiTraceResult.getTraceId()));
  482 + put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("spans"), Bytes.toBytes(JSONObject.toJSONString(apiTraceResult.getSpans())));
  483 + put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceid"), Bytes.toBytes(apiTraceResult.getTraceId()));
  484 + put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("starttime"), Bytes.toBytes(apiTraceResult.getTraceStartTime()/1000));
  485 + put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("traceMd5"), Bytes.toBytes(apiTraceResult.getTraceMd5()));
  486 + putList.add(put);
  487 + }
  488 +
  489 + resultTable.put(putList);
  490 +
  491 + } catch (Exception e) {
  492 + logger.error(e.getMessage(),e);
  493 + } finally {
  494 + if (resultTable != null)
  495 + resultTable.close();
  496 + }
  497 + }
  498 + });
  499 + }
  500 + });
  501 + }
452 } 502 }