TraceFlinkExecutor.java
2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.yoho.trace.online;
import com.alibaba.fastjson.JSON;
import com.yoho.trace.online.handler.TraceFlinkHandlerStarter;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.util.Collector;
import properties.PropertiesFactory;
/**
 * Entry point of the trace streaming job.
 *
 * <p>Reads raw string records from a Kafka consumer, converts each one into a
 * {@link Spans} object, passes the resulting stream to
 * {@link TraceFlinkHandlerStarter#start} and then runs the job.
 *
 * <p>Created by mingdan.ge on 2019/11/7.
 */
public class TraceFlinkExecutor {

    //String CLICK_KAFKA_SERVERS = "192.168.103.78:9092";

    /**
     * Builds the streaming topology and executes it.
     *
     * <p>The Kafka consumer is created by {@code KafkaUtils.finkKafkaConsumer} from the
     * brokers, group and topic exposed by {@code PropertiesFactory.kafka()}. Records for
     * which {@link #transf(String)} returns {@code null} are not emitted downstream.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        FlinkKafkaConsumerBase<String> source = KafkaUtils.finkKafkaConsumer(
                PropertiesFactory.kafka().getBrokers(),
                PropertiesFactory.kafka().getGroup(),
                PropertiesFactory.kafka().getTopic());

        // Kept as an anonymous class (not a lambda) so the generic output type
        // stays visible to Flink's type extraction.
        SingleOutputStreamOperator<Spans> spanStream = environment
                .addSource(source)
                .flatMap(new FlatMapFunction<String, Spans>() {
                    @Override
                    public void flatMap(String rawJson, Collector<Spans> out) throws Exception {
                        Spans parsed = transf(rawJson);
                        if (parsed == null) {
                            // Nothing usable came out of this record; emit nothing.
                            return;
                        }
                        out.collect(parsed);
                    }
                });

        TraceFlinkHandlerStarter.start(spanStream);

        environment.execute("app.click.stream.analysis");
    }

    /**
     * Converts one JSON string into a {@link Spans} instance and stamps it with the
     * current wall-clock time via {@code setReceive}.
     *
     * @param s the JSON text to parse
     * @return the parsed and time-stamped {@code Spans}; {@code null} if the parser
     *         yields {@code null} or if any exception is thrown while parsing or stamping
     */
    public static Spans transf(String s) {
        try {
            Spans parsed = JSON.parseObject(s, Spans.class);
            if (parsed == null) {
                return null;
            }
            parsed.setReceive(System.currentTimeMillis());
            return parsed;
        } catch (Exception ignored) {
            // Best effort: a record that cannot be converted is reported as null
            // so the caller can skip it instead of failing the job.
            return null;
        }
    }
}