Authored by 孟令阶

Add: replace the groupByKey+flatMap aggregation with reduceByKey, and add per-page call-count persistence to the page_call_time_minutes HBase table.

... ... @@ -13,6 +13,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
... ... @@ -81,6 +82,9 @@ public class PageAnalyzeHandler implements TraceHandler, Serializable {
}
});
spanInfoStream.cache();
// key:pageId:apiname, value ApiTraceResult
JavaPairDStream<String, ApiTraceResult> pageIdSpanInfoJavaPairDStream = spanInfoStream
.mapPartitionsToPair(new PairFlatMapFunction<Iterator<SpanInfo>, String, ApiTraceResult>() {
... ... @@ -98,25 +102,14 @@ public class PageAnalyzeHandler implements TraceHandler, Serializable {
}
});
// key pageId:apiname, value List<ApiTraceResult>
JavaPairDStream<String, Iterable<ApiTraceResult>> pageIdGroupPairStream = pageIdSpanInfoJavaPairDStream.groupByKey();
// 给每个页面的数据汇总,key pageId:apiname, value ApiTraceResult
JavaPairDStream<String, ApiTraceResult> pageResultStream = pageIdGroupPairStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<ApiTraceResult>>, String, ApiTraceResult>() {
JavaPairDStream<String, ApiTraceResult> pageResultStream = pageIdSpanInfoJavaPairDStream.reduceByKey(new Function2<ApiTraceResult, ApiTraceResult, ApiTraceResult>() {
@Override
public Iterator<Tuple2<String, ApiTraceResult>> call(Tuple2<String, Iterable<ApiTraceResult>> tuple2) throws Exception {
Iterator<ApiTraceResult> iterator = tuple2._2().iterator();
int calTimes = 0;
long duration = 0;
while (iterator.hasNext()) {
ApiTraceResult apiTraceResult = iterator.next();
duration += apiTraceResult.getDuration();
calTimes++;
}
ApiTraceResult result = new ApiTraceResult();
result.setCallTimes(calTimes);
result.setDuration(duration);
return Arrays.asList(new Tuple2<>(tuple2._1, result)).iterator();
public ApiTraceResult call(ApiTraceResult v1, ApiTraceResult v2) throws Exception {
ApiTraceResult apiTraceResult = new ApiTraceResult();
apiTraceResult.setDuration(v1.getDuration() + v2.getDuration());
apiTraceResult.setCallTimes(v1.getCallTimes() + v2.getCallTimes());
return apiTraceResult;
}
});
... ... @@ -139,7 +132,7 @@ public class PageAnalyzeHandler implements TraceHandler, Serializable {
String rowKey1 = tuple2._1.split(SPLIT_STR)[0] + "-" + now + "-" + tuple2._1.split(SPLIT_STR)[1];
Put put1 = new Put(Bytes.toBytes(rowKey1));
put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(tuple2._2.getCallTimes()));
put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("duration"), Bytes.toBytes(tuple2._2.getDuration()/tuple2._2.getCallTimes()));
put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("duration"), Bytes.toBytes(tuple2._2.getDuration() / tuple2._2.getCallTimes()));
put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("total_duration"), Bytes.toBytes(tuple2._2.getDuration()));
puts1.add(put1);
logger.info("put data to trace_page_analyze_minutes, {}", put1);
... ... @@ -163,6 +156,55 @@ public class PageAnalyzeHandler implements TraceHandler, Serializable {
}
});
//开始计算每个页面每个周期最高访问的接口
JavaPairDStream<String, ApiTraceResult> pageStream = spanInfoStream
.mapPartitionsToPair(new PairFlatMapFunction<Iterator<SpanInfo>, String, ApiTraceResult>() {
@Override
public Iterator<Tuple2<String, ApiTraceResult>> call(Iterator<SpanInfo> ite) throws Exception {
List<Tuple2<String, ApiTraceResult>> list = Lists.newArrayList();
while (ite.hasNext()) {
SpanInfo spanInfo = ite.next();
ApiTraceResult result = new ApiTraceResult();
result.setDuration(spanInfo.getEnd() - spanInfo.getBegin());
list.add(new Tuple2<>(spanInfo.getPageId(), result));
}
return list.iterator();
}
});
JavaPairDStream<String, ApiTraceResult> pageMaxCallTimeStream = pageStream.reduceByKey(new Function2<ApiTraceResult, ApiTraceResult, ApiTraceResult>() {
@Override
public ApiTraceResult call(ApiTraceResult v1, ApiTraceResult v2) throws Exception {
return v1.getCallTimes() >= v2.getCallTimes() ? v1 : v2;
}
});
pageMaxCallTimeStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
@Override
public void call(JavaPairRDD<String, ApiTraceResult> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
@Override
public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2s) throws Exception {
long now = Calendar.getInstance().getTimeInMillis();
try (HTable resultTable1 = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("page_call_time_minutes"))) {
List<Put> puts1 = Lists.newArrayList();
while (tuple2s.hasNext()) {
Tuple2<String, ApiTraceResult> tuple2 = tuple2s.next();
String rowKey1 = tuple2._1 + "-" + now;
Put put1 = new Put(Bytes.toBytes(rowKey1));
put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(tuple2._2.getCallTimes()));
puts1.add(put1);
logger.info("put data to page_call_time_minutes, {}", put1);
}
resultTable1.put(puts1);
} catch (Exception e) {
logger.error("store page_call_time_minutes result failed, e is {} ", e);
}
}
});
}
});
}
}
... ...