// TraceHbaseSink.java 1.66 KB
package com.yoho.trace.online.sink;

import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.store.HBasePool;
import com.yoho.trace.store.TraceSpanStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import scala.util.Random;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Flink sink that hands every incoming {@link Spans} record to
 * {@link TraceSpanStore#store} together with an HBase table handle.
 *
 * <p>{@link #open(Configuration)} obtains a connection from {@link HBasePool} and opens a
 * fixed number of {@link HTable} handles on the {@code trace} table. Auto-flush is switched
 * off on every handle, so puts are buffered on the client and only sent when the write
 * buffer fills up or the handle is closed. {@link #invoke} picks one of the handles at
 * random for each record.
 *
 * <p>NOTE(review): nothing flushes the write buffers on a Flink checkpoint, so records that
 * are still buffered when the task fails are presumably lost — confirm this is acceptable.
 *
 * Created by mingdan.ge on 2019/11/8.
 */
public class TraceHbaseSink extends RichSinkFunction<Spans> {

    /** Name of the HBase table the handles are opened on. */
    private static final String TABLE_NAME = "trace";

    /** Number of table handles opened per sink instance. */
    private static final int TABLE_COUNT = 3;

    /** Client-side write buffer size per handle (20 MiB). */
    private static final long WRITE_BUFFER_BYTES = 20L * 1024 * 1024;

    // Runtime-only resources created in open(); transient because the function object itself
    // is serializable while HBase connections and table handles are not.
    private transient Connection conn;
    private transient HTable[] tables;

    /**
     * Obtains the HBase connection and opens the buffered table handles.
     *
     * @param parameters Flink configuration passed to the function (not used here)
     * @throws Exception if the connection or one of the table handles cannot be obtained
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = HBasePool.getConnection();
        tables = new HTable[TABLE_COUNT];
        TableName tableName = TableName.valueOf(TABLE_NAME);
        for (int i = 0; i < tables.length; i++) {
            // Store the handle before configuring it so close() can still release it if
            // one of the setters below throws.
            tables[i] = (HTable) conn.getTable(tableName);
            tables[i].setWriteBufferSize(WRITE_BUFFER_BYTES);
            // Do not submit puts one at a time: buffer them on the client instead.
            tables[i].setAutoFlush(false, true);
        }
    }

    /**
     * Passes one record to {@link TraceSpanStore#store} using a randomly chosen table handle.
     *
     * @param value   the record to store
     * @param context sink context supplied by Flink (not used here)
     * @throws Exception whatever {@link TraceSpanStore#store} throws
     */
    @Override
    public void invoke(Spans value, SinkFunction.Context context) throws Exception {
        // ThreadLocalRandom avoids allocating and seeding a new generator for every record.
        int index = ThreadLocalRandom.current().nextInt(tables.length);
        TraceSpanStore.store(value, tables[index]);
    }

    /**
     * Closes every table handle, then the parent function, then the connection.
     *
     * <p>All resources are attempted even when an earlier one fails. The first failure is
     * rethrown once cleanup is finished, with later failures attached as suppressed
     * exceptions. Closing a handle is also what sends its remaining buffered puts, so a
     * failure here must not be hidden. The method tolerates a partially completed
     * {@link #open(Configuration)} (null connection, null or partially filled handle array).
     *
     * @throws Exception the first failure encountered while releasing resources
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;

        if (tables != null) {
            for (HTable table : tables) {
                if (table == null) {
                    continue;
                }
                try {
                    table.close();
                } catch (Exception e) {
                    failure = chain(failure, e);
                }
            }
            tables = null;
        }

        try {
            super.close();
        } catch (Exception e) {
            failure = chain(failure, e);
        }

        if (conn != null) {
            // NOTE(review): the connection comes from HBasePool; if the pool shares it with
            // other users, closing it here may affect them — confirm against HBasePool.
            try {
                conn.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
            conn = null;
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} if there is no first. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }
}