Authored by gemingdan

flink-kafka参数修改

@@ -3,7 +3,6 @@ package com.yoho.trace.online; @@ -3,7 +3,6 @@ package com.yoho.trace.online;
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.yoho.trace.online.handler.TraceFlinkHandlerStarter; 4 import com.yoho.trace.online.handler.TraceFlinkHandlerStarter;
5 import com.yoho.trace.sleuth.Spans; 5 import com.yoho.trace.sleuth.Spans;
6 -import com.yoho.trace.utils.Constant;  
7 import com.yoho.trace.utils.KafkaUtils; 6 import com.yoho.trace.utils.KafkaUtils;
8 import org.apache.flink.api.common.functions.FlatMapFunction; 7 import org.apache.flink.api.common.functions.FlatMapFunction;
9 import org.apache.flink.streaming.api.TimeCharacteristic; 8 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -11,6 +10,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -11,6 +10,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
11 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 10 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; 11 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
13 import org.apache.flink.util.Collector; 12 import org.apache.flink.util.Collector;
  13 +import properties.PropertiesFactory;
14 14
15 /** 15 /**
16 * Created by mingdan.ge on 2019/11/7. 16 * Created by mingdan.ge on 2019/11/7.
@@ -22,7 +22,7 @@ public class TraceFlinkExecutor { @@ -22,7 +22,7 @@ public class TraceFlinkExecutor {
22 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 22 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
23 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 23 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
24 24
25 - FlinkKafkaConsumerBase<String> kafkaConsumer = KafkaUtils.finkKafkaConsumer(Constant.KAFKA_CLICK_SERVERS, Constant.KAFKA_CLICK_ANALYSIS_GROUP, Constant.TOPIC_YOHO_LOG_MOBILE); 25 + FlinkKafkaConsumerBase<String> kafkaConsumer = KafkaUtils.finkKafkaConsumer(PropertiesFactory.kafka().getBrokers(), PropertiesFactory.kafka().getGroup(), PropertiesFactory.kafka().getTopic());
26 SingleOutputStreamOperator<Spans> clickStream = env.addSource(kafkaConsumer) 26 SingleOutputStreamOperator<Spans> clickStream = env.addSource(kafkaConsumer)
27 .flatMap(new FlatMapFunction<String, Spans>() { 27 .flatMap(new FlatMapFunction<String, Spans>() {
28 @Override 28 @Override
1 -package com.yoho.trace.utils;  
2 -  
3 -/**  
4 - * Created by mingdan.ge on 2019/11/7.  
5 - */  
6 -public interface Constant {  
7 - String KAFKA_CLICK_ANALYSIS_GROUP = "click.analysis";  
8 - String KAFKA_CLICK_SERVERS = "clickstream.ckafka.yohoops.org:9092";  
9 - String TOPIC_YOHO_LOG_MOBILE = "yoho_log_mobile";  
10 - String DRUID_KAFKA_SERVERS = "10.66.105.44:9092";  
11 -}