Authored by gemingdan

SuggestConversionCustom Mq

package com.yoho.search.consumer.index.increment;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.service.base.SuggestConversionCustomService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.SuggestConversionCustom;
@Component
public class SuggestConversionCustomMqListener extends AbstractMqListener implements ChannelAwareMessageListener{
private static final Logger logger = LoggerFactory.getLogger(SuggestConversionCustomMqListener.class);
@Autowired
SuggestConversionCustomService suggestConversionCustomService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String messageStr = new String(message.getBody(), "UTF-8");
logger.info("[model=ProductAttributeMqListener] [message={}]", messageStr);
// 如果在重建索引等待
this.waitingRebuildingIndex();
JSONObject json = JSONObject.parseObject(messageStr);
String tableName = ISearchConstants.TABLE_NAME_TBL_SUGGEST_CONVERSION_CUSTOM;
String action = json.getString("action");
if (ISearchConstants.ACTION_DELETE.equals(action)) {
deleteData(json.getString("data"), tableName);
} else if (ISearchConstants.ACTION_UPDATE.equals(action)) {
updateData(json.getObject("data", Map.class), tableName);
} else {
updateData(json.getObject("data", Map.class), tableName);
}
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.SUGGESTCONVERSIONCUSTOMMQLISTENER_ONMESSAGE.getEventName(), EventReportEnum.SUGGESTCONVERSIONCUSTOMMQLISTENER_ONMESSAGE.getFunctionName(),
EventReportEnum.SUGGESTCONVERSIONCUSTOMMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void updateData(final Map data, final String tableName) throws Exception {
long begin = System.currentTimeMillis();
SuggestConversionCustom suggestConversionCustom = new SuggestConversionCustom();
suggestConversionCustom = (SuggestConversionCustom) ConvertUtils.toJavaBean(suggestConversionCustom, data);
if (suggestConversionCustom == null || suggestConversionCustom.getId() == null) {
return;
}
// 更新数据库
suggestConversionCustomService.saveOrUpdate(suggestConversionCustom);
logger.info("[func=updateData][step=success][tableName={}][id={}][cost={}ms]", tableName, suggestConversionCustom.getId(), (System.currentTimeMillis() - begin));
}
public void deleteData(final String id, final String tableName) throws Exception {
if (StringUtils.isBlank(id)) {
return;
}
long begin = System.currentTimeMillis();
// 删除数据
suggestConversionCustomService.delete(Integer.valueOf(id));
logger.info("[func=deleteData][step=success][tableName={}][id={}][cost={}ms]", tableName, id, (System.currentTimeMillis() - begin));
}
}
... ...
... ... @@ -28,4 +28,20 @@ public class SuggestConversionCustomService {
public SuggestConversionCustom selectBySource(String source){
return suggestConversionCustomMapper.selectBySource(source);
}
public void saveOrUpdate(SuggestConversionCustom suggestConversionCustom) {
if(suggestConversionCustom==null||suggestConversionCustom.getId()==null){
return;
}
if(suggestConversionCustomMapper.selectByPrimaryKey(suggestConversionCustom.getId())==null){
//插入
suggestConversionCustomMapper.insert(suggestConversionCustom);
}else{
//更新
suggestConversionCustomMapper.updateByPrimaryKey(suggestConversionCustom);
}
}
public void delete(Integer id) {
suggestConversionCustomMapper.deleteByPrimaryKey(id);
}
}
... ...
... ... @@ -79,6 +79,7 @@
<rabbit:queue durable="true" exclusive="false" name="data_update_channel_productext" />
<rabbit:queue durable="true" exclusive="false" name="data_update_channel_productattributepropertyvalues" />
<rabbit:queue durable="true" exclusive="false" name="data_update_channel_productattribute" />
<rabbit:queue durable="true" exclusive="false" name="data_update_channel_suggestconversioncustom" />
<rabbit:template exchange="${search.mq.exchange}" id="amqpTemplate"
connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
... ... @@ -160,6 +161,7 @@
<rabbit:listener queue-names="data_update_channel_productext" ref="productExtMqListener" />
<rabbit:listener queue-names="data_update_channel_productattributepropertyvalues" ref="productAttributePropertyValuesMqListener" />
<rabbit:listener queue-names="data_update_channel_productattribute" ref="productAttributeMqListener" />
<rabbit:listener queue-names="data_update_channel_suggestconversioncustom" ref="suggestConversionCustomMqListener" />
</rabbit:listener-container>
</beans>
\ No newline at end of file
... ...