Authored by wangnan

整理配置文件,删除两个没用的mqListener

package com.yoho.search.consumer.index.increment;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.yoho.search.dal.model.SearchAction;
import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstans;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.service.base.ProductService;
import com.yoho.search.consumer.service.base.SearchActionService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* Created by YOHO on 15-9-2.
*/
@Component
public class SearchActionMqListener extends AbstractMqListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(SearchActionMqListener.class);
@Autowired
private IYohoIndexService indexService;
@Autowired
private SearchActionService searchActionService;
@Autowired
private ProductService productService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
final String key = UUID.randomUUID().toString();
String messageStr = new String(message.getBody(), "UTF-8");
logger.info("[model=SearchActionMqListener][key={}][message={}]", key, messageStr);
// 如果在重建索引等待
this.waitingRebuildingIndex();
JSONObject json = JSONObject.parseObject(messageStr);
if (ISearchConstans.ACTION_DELETE.equals(json.getString("action"))) {
deleteData(json.getString("data"), key);
} else if (ISearchConstans.ACTION_UPDATE.equals(json.getString("action"))) {
updateData(json.getObject("data", Map.class), key);
} else {
updateData(json.getObject("data", Map.class), key);
}
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.SEARCHACTIONMQLISTENER_ONMESSAGE.getEventName(),
EventReportEnum.SEARCHACTIONMQLISTENER_ONMESSAGE.getFunctionName(),
EventReportEnum.SEARCHACTIONMQLISTENER_ONMESSAGE.getMoudleName(),"exception",IgnoreSomeException.filterSomeException(e),null));
Thread.sleep(1000);
throw e;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void updateData(final Map data, final String key) {
long begin = System.currentTimeMillis();
SearchAction searchAction = new SearchAction();
searchAction = (SearchAction) ConvertUtils.toJavaBean(searchAction, data);
if (searchAction == null || searchAction.getId() == null) {
return;
}
searchActionService.saveOrUpdate(searchAction);
logger.info("[func=updateData][step=saveToDb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
Integer productSkn = searchAction.getProductSkn();
if (productSkn == null) {
return;
}
updateProductIndex(productSkn, begin, key);
}
public void deleteData(final String id, final String key) {
if (StringUtils.isBlank(id)) {
return;
}
long begin = System.currentTimeMillis();
Integer idInt = Integer.parseInt(id);
SearchAction searchAction = searchActionService.getById(idInt);
if (searchAction == null) {
return;
}
searchActionService.delete(idInt);// 先从数据库删除数据
logger.info("[func=deleteData][step=deleteFromDb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
this.clearProductIndex(searchAction.getProductSkn(), begin, key);
}
private void clearProductIndex(Integer productSkn, long begin, final String key) {
Integer productId = productService.selectProductIdBySkn(productSkn);
if (productId == null) {
return;
}
Map<String, Object> indexData = new HashMap<String, Object>();
indexData.put("productId", productId);
indexData.put("saleAction", 0);
this.updateProductIndexWithDataMap(indexData, productId, key, begin);
logger.info("[func=clearProductIndex][step=clearProductIndex][key={}][productId={}][cost={}ms]", key, productId,System.currentTimeMillis() - begin);
}
private void updateProductIndex(Integer productSkn, long begin, final String key) {
Map<String, Object> indexData = searchActionService.getSaleActionMap(productSkn);
if (indexData == null || indexData.size() == 0) {
return;
}
Object productId = indexData.get("productId");
if (productId == null) {
return;
}
Integer productIdInt = Integer.valueOf(productId.toString());
this.updateProductIndexWithDataMap(indexData, productIdInt, key, begin);
logger.info("[func=updateProductIndex][step=updateProductIndex][key={}][productId={}][cost={}ms]", key,productId, System.currentTimeMillis() - begin);
}
}
package com.yoho.search.consumer.index.increment;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstans;
import com.yoho.search.dal.model.YohoodProduct;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.service.base.YohoodProductService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.error.event.SearchEvent;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class YohoodProductMqListener extends AbstractMqListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(YohoodProductMqListener.class);
@Autowired
private IYohoIndexService indexService;
@Autowired
private YohoodProductService yohoodProductService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String messageStr = new String(message.getBody(), "UTF-8");
logger.info("[model=YohoodProductMqListener][message={}]", messageStr);
// 如果在重建索引等待
this.waitingRebuildingIndex();
JSONObject json = JSONObject.parseObject(messageStr);
if (ISearchConstans.ACTION_DELETE.equals(json.getString("action"))) {
deleteData(json.getString("data"));
} else if (ISearchConstans.ACTION_UPDATE.equals(json.getString("action"))) {
updateData(json.getObject("data", Map.class));
} else {
updateData(json.getObject("data", Map.class));
}
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.YOHOODPRODUCTMQLISTENER_ONMESSAGE.getEventName(),
EventReportEnum.YOHOODPRODUCTMQLISTENER_ONMESSAGE.getFunctionName(),
EventReportEnum.YOHOODPRODUCTMQLISTENER_ONMESSAGE.getMoudleName(),"exception", IgnoreSomeException.filterSomeException(e),null));
Thread.sleep(1000);
throw e;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void updateData(final Map data) {
long begin = System.currentTimeMillis();
YohoodProduct yohoodProduct = new YohoodProduct();
yohoodProduct = (YohoodProduct) ConvertUtils.toJavaBean(yohoodProduct, data);
if (yohoodProduct == null || yohoodProduct.getProductSkn() == null) {
return;
}
if (StringUtils.isBlank(yohoodProduct.getIsOnline())) {
yohoodProduct.setIsOnline(null);
}
yohoodProductService.saveOrUpdate(yohoodProduct);
Map<String, Object> map = yohoodProductService.isYohood(yohoodProduct.getProductSkn());
if (map == null) {
return;
}
updateIndex(map, begin);
}
public void deleteData(final String id) {
if (StringUtils.isBlank(id)) {
return;
}
long begin = System.currentTimeMillis();
Integer productSku = Integer.parseInt(id);
YohoodProduct yohoodProduct = yohoodProductService.getById(productSku);
if (yohoodProduct == null) {
return;
}
yohoodProductService.delete(productSku);// 先从数据库删除数据
Map<String, Object> map = yohoodProductService.isYohood(yohoodProduct.getProductSkn());
if (map == null) {
return;
}
updateIndex(map, begin);
logger.info("[func=deleteData][step=success][indexName={}][id={}][cost={}ms]", ISearchConstans.INDEX_NAME_PRODUCT_INDEX, id, (System.currentTimeMillis() - begin));
}
private void updateIndex(Map<String, Object> map, long begin) {
String productId = map.get("productId").toString();
if (StringUtils.isBlank(productId))
return;
try {
indexService.updateIndexData(ISearchConstans.INDEX_NAME_PRODUCT_INDEX, productId, map);
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.YOHOODPRODUCTMQLISTENER_UPDATEINDEX.getEventName(),
EventReportEnum.YOHOODPRODUCTMQLISTENER_UPDATEINDEX.getFunctionName(),
EventReportEnum.YOHOODPRODUCTMQLISTENER_UPDATEINDEX.getMoudleName(),"exception",IgnoreSomeException.filterSomeException(e),null));
// this.sendMessage(Integer.parseInt(productId));
logger.error("[func=updateIndex][step=error][indexName={}][id={}][cost={}ms][error={}]", ISearchConstans.INDEX_NAME_PRODUCT_INDEX, productId,
(System.currentTimeMillis() - begin), e.getMessage());
}
logger.info("[func=updateIndex][step=success][indexName={}][id={}][cost={}ms]", ISearchConstans.INDEX_NAME_PRODUCT_INDEX, productId, (System.currentTimeMillis() - begin));
}
}
... ... @@ -8,7 +8,6 @@ import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.HttpClientUtils;
import com.yoho.search.base.utils.ISearchConstans;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.increment.YohoodProductMqListener;
import com.yoho.search.consumer.service.base.ProductIndexService;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.core.es.model.ESBluk;
... ... @@ -28,7 +27,7 @@ import java.util.List;
import java.util.Map;
@Component
public class TplAdaptorJob extends YohoodProductMqListener implements ApplicationEventPublisherAware{
public class TplAdaptorJob implements ApplicationEventPublisherAware{
private static final Logger logger = LoggerFactory.getLogger(TplAdaptorJob.class);
... ... @@ -151,4 +150,9 @@ public class TplAdaptorJob extends YohoodProductMqListener implements Applicatio
logger.error(e.getMessage(), e);
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
}
}
... ...
... ... @@ -10,6 +10,8 @@
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<context:component-scan base-package="com.yoho.search.consumer.*" />
<!-- 连接服务配置 -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="username" value="${search.mq.username}"/>
... ... @@ -81,7 +83,6 @@
<rabbit:listener queue-names="data_update_channel_productsearch" ref="productSearchMqListener" />
<rabbit:listener queue-names="data_update_channel_productactivitieslink" ref="productActivitiesLinkMqListener" />
<rabbit:listener queue-names="data_update_channel_parametermake" ref="parameterMakeMqListener" />
<rabbit:listener queue-names="data_update_channel_searchaction" ref="searchActionMqListener" />
<rabbit:listener queue-names="data_update_channel_productkeywords" ref="productKeywordsMqListener" />
<rabbit:listener queue-names="data_update_channel_productpooldetail" ref="productPoolDetailMqListener" />
... ...
... ... @@ -9,67 +9,7 @@
http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<!-- see java based configuration file -->
<task:annotation-driven/>
<context:annotation-config/>
<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<!-- 配置调度程序quartz
<bean id="indexRebuildJob" class="IndexRebuildJob" />
<bean id="indexRebuildJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject">
<ref bean="indexRebuildJob" />
</property>
<property name="targetMethod">
<value>execute</value>
</property>
</bean>
<bean id="indexRebuildJobCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="indexRebuildJobDetail"></property>
<property name="cronExpression" value="0 0 3 * * ?"></property>
</bean>
-->
<!-- ======================== 调度触发器 ========================
<bean id="rebuildTblProductIndexJobDetail"
class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject">
<ref bean="indexRebuildJob" />
</property>
<property name="targetMethod">
<value>rebuildTblProductIndex</value>
</property>
</bean>
<bean id="rebuildTblProductIndexCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="rebuildTblProductIndexJobDetail"></property>
<property name="cronExpression" value="0 50 * * * ?"></property>
</bean>
<bean id="tplAdaptorJob" class="TplAdaptorJob" />
<bean id="tplAdaptorJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject">
<ref bean="tplAdaptorJob" />
</property>
<property name="targetMethod">
<value>doTplAdaptor</value>
</property>
</bean>
<bean id="tplAdaptorJobCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="tplAdaptorJobDetail"></property>
<property name="cronExpression" value="0 55 * * * ?"></property>
</bean>
-->
<!-- ======================== 调度工厂 ========================
<bean id="SpringJobSchedulerFactoryBean"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="indexRebuildJobCronTrigger" />
<ref bean="rebuildTblProductIndexCronTrigger" />
<ref bean="tplAdaptorJobCronTrigger" />
</list>
</property>
</bean>
-->
<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
</beans>
\ No newline at end of file
... ...