PageAnalyzeHandler.java 6.88 KB
package com.yoho.trace.online.handler;

import com.google.common.collect.Lists;
import com.yoho.trace.anaylzer.model.ApiTraceResult;
import com.yoho.trace.anaylzer.model.SpanInfo;
import com.yoho.trace.sleuth.Span;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.HBasePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;

/**
 * Spark Streaming handler that builds per-page API call statistics from trace spans.
 *
 * <p>Only spans tagged with {@code yoho.fromPage} are used. Each micro-batch is keyed by
 * {@code pageId + "-" + apiName} and reduced to a call count and an average duration
 * ({@code end - begin}) per key. The results are written to the HBase tables
 * {@code trace_page_analyze_minutes}, {@code trace_api_source_analyze_minutes} and
 * {@code page_statistics}.
 *
 * @Author: lingjie.meng
 * @Description: aggregates page -> API call counts and average durations
 * @Date: created on 3:48 PM, 2017/11/10
 * @ModifyBy:
 */
public class PageAnalyzeHandler implements TraceHandler, Serializable {

	/** Logger for this handler (previously, and wrongly, named after TraceAnalyzeHandler). */
	private static final Logger logger = LoggerFactory.getLogger(PageAnalyzeHandler.class);

	/** Separator between the page id and the API name in the aggregation key. */
	private static final String SPLIT_STR = "-";

	/** Span tag holding the id of the page that issued the call. */
	private static final String FROM_PAGE = "yoho.fromPage";

	@Override
	public void handle(final JavaDStream<Spans> kafkaMsgDStream) {

		// Keep only spans that carry a yoho.fromPage tag and have a non-empty name.
		JavaDStream<SpanInfo> spanInfoStream = kafkaMsgDStream.flatMap(new FlatMapFunction<Spans, SpanInfo>() {
			@Override
			public Iterator<SpanInfo> call(Spans spans) throws Exception {
				List<SpanInfo> result = Lists.newArrayList();
				for (Span span : spans.getSpans()) {
					String pageId = span.tags().get(FROM_PAGE);
					if (StringUtils.isEmpty(pageId) || StringUtils.isEmpty(span.getName())) {
						continue;
					}
					SpanInfo spanInfo = new SpanInfo();
					spanInfo.setPageId(pageId);
					spanInfo.setName(span.getName());
					spanInfo.setBegin(span.getBegin());
					spanInfo.setEnd(span.getEnd());
					spanInfo.setTraceid(Span.idToHex(span.getTraceId()));
					spanInfo.setSpanid(Span.idToHex(span.getSpanId()));
					spanInfo.setService(spans.getHost().getServiceName());
					spanInfo.setIp(spans.getHost().getAddress());
					result.add(spanInfo);
				}
				return result.iterator();
			}
		});

		// key: pageId + "-" + apiName (with the "http:/" prefix removed), value: one call's duration.
		// NOTE(review): assumes pageId itself contains no "-"; keys are split at the first "-" when stored.
		JavaPairDStream<String, ApiTraceResult> pageIdSpanInfoJavaPairDStream = spanInfoStream
				.mapPartitionsToPair(new PairFlatMapFunction<Iterator<SpanInfo>, String, ApiTraceResult>() {
					@Override
					public Iterator<Tuple2<String, ApiTraceResult>> call(Iterator<SpanInfo> ite) throws Exception {
						List<Tuple2<String, ApiTraceResult>> list = Lists.newArrayList();
						while (ite.hasNext()) {
							SpanInfo spanInfo = ite.next();
							ApiTraceResult result = new ApiTraceResult();
							result.setDuration(spanInfo.getEnd() - spanInfo.getBegin());
							String api = spanInfo.getName().replace("http:/", "");
							list.add(new Tuple2<>(spanInfo.getPageId() + SPLIT_STR + api, result));
						}
						return list.iterator();
					}
				});

		// key: pageId-apiName, value: every single-call result for that key in this batch
		JavaPairDStream<String, Iterable<ApiTraceResult>> pageIdGroupPairStream = pageIdSpanInfoJavaPairDStream.groupByKey();

		// Summarize each page/API pair: key pageId-apiName, value call count + average duration.
		JavaPairDStream<String, ApiTraceResult> pageResultStream = pageIdGroupPairStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<ApiTraceResult>>, String, ApiTraceResult>() {
			@Override
			public Iterator<Tuple2<String, ApiTraceResult>> call(Tuple2<String, Iterable<ApiTraceResult>> tuple2) throws Exception {
				int callTimes = 0;
				long totalDuration = 0;
				for (ApiTraceResult apiTraceResult : tuple2._2()) {
					callTimes++;
					totalDuration += apiTraceResult.getDuration();
				}
				ApiTraceResult result = new ApiTraceResult();
				result.setCallTimes(callTimes);
				// Sum first and divide once. The previous running integer average truncated at
				// every step and under-reported (e.g. durations 0, 3, 3 averaged to 1 instead of 2).
				result.setDuration(callTimes == 0 ? 0 : totalDuration / callTimes);
				return Arrays.asList(new Tuple2<>(tuple2._1, result)).iterator();
			}
		});

		// Persist each batch's aggregates to HBase, one write per partition.
		pageResultStream.foreachRDD(new VoidFunction<JavaPairRDD<String, ApiTraceResult>>() {
			@Override
			public void call(JavaPairRDD<String, ApiTraceResult> rdd) throws Exception {
				rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ApiTraceResult>>>() {
					@Override
					public void call(Iterator<Tuple2<String, ApiTraceResult>> tuple2s) throws Exception {
						storePartition(tuple2s);
					}
				});
			}
		});
	}

	/**
	 * Writes one partition of aggregated {@code (pageId-apiName, result)} pairs to HBase.
	 *
	 * <p>For each record this writes:
	 * <ul>
	 *   <li>{@code trace_page_analyze_minutes}: row {@code pageId-now-api}, columns {@code data:times} and {@code data:duration}</li>
	 *   <li>{@code trace_api_source_analyze_minutes}: row {@code api-now-pageId}, column {@code data:times}</li>
	 *   <li>{@code page_statistics}: row {@code page-now}, columns {@code data:page} and {@code data:api}</li>
	 * </ul>
	 * Any exception is logged and swallowed (best effort), so a failed write does not fail the task.
	 *
	 * @param tuple2s the records of one partition
	 */
	private static void storePartition(Iterator<Tuple2<String, ApiTraceResult>> tuple2s) {
		// NOTE(review): "now" is taken per partition, so partitions of the same micro-batch may use
		// different timestamps in their row keys. Confirm whether the batch Time was intended instead.
		long now = System.currentTimeMillis();
		try {
			List<Put> puts1 = Lists.newArrayList();
			List<Put> puts2 = Lists.newArrayList();
			List<Put> puts3 = Lists.newArrayList();

			while (tuple2s.hasNext()) {
				Tuple2<String, ApiTraceResult> tuple2 = tuple2s.next();
				String key = tuple2._1;
				// Split once, at the first separator, so API names containing "-" stay intact
				// and an empty API name no longer throws ArrayIndexOutOfBoundsException.
				int sep = key.indexOf(SPLIT_STR);
				if (sep < 0) {
					logger.warn("skip page result with malformed key: {}", key);
					continue;
				}
				String pageId = key.substring(0, sep);
				String api = key.substring(sep + SPLIT_STR.length());
				ApiTraceResult result = tuple2._2;

				Put put1 = new Put(Bytes.toBytes(pageId + "-" + now + "-" + api));
				put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(result.getCallTimes()));
				put1.addColumn(Bytes.toBytes("data"), Bytes.toBytes("duration"), Bytes.toBytes(result.getDuration()));
				puts1.add(put1);
				logger.debug("put data to trace_page_analyze_minutes, {}", put1);

				Put put2 = new Put(Bytes.toBytes(api + "-" + now + "-" + pageId));
				put2.addColumn(Bytes.toBytes("data"), Bytes.toBytes("times"), Bytes.toBytes(result.getCallTimes()));
				puts2.add(put2);
				logger.debug("put data to trace_api_source_analyze_minutes, {}", put2);

				// NOTE(review): every record in this partition targets the same row ("page-" + now) and the
				// same two columns. Later puts therefore likely overwrite earlier ones, leaving only one
				// page/api pair per partition. Confirm the intended page_statistics schema.
				Put put3 = new Put(Bytes.toBytes("page-" + now));
				put3.addColumn(Bytes.toBytes("data"), Bytes.toBytes("page"), Bytes.toBytes(pageId));
				put3.addColumn(Bytes.toBytes("data"), Bytes.toBytes("api"), Bytes.toBytes(api));
				puts3.add(put3);
				logger.debug("put data to page_statistics, {}", put3);
			}

			// Empty partitions are common in streaming; don't open tables when there is nothing to write.
			if (puts1.isEmpty()) {
				return;
			}

			try (Table resultTable1 = HBasePool.getConnection().getTable(TableName.valueOf("trace_page_analyze_minutes"));
			     Table resultTable2 = HBasePool.getConnection().getTable(TableName.valueOf("trace_api_source_analyze_minutes"));
			     Table resultTable3 = HBasePool.getConnection().getTable(TableName.valueOf("page_statistics"))) {
				resultTable1.put(puts1);
				resultTable2.put(puts2);
				resultTable3.put(puts3);
			}
		} catch (Exception e) {
			// Pass the exception as the throwable argument so SLF4J logs the full stack trace.
			logger.error("store page result failed", e);
		}
	}

}