|
|
package com.yoho.search.consumer.index.increment.productIndex;
|
|
|
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
import com.yoho.error.event.SearchEvent;
|
|
|
import com.yoho.error.event.SearchLogsEvent;
|
|
|
import com.yoho.search.base.utils.ConvertUtils;
|
|
|
import com.yoho.search.base.utils.EventReportEnum;
|
...
|
...
|
@@ -10,7 +9,6 @@ import com.yoho.search.consumer.index.increment.AbstractMqListener; |
|
|
import com.yoho.search.consumer.service.base.ProductService;
|
|
|
import com.yoho.search.consumer.service.base.ProductTimingService;
|
|
|
import com.yoho.search.consumer.service.logic.productIndex.StorageUpdateTimeLogicService;
|
|
|
import com.yoho.search.core.es.utils.IgnoreSomeException;
|
|
|
import com.yoho.search.dal.model.Product;
|
|
|
import com.yoho.search.dal.model.ProductTiming;
|
|
|
import com.yoho.search.dal.model.StorageUpdateTime;
|
...
|
...
|
@@ -21,8 +19,6 @@ import org.slf4j.LoggerFactory; |
|
|
import org.springframework.amqp.core.Message;
|
|
|
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
|
import org.springframework.context.ApplicationEventPublisherAware;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.ArrayList;
|
...
|
...
|
@@ -34,7 +30,7 @@ import java.util.Map; |
|
|
* Created by wangnan on 2016/11/24.
|
|
|
*/
|
|
|
@Component
|
|
|
public class ProductTimingMqListener extends AbstractMqListener implements ChannelAwareMessageListener, ApplicationEventPublisherAware {
|
|
|
public class ProductTimingMqListener extends AbstractMqListener implements ChannelAwareMessageListener{
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ProductTimingMqListener.class);
|
|
|
|
...
|
...
|
@@ -47,11 +43,10 @@ public class ProductTimingMqListener extends AbstractMqListener implements Chann |
|
|
@Autowired
|
|
|
private StorageUpdateTimeLogicService storageUpdateTimeLogicService;
|
|
|
|
|
|
protected ApplicationEventPublisher publisher;
|
|
|
|
|
|
@Override
|
|
|
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
|
|
this.publisher = applicationEventPublisher;
|
|
|
public String getIndexName() {
|
|
|
return ISearchConstants.INDEX_NAME_PRODUCT_INDEX;
|
|
|
}
|
|
|
|
|
|
@Override
|
...
|
...
|
@@ -63,8 +58,7 @@ public class ProductTimingMqListener extends AbstractMqListener implements Chann |
|
|
// 记录上报的日志
|
|
|
SEARCH_EVENT_LOG.info("report to influxDb,currentTimeMillis is [{}],EventName is [{}] ,MoudleName is [{}]", nowTimeMillis, "MqListener", "consumer");
|
|
|
} catch (Exception e) {
|
|
|
publisher.publishEvent(new SearchEvent(EventReportEnum.PRODUCTTIMINGMQLISTENER_ONMESSAGE.getEventName(), EventReportEnum.PRODUCTTIMINGMQLISTENER_ONMESSAGE.getFunctionName(),
|
|
|
EventReportEnum.PRODUCTTIMINGMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
|
|
|
publisher.publishEvent(buildSearchEvent(EventReportEnum.PRODUCTTIMINGMQLISTENER_ONMESSAGE,e));
|
|
|
Thread.sleep(1000);
|
|
|
throw e;
|
|
|
}
|
...
|
...
|
@@ -82,7 +76,7 @@ public class ProductTimingMqListener extends AbstractMqListener implements Chann |
|
|
logger.info("[func=updateData][step=success][tableName=productTiming][id={}][cost={}ms]", productTiming.getId(),(System.currentTimeMillis() - begin));
|
|
|
//更新productIndex
|
|
|
Product product = productService.getBySkn(productTiming.getProductSkn());
|
|
|
if (product != null && product.getId() != null && product.getId() != null) {
|
|
|
if (product != null && product.getId() != null) {
|
|
|
this.updateProductIndex(product.getId(), product.getErpProductId(), System.currentTimeMillis());
|
|
|
logger.info("[class=ProductTimingMqListener][func=updateProductIndex][id={}][cost={}ms]", productTiming.getId(),(System.currentTimeMillis() - begin));
|
|
|
}
|
...
|
...
|
@@ -99,7 +93,7 @@ public class ProductTimingMqListener extends AbstractMqListener implements Chann |
|
|
if (productTiming != null && productTiming.getProductSkn() != null) {
|
|
|
//更新productIndex
|
|
|
Product product = productService.getBySkn(productTiming.getProductSkn());
|
|
|
if (product != null && product.getId() != null && product.getId() != null) {
|
|
|
if (product != null && product.getId() != null) {
|
|
|
this.updateProductIndex(product.getId(), product.getErpProductId(), System.currentTimeMillis());
|
|
|
logger.info("[class=ProductTimingMqListener][func=updateProductIndex][id={}][cost={}ms]", id, (System.currentTimeMillis() - begin));
|
|
|
}
|
...
|
...
|
@@ -124,8 +118,4 @@ public class ProductTimingMqListener extends AbstractMqListener implements Chann |
|
|
this.updateProductIndexWithDataMap(indexData, productId, null, begin);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String getIndexName() {
|
|
|
return ISearchConstants.INDEX_NAME_PRODUCT_INDEX;
|
|
|
}
|
|
|
} |
...
|
...
|
|