Authored by 谢勇

Performance Testing

The tuning below targets the Spark Streaming → HBase write path: the client-side write buffer per HTable grows from 6 MB to 20 MB, each partition spreads its writes across three HTable instances chosen at random per span, and the earlier SKIP_WAL experiment in TraceSpanStore is commented out again.

In TraceHbaseHandler:

@@ -16,56 +16,52 @@ import com.yoho.trace.sleuth.Spans;
 import com.yoho.trace.store.HBasePool;
 import com.yoho.trace.store.TraceSpanStore;
+import scala.util.Random;
 /**
  * Created by markeloff on 2017/7/26.
  */
 public class TraceHbaseHandler implements TraceHandler, Serializable {
+    Logger logger = LoggerFactory.getLogger(TraceHbaseHandler.class);
     @Override
     public void handle(final JavaDStream<Spans> kafkaMsgDStream) {
         kafkaMsgDStream.foreachRDD(new VoidFunction<JavaRDD<Spans>>() {
             @Override
             public void call(JavaRDD<Spans> spansJavaRDD) throws Exception {
                 spansJavaRDD.foreachPartition(new VoidFunction<Iterator<Spans>>() {
                     @Override
                     public void call(Iterator<Spans> spansIterator) throws Exception {
-                        HTable traceTable = null;
+                        // spread the writes of one partition across three HTable instances
+                        HTable[] tables = new HTable[3];
                         int count = 0;
                         long begin = System.currentTimeMillis();
                         try {
-                            traceTable = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("trace"));
-                            traceTable.setWriteBufferSize(1024 * 1024 * 6);
-                            traceTable.setAutoFlush(false, true);
-                            while (spansIterator.hasNext()) {
-                                count = count + TraceSpanStore.store(spansIterator.next(), traceTable);
-                            }
-                            traceTable.flushCommits();
-                            Logger logger = LoggerFactory.getLogger(TraceHbaseHandler.class);
-                            logger.info("flush spans to hbase, count {}, elapse {}", count, System.currentTimeMillis() - begin);
+                            for (int i = 0; i < 3; i++) {
+                                tables[i] = (HTable) HBasePool.getConnection().getTable(TableName.valueOf("trace"));
+                                // client-side write buffer raised from 6 MB to 20 MB
+                                tables[i].setWriteBufferSize(1024 * 1024 * 20);
+                                tables[i].setAutoFlush(false, true);
+                            }
+                            while (spansIterator.hasNext()) {
+                                // route each span to one of the three buffered tables at random
+                                int random = new Random().nextInt(3);
+                                count = count + TraceSpanStore.store(spansIterator.next(), tables[random]);
+                            }
+                            for (int i = 0; i < 3; i++) {
+                                tables[i].flushCommits();
+                            }
+                            logger.info("flush spans to hbase, count {}, elapse {}", count, System.currentTimeMillis() - begin);
                         } finally {
                             try {
-                                if (traceTable != null) traceTable.close();
+                                for (HTable hTable : tables) {
+                                    if (hTable != null)
+                                        hTable.close();
+                                }
                             } catch (IOException e) {
                                 e.printStackTrace();
                             }
                         }
                     }
                 });
             }
         });
     }
 }
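For reference, setAutoFlush(false) plus flushCommits() is the pre-1.0 HTable batching idiom; HBase 1.0 deprecated it in favor of BufferedMutator, which exposes the same client-side write buffer through the Connection API and is thread-safe. A minimal sketch of an equivalent partition writer under that API; the BufferedMutator-taking overload of TraceSpanStore.store is an assumption, not part of the code above:

import java.util.Iterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.TraceSpanStore;

public final class BufferedTraceWriter {

    // Writes one partition's spans through a single BufferedMutator.
    // The mutator buffers Puts client-side (same effect as the 20 MB
    // write buffer above) and is safe to share across threads, so the
    // three-HTable workaround is unnecessary with this API.
    public static int writePartition(Connection connection, Iterator<Spans> spansIterator) throws Exception {
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("trace"))
                .writeBufferSize(1024 * 1024 * 20);
        int count = 0;
        try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
            while (spansIterator.hasNext()) {
                // assumed overload accepting a BufferedMutator instead of an HTable
                count += TraceSpanStore.store(spansIterator.next(), mutator);
            }
            mutator.flush(); // push any Puts still sitting in the buffer
        }
        return count;
    }
}

Separately from the API choice, new Random().nextInt(3) allocates a fresh generator for every span; java.util.concurrent.ThreadLocalRandom.current().nextInt(3) gives the same routing without the per-record allocation.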
In TraceSpanStore:
@@ -4,7 +4,6 @@ import java.io.Serializable;
 import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,7 +45,7 @@ public class TraceSpanStore implements Serializable {
     Put put = new Put(Bytes.toBytes(rowkey));
     // Disabling the WAL is not really recommended, though it does improve performance: HBase writes the WAL before the data itself, so that after a failure it can replay the WAL to recover edits that were not yet persisted.
-    put.setDurability(Durability.SKIP_WAL);
+    //put.setDurability(Durability.SKIP_WAL);
     put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("service"), Bytes.toBytes(spans.getHost().getServiceName()));
     put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("event"), Bytes.toBytes(logEvent));
     put.addColumn(Bytes.toBytes("span"), Bytes.toBytes("ip"), Bytes.toBytes(spans.getHost().getAddress()));
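On the WAL comment above: between the default SYNC_WAL and the now commented-out SKIP_WAL there is a middle ground, Durability.ASYNC_WAL, where the edit still goes through the WAL but the sync to disk happens in the background. A small sketch, assuming the same rowkey construction as above:

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public final class SpanPuts {

    // ASYNC_WAL keeps the crash-recovery path the comment above describes
    // (edits can still be replayed from the WAL) while deferring the WAL
    // sync, trading a small durability window for throughput instead of
    // dropping the WAL entirely like SKIP_WAL.
    static Put asyncWalPut(String rowkey) {
        Put put = new Put(Bytes.toBytes(rowkey));
        put.setDurability(Durability.ASYNC_WAL);
        return put;
    }
}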