Authored by zhengwen.ge

kafak

... ... @@ -17,6 +17,7 @@
<project-name>yoho-unions</project-name>
<project-version>1.0.0-SNAPSHOT</project-version>
<yoho.service.model.version>1.0.0-SNAPSHOT</yoho.service.model.version>
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
</properties>
<dependencyManagement>
... ... @@ -154,6 +155,11 @@
<artifactId>spring-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
... ...
package com.yoho.unions.server.mqconsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.kafka.listener.MessageListener;
/**
* Created by yoho on 2017/4/17.
*/
public class KafkaConsumer implements MessageListener<String, String> {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
/**
* 初始化
*/
// public void init(){
// MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
// gdt_Headers = new HttpHeaders();
// gdt_Headers.setContentType(type);
//
// dmp_Headers = new HttpHeaders();
// //360DMP必须强制指定,不然报如下错误
// //org.springframework.web.client.RestClientException: Could not extract response: no suitable HttpMessageConverter found for response type [class java.lang.String] and content type [application/octet-stream]
// dmp_Headers.setContentType(type);
// dmp_Headers.add("Accept", MediaType.APPLICATION_JSON.toString());
// dmp_Headers.add("App-Key", configReader.getString(DataToContants.DMP_UPLOAD_APP_KEY, DataToContants.DMP_UPLOAD_APP_KEY_DEFAULT));
// }
public void onMessage(ConsumerRecord<String, String> consumerRecord){
}
}
... ...
... ... @@ -124,4 +124,70 @@
<entry key="channelUserServiceImpl" value-ref="channelUserServiceImpl"/>
</util:map>
<!-- kafka consumer -->
<!-- 定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap-consumer.servers}" />
<entry key="group.id" value="data2qq__consumer__1" />
<entry key="enable.auto.commit" value="false"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="kafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<bean id="kafkaConsumer" class="com.yoho.unions.server.mqconsumer.KafkaConsumer">
</bean>
<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="yoho_log_mobile" /> <!-- yoho_log_mobile/test -->
<property name="messageListener" ref="kafkaConsumer" />
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="kafkaConsumerFactory" />
<constructor-arg ref="containerProperties" />
</bean>
<!-- kafka consumer -->
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap-producer.servers}" />
<entry key="group.id" value="0" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="yoho_log_mobile" /> <!-- yoho_log_mobile/test -->
</bean>
<!--kafak product-->
</beans>
... ...
... ... @@ -110,3 +110,6 @@ exclude.union.type=100000000000349
business.mobile=13621380911,15210647200
bootstrap-consumer.servers=10.67.1.23:9092,10.67.1.53:9092,10.67.1.61:9092,10.67.1.69:9092
bootstrap-producer.servers=10.67.1.23:9092,10.67.1.53:9092,10.67.1.61:9092,10.67.1.69:9092
\ No newline at end of file
... ...