// TraceHbaseHandler.java 2.35 KB
package com.yoho.trace.online.handler;

import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.HBasePool;
import com.yoho.trace.store.TraceSpanStore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;

/**
 * {@link TraceHandler} that writes every {@link Spans} element of the incoming stream to the
 * HBase {@code trace} table.
 *
 * <p>For each RDD of the stream, each partition obtains a table handle from {@link HBasePool},
 * hands every element to {@link TraceSpanStore}, flushes the buffered writes and closes the handle.
 *
 * <p>The anonymous Spark functions created in {@link #handle} are inner classes and therefore
 * hold a reference to the enclosing handler; presumably that is why this class is
 * {@link Serializable}.
 *
 * Created by markeloff on 2017/7/26.
 */
public class TraceHbaseHandler implements TraceHandler , Serializable {

    // Static so the logger is never part of the handler's serialized state.
    private static final Logger logger = LoggerFactory.getLogger(TraceHbaseHandler.class);

    /** Name of the HBase table the spans are written to. */
    private static final String TRACE_TABLE_NAME = "trace";

    /** Client-side write buffer size in bytes (6 MiB). */
    private static final long WRITE_BUFFER_SIZE_BYTES = 6L * 1024 * 1024;

    /**
     * Registers an output operation on the stream that stores each partition's spans in HBase.
     *
     * @param kafkaMsgDStream stream of {@link Spans} to persist
     */
    @Override
    public void handle(final JavaDStream<Spans> kafkaMsgDStream) {

        kafkaMsgDStream.foreachRDD(new VoidFunction<JavaRDD<Spans>>() {
            @Override
            public void call(JavaRDD<Spans> spansJavaRDD) throws Exception {

                spansJavaRDD.foreachPartition(new VoidFunction<Iterator<Spans>>() {
                    @Override
                    public void call(Iterator<Spans> spansIterator) throws Exception {
                        storePartition(spansIterator);
                    }
                });
            }
        });
    }

    /**
     * Writes all spans of one partition to the trace table using buffered (non auto-flush) writes,
     * then flushes and logs the stored count and the elapsed time in milliseconds.
     *
     * @param spansIterator elements of the partition
     * @throws Exception if obtaining the table, storing a span or flushing fails
     */
    private static void storePartition(Iterator<Spans> spansIterator) throws Exception {
        // Nothing to write: skip acquiring and configuring a table handle altogether.
        if (!spansIterator.hasNext()) {
            return;
        }

        HTable traceTable = null;
        int count = 0;
        long begin = System.currentTimeMillis();
        try {
            traceTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf(TRACE_TABLE_NAME));
            traceTable.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
            // Buffer writes on the client; they are sent by the explicit flushCommits() below.
            traceTable.setAutoFlush(false, true);

            while (spansIterator.hasNext()) {
                count = count + TraceSpanStore.store(spansIterator.next(), traceTable);
            }

            traceTable.flushCommits();

            logger.info("flush spans to hbase, count {}, elapse {}", count, System.currentTimeMillis() - begin);
        } finally {
            if (traceTable != null) {
                try {
                    traceTable.close();
                } catch (IOException e) {
                    // Best effort: a failed close must not mask an exception from the write path,
                    // but it is reported through the logger instead of stderr.
                    logger.warn("failed to close hbase table " + TRACE_TABLE_NAME, e);
                }
            }
        }
    }
}