Authored by chunhua.zhang

add Spark Streaming + Kafka integration

pom.xml

@@ -9,14 +9,7 @@
 <packaging>jar</packaging>
 <name>malicious-detection</name>
 <description>Demo project for Spring Boot</description>
-<parent>
-    <groupId>org.springframework.boot</groupId>
-    <artifactId>spring-boot-starter-parent</artifactId>
-    <version>1.5.2.RELEASE</version>
-    <relativePath/> <!-- lookup parent from repository -->
-</parent>
 <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -25,33 +18,49 @@
     <spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
 </properties>
 <dependencies>
     <dependency>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-starter-web</artifactId>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-streaming_2.11</artifactId>
+        <version>2.1.0</version>
     </dependency>
     <dependency>
-        <groupId>org.springframework.kafka</groupId>
-        <artifactId>spring-kafka</artifactId>
-        <version>${spring-kafka.version}</version>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+        <version>2.1.0</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-core_2.11</artifactId>
+        <version>2.1.0</version>
+    </dependency>
     <dependency>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-starter-test</artifactId>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql_2.11</artifactId>
+        <version>2.1.0</version>
     </dependency>
+    <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+        <version>RELEASE</version>
+        <scope>test</scope>
+    </dependency>
 </dependencies>
 <build>
     <plugins>
         <plugin>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-maven-plugin</artifactId>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+                <source>1.8</source>
+                <target>1.8</target>
+            </configuration>
         </plugin>
     </plugins>
 </build>
 </project>

src/main/java/com/yoho/ops/MaliciousDetectionApplication.java

package com.yoho.ops;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MaliciousDetectionApplication {
    public static void main(String[] args) {
        SpringApplication.run(MaliciousDetectionApplication.class, args);
}
}

src/main/java/com/yoho/ops/kafka/Receiver.java

package com.yoho.ops.kafka;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
public class Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    // the latch lets callers (e.g. tests) block until at least one message has arrived
    private final CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "${kafka.topic.helloworld}")
public void receive(String message) {
LOGGER.info("received message='{}'", message);
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
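
For reference, a minimal sketch of how the latch might be exercised from an integration test (hypothetical ReceiverTest class; assumes a reachable broker and spring-boot-starter-test on the classpath, which this commit's pom no longer declares):

src/test/java/com/yoho/ops/kafka/ReceiverTest.java

package com.yoho.ops.kafka;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReceiverTest {
    @Autowired
    private Sender sender;
    @Autowired
    private Receiver receiver;
    @Test
    public void receivesHelloWorld() throws Exception {
        // send on the topic the listener subscribes to (kafka.topic.helloworld)
        sender.send("helloworld.t", "Hello Spring Kafka!");
        // the latch opens once the listener has received at least one message
        assertTrue(receiver.getLatch().await(10, TimeUnit.SECONDS));
    }
}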
package com.yoho.ops.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.servers.bootstrap}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// consumer groups allow a pool of processes to divide the work of
// consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
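        // (an assumption, not in this commit) further settings could go here, e.g.:
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");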
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}

src/main/java/com/yoho/ops/kafka/Sender.java

package com.yoho.ops.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
public class Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
// the KafkaTemplate provides asynchronous send methods returning a
// Future
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
// you can register a callback with the listener to receive the result
// of the send asynchronously
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.info("sent message='{}' with offset={}", message,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("unable to send message='{}'", message, ex);
}
});
        // alternatively, to block the sending thread and await the result,
        // invoke the future's get() method
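        // e.g. (sketch, with a hypothetical 5 second timeout; needs java.util.concurrent.TimeUnit):
        // SendResult<String, String> result = future.get(5, TimeUnit.SECONDS);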
}
}

src/main/java/com/yoho/ops/kafka/SenderConfig.java

package com.yoho.ops.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class SenderConfig {
@Value("${kafka.servers.bootstrap}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // maximum time send() may block (e.g. waiting for metadata), after which a TimeoutException is thrown
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

src/main/java/com/yoho/ops/maliciousdetection/GatewayAccessLog.java

package com.yoho.ops.maliciousdetection;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
/**
 * Gateway access log.
 * Line pattern: local IP|user IP|timestamp|HTTP method|User-Agent|request identifier|request params|HTTP response code|processing time
 * @author chunhua.zhang@yoho.cn
 */
public class GatewayAccessLog implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(GatewayAccessLog.class);
private String localIp;
private String userIp;
private String time;
private String httpMethod;
private String userAgent;
private String method;
private String params;
    private int responseCode;
private int cost;
    public GatewayAccessLog(String localIp, String userIp, String time, String httpMethod, String userAgent, String method, String params, int responseCode, int cost) {
this.localIp = localIp;
this.userIp = userIp;
this.time = time;
this.httpMethod = httpMethod;
this.userAgent = userAgent;
this.method = method;
this.params = params;
        this.responseCode = responseCode;
this.cost = cost;
}
public GatewayAccessLog() {
}
    /**
     * Parse one access-log line.
     *
     * @param logline the raw log line
     * @return the parsed log, or null if the line is malformed
     */
    public static GatewayAccessLog parseFromLogLine(String logline) {
        // local IP|user IP|timestamp|HTTP method|User-Agent|request identifier|request params|HTTP response code|processing time
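        // e.g. (hypothetical values):
        // 10.0.4.12|203.0.113.7|2017-03-28 12:00:01|GET|Mozilla/5.0|app.search|q=shoes|200|35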
if (StringUtils.isEmpty(logline)) {
return null;
}
        String[] params = logline.split("\\|");
        // split never returns null, so only the field count needs checking
        if (params.length < 9) {
            return null;
        }
int httpResponseCode = 200;
int cost = 0;
        try {
            httpResponseCode = Integer.parseInt(params[7]);
            cost = Integer.parseInt(params[8]);
        } catch (NumberFormatException e) {
            return null;
        }
return new GatewayAccessLog(params[0], params[1], params[2], params[3],
params[4], params[5], params[6], httpResponseCode, cost);
}
@Override
public String toString() {
        return String.format("%s|%s|%s|%s|%s|%s|%s|%d|%d",
                localIp, userIp, time, httpMethod, userAgent, method,
                params, responseCode, cost);
}
public String getLocalIp() {
return localIp;
}
public void setLocalIp(String localIp) {
this.localIp = localIp;
}
public String getUserIp() {
return userIp;
}
public void setUserIp(String userIp) {
this.userIp = userIp;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getHttpMethod() {
return httpMethod;
}
public void setHttpMethod(String httpMethod) {
this.httpMethod = httpMethod;
}
public String getUserAgent() {
return userAgent;
}
public void setUserAgent(String userAgent) {
this.userAgent = userAgent;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
    public int getResponseCode() {
        return responseCode;
    }
    public void setResponseCode(int responseCode) {
        this.responseCode = responseCode;
    }
public int getCost() {
return cost;
}
public void setCost(int cost) {
this.cost = cost;
}
}

src/main/java/com/yoho/ops/maliciousdetection/SparkMain.java

package com.yoho.ops.maliciousdetection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * Spark Streaming entry point.
 * See: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
 * and: https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/java8/src/main/java/com/databricks/apps/logs/chapter1/LogAnalyzer.java
 * Created by chunhua.zhang on 2017/3/28.
 */
public class SparkMain {
private final static String KAFKA_TOPIC = "gateway_access";
    // a reduce function that adds two counts together
private static Function2<Long, Long, Long> SUM_REDUCER = (a, b) -> a + b;
    public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("Log Analyzer").setMaster("local");
        // create the streaming context with a 1 second batch interval
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(1000));
//kafka configuration
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "192.168.103.30:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "gateway-access-log-analyzer");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList(KAFKA_TOPIC);
//create stream
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
        // convert the raw log lines into GatewayAccessLog objects
        JavaDStream<GatewayAccessLog> objStream = stream.map(consumerRecord -> GatewayAccessLog.parseFromLogLine(consumerRecord.value()));
        objStream.print();
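        // A sketch of how SUM_REDUCER could be wired up (an assumption, not part of this
        // commit): count requests per user IP in each batch to surface abusive clients.
        // objStream.filter(log -> log != null)
        //         .mapToPair(log -> new scala.Tuple2<>(log.getUserIp(), 1L))
        //         .reduceByKey(SUM_REDUCER)
        //         .print();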
streamingContext.start();
streamingContext.awaitTermination();
}
}

src/main/resources/application.properties

kafka.servers.bootstrap=192.168.103.30:9092
kafka.topic.helloworld=helloworld.t