// TraceFlinkExecutor.java 2.09 KB
package com.yoho.trace.online;

import com.alibaba.fastjson.JSON;
import com.yoho.trace.online.handler.TraceFlinkHandlerStarter;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.util.Collector;
import properties.PropertiesFactory;

/**
 * Entry point of the trace streaming job.
 *
 * <p>The job reads {@code String} records from the Kafka consumer produced by
 * {@link KafkaUtils#finkKafkaConsumer}, converts every record into a {@link Spans}
 * object through {@link #transf(String)}, hands the resulting stream to
 * {@link TraceFlinkHandlerStarter#start}, and finally executes the pipeline.
 *
 * Created by mingdan.ge on 2019/11/7.
 */
public class TraceFlinkExecutor {

    /**
     * Assembles the streaming topology and runs it.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Brokers, consumer group and topic all come from the Kafka section of PropertiesFactory.
        FlinkKafkaConsumerBase<String> source = KafkaUtils.finkKafkaConsumer(
                PropertiesFactory.kafka().getBrokers(),
                PropertiesFactory.kafka().getGroup(),
                PropertiesFactory.kafka().getTopic());

        SingleOutputStreamOperator<Spans> spanStream =
                environment.addSource(source).flatMap(new SpanParser());

        TraceFlinkHandlerStarter.start(spanStream);
        environment.execute("app.click.stream.analysis");
    }

    /**
     * Converts a JSON string into a {@link Spans} instance and records the current
     * system time on it via {@code setReceive}.
     *
     * @param s the JSON text to parse
     * @return the parsed object, or {@code null} when parsing yields {@code null}
     *         or any exception is thrown while parsing or stamping
     */
    public static Spans transf(String s) {
        Spans parsed;
        try {
            parsed = JSON.parseObject(s, Spans.class);
            if (parsed == null) {
                return null;
            }
            parsed.setReceive(System.currentTimeMillis());
        } catch (Exception ignored) {
            // Any failure is reported to the caller as null.
            return null;
        }
        return parsed;
    }

    /**
     * Flat-map step that emits one {@link Spans} per input line for which
     * {@link #transf(String)} returns a non-null result, and nothing otherwise.
     */
    private static final class SpanParser implements FlatMapFunction<String, Spans> {
        @Override
        public void flatMap(String line, Collector<Spans> collector) throws Exception {
            Spans parsed = transf(line);
            if (parsed == null) {
                return;
            }
            collector.collect(parsed);
        }
    }

}