Authored by wangnan

优化mqlinster代码结构,减少重复代码

... ... @@ -60,6 +60,8 @@ public class BrandMqListener extends AbstractMqListener implements ChannelAwareM
EventReportEnum.BRANDMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}finally {
finish();
}
}
... ...
... ... @@ -52,8 +52,10 @@ public class ProductColorMqListener extends AbstractMqListener implements Channe
EventReportEnum.PRODUCTCOLORMQLISTENER_ONMESSAGE.getMoudleName(),"exception", IgnoreSomeException.filterSomeException(e),null));
Thread.sleep(1000);
throw e;
}
}
}finally {
finish();
}
}
@Override
public void updateData(final Map data, final String indexName, final String key) throws Exception {
... ...
... ... @@ -56,6 +56,8 @@ public class ProductSortMqListener extends AbstractMqListener implements Channel
EventReportEnum.PRODUCTSORTMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}finally {
finish();
}
}
... ...
... ... @@ -56,8 +56,10 @@ public class SizeMqListener extends AbstractMqListener implements ChannelAwareMe
EventReportEnum.SIZEMQLISTENER_ONMESSAGE.getMoudleName(),"exception", IgnoreSomeException.filterSomeException(e),null));
Thread.sleep(1000);
throw e;
}
}
}finally {
finish();
}
}
@Override
public void updateData(final Map data, final String indexName, final String key) throws Exception {
... ...
... ... @@ -56,11 +56,15 @@ public class TblBrandMqListener extends AbstractMqListener implements ChannelAwa
}
@Override
public void finish(){
tblProductNewIndexBuilder.init();
}
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
process(this, message);
long nowTimeMillis = System.currentTimeMillis();
tblProductNewIndexBuilder.init();
publisher.publishEvent(new SearchLogsEvent("MqListener", EventReportEnum.TBLBRANDMQLISTENER_ONMESSAGE.getMoudleName(), "monitor", nowTimeMillis + ""));
// 记录上报的日志
SEARCH_EVENT_LOG.info("report to influxDb,currentTimeMillis is [{}],EventName is [{}] ,MoudleName is [{}]", nowTimeMillis, "MqListener", "consumer");
... ... @@ -69,6 +73,8 @@ public class TblBrandMqListener extends AbstractMqListener implements ChannelAwa
EventReportEnum.TBLBRANDMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}finally {
finish();
}
}
... ...
... ... @@ -45,6 +45,11 @@ public class TblSiteMqListener extends AbstractMqListener implements ChannelAwar
protected ApplicationEventPublisher publisher;
@Override
public void finish(){
tblProductNewIndexBuilder.init();
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
... ... @@ -56,7 +61,6 @@ public class TblSiteMqListener extends AbstractMqListener implements ChannelAwar
try {
process(this, message);
long nowTimeMillis = System.currentTimeMillis();
tblProductNewIndexBuilder.init();
publisher.publishEvent(new SearchLogsEvent("MqListener", EventReportEnum.TBLSITEMQLISTENER_ONMESSAGE.getMoudleName(), "monitor", nowTimeMillis + ""));
// 记录上报的日志
SEARCH_EVENT_LOG.info("report to influxDb,currentTimeMillis is [{}],EventName is [{}] ,MoudleName is [{}]", nowTimeMillis, "MqListener", "consumer");
... ... @@ -65,7 +69,9 @@ public class TblSiteMqListener extends AbstractMqListener implements ChannelAwar
EventReportEnum.TBLSITEMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}
}finally {
finish();
}
}
@Override
... ...
... ... @@ -5,6 +5,7 @@ import com.yoho.error.event.SearchEvent;
import com.yoho.error.event.SearchLogsEvent;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.consumer.index.fullbuild.TblProductNewIndexBuilder;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.service.base.TblSortService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
... ... @@ -30,11 +31,19 @@ public class TblSortMqListener extends AbstractMqListener implements ChannelAwar
private static final Logger SEARCH_EVENT_LOG = LoggerFactory.getLogger("SEARCH_EVENT_LOG");
@Autowired
private TblProductNewIndexBuilder tblProductNewIndexBuilder;
@Autowired
private TblSortService tblSortService;
protected ApplicationEventPublisher publisher;
@Override
public void finish(){
tblProductNewIndexBuilder.init();
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
... ... @@ -52,6 +61,8 @@ public class TblSortMqListener extends AbstractMqListener implements ChannelAwar
EventReportEnum.TBLSORTMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}finally {
finish();
}
}
... ...