Authored by wangnan9279

Merge branch 'wn_promotion_bulk' into gray1230

... ... @@ -14,6 +14,8 @@ public interface PromotionInfoMapper {
List<PromotionInfo> selectValidPromotionList(Integer currentTime);
List<PromotionInfo> selectByIds(List<Integer> ids);
int insert(PromotionInfo record);
int updateByPrimaryKey(PromotionInfo record);
... ... @@ -22,5 +24,5 @@ public interface PromotionInfoMapper {
int selectCount();
List<PromotionInfo> selectPageLists(@Param(value="start")Integer start, @Param(value="limit")Integer limit);
List<PromotionInfo> selectPageLists(@Param(value = "start") Integer start, @Param(value = "limit") Integer limit);
}
... ...
... ... @@ -19,4 +19,6 @@ public interface PromotionParamsMapper {
int updateByPrimaryKey(PromotionParams record);
int deleteByPrimaryKey(Integer id);
List<PromotionParams> selectByIds(List<Integer> ids);
}
... ...
... ... @@ -72,7 +72,7 @@
<select id="selectPageLists" resultMap="BaseResultMap" timeout="20000">
select
<include refid="Base_Column_List" />
<include refid="Base_Column_List"/>
from promotion_info
where
status = 1
... ... @@ -80,4 +80,14 @@
limit #{start},#{limit}
</select>
<select id="selectByIds" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from promotion_info where id in
<foreach item="item" index="index" collection="list"
open="(" separator="," close=")">
#{item}
</foreach>
</select>
</mapper>
\ No newline at end of file
... ...
... ... @@ -54,4 +54,15 @@
reject_param= #{rejectParam,jdbcType=VARCHAR}
where promotion_id = #{promotionId,jdbcType=INTEGER}
</update>
<select id="selectByIds" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from promotion_params where promotion_id in
<foreach item="item" index="index" collection="list"
open="(" separator="," close=")">
#{item}
</foreach>
</select>
</mapper>
\ No newline at end of file
... ...
... ... @@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
... ... @@ -31,19 +32,19 @@ public class PromotionIndexIndexBuilder extends IIndexBuilder {
@Override
public List<?> getPageLists(int start, int limit) throws Exception {
List<PromotionIndexBO> promotionIndexBOList = new ArrayList<>();
List<PromotionInfo> promotionInfoList = promotionInfoMapper.selectPageLists(start,limit);
if(CollectionUtils.isEmpty(promotionInfoList)){
List<PromotionInfo> promotionInfoList = promotionInfoMapper.selectPageLists(start, limit);
if (CollectionUtils.isEmpty(promotionInfoList)) {
return new ArrayList<>();
}
List<Integer> idList = promotionInfoList.stream().map(PromotionInfo::getId).collect(Collectors.toList());
List<PromotionParams> promotionParamsList = promotionParamsMapper.selectByPromotionIdList(idList);
if(CollectionUtils.isEmpty(promotionParamsList)){
if (CollectionUtils.isEmpty(promotionParamsList)) {
return new ArrayList<>();
}
Map<Integer,PromotionParams> promotionParamsMap = promotionParamsList.stream().parallel().collect(Collectors.toMap(PromotionParams::getPromotionId, p->p));
for(PromotionInfo promotionInfo:promotionInfoList){
Map<Integer, PromotionParams> promotionParamsMap = promotionParamsList.stream().parallel().collect(Collectors.toMap(PromotionParams::getPromotionId, p -> p));
for (PromotionInfo promotionInfo : promotionInfoList) {
PromotionParams promotionParams = promotionParamsMap.get(promotionInfo.getId());
PromotionIndexBO promotionIndexBO = buildPromotionIndexBO(promotionInfo,promotionParams);
PromotionIndexBO promotionIndexBO = buildPromotionIndexBO(promotionInfo, promotionParams);
promotionIndexBOList.add(promotionIndexBO);
}
return promotionIndexBOList;
... ... @@ -55,10 +56,28 @@ public class PromotionIndexIndexBuilder extends IIndexBuilder {
}
/**
* 全量建Promotionindex索引、PromotionInfoMqListener、PromotionParamsMqListener都使用
*/
public PromotionIndexBO buildPromotionIndexBO(PromotionInfo promotionInfo, PromotionParams promotionParams) {
public List<PromotionIndexBO> buildPromotionIndexBOList(List<Integer> promotionIdList) {
List<PromotionInfo> promotionInfoList = promotionInfoMapper.selectByIds(promotionIdList);
Map<Integer, PromotionInfo> promotionInfoMap = new HashMap<>();
if (CollectionUtils.isNotEmpty(promotionInfoList)) {
promotionInfoMap = promotionInfoList.stream().collect(Collectors.toMap(PromotionInfo::getId, p -> p));
}
List<PromotionParams> promotionParamsList = promotionParamsMapper.selectByIds(promotionIdList);
Map<Integer, PromotionParams> promotionParamsMap = new HashMap<>();
if (CollectionUtils.isNotEmpty(promotionInfoList)) {
promotionParamsMap = promotionParamsList.stream().collect(Collectors.toMap(PromotionParams::getPromotionId, p -> p));
}
List<PromotionIndexBO> promotionIndexBOList = new ArrayList<>();
for (Integer promotionId : promotionIdList) {
PromotionInfo promotionInfo = promotionInfoMap.get(promotionId);
PromotionParams promotionParams = promotionParamsMap.get(promotionId);
PromotionIndexBO promotionIndexBO = buildPromotionIndexBO(promotionInfo, promotionParams);
promotionIndexBOList.add(promotionIndexBO);
}
return promotionIndexBOList;
}
private PromotionIndexBO buildPromotionIndexBO(PromotionInfo promotionInfo, PromotionParams promotionParams) {
PromotionIndexBO promotionIndexBO = new PromotionIndexBO();
if (promotionInfo != null) {
promotionIndexBO.setId(promotionInfo.getId());
... ... @@ -80,4 +99,6 @@ public class PromotionIndexIndexBuilder extends IIndexBuilder {
}
return promotionIndexBO;
}
}
... ...
package com.yoho.search.consumer.index.increment.bulks;
import com.alibaba.fastjson.JSON;
import com.yoho.search.base.models.PromotionIndexBO;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.common.IYohoIndexService;
import com.yoho.search.consumer.index.fullbuild.promotion.PromotionIndexIndexBuilder;
import com.yoho.search.consumer.service.utils.CostStatistics;
import com.yoho.search.core.es.model.ESBluk;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 促销索引BULK服务
* @author wangnan
* @version 2018/12/17
*/
@Component
public class PromotionBulkService {
private static final Logger logger = LoggerFactory.getLogger(PromotionBulkService.class);
private static final String INDEX_NAME_PROMOTION_INDEX = ISearchConstants.INDEX_NAME_PROMOTIONINDEX;
private static final long THREAD_SLEEP_IDLE = 2000;
private static final long THREAD_SLEEP_WORK = 50;
@Autowired
private IYohoIndexService indexService;
@Autowired
private PromotionIndexIndexBuilder promotionIndexIndexBuilder;
private ArrayBlockingQueue<Integer> idUpdateQueue = new ArrayBlockingQueue<Integer>(200);
private ArrayBlockingQueue<Integer> idDeleteQueue = new ArrayBlockingQueue<Integer>(200);
private ExecutorService updateExecutorService = Executors.newSingleThreadExecutor();
private ExecutorService deleteExecutorService = Executors.newSingleThreadExecutor();
@PostConstruct
void init() {
updateExecutorService.submit((Runnable) () -> {
while (true) {
try {
long threadSleep = doBulkUpdateIndex();
Thread.sleep(threadSleep);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
deleteExecutorService.submit((Runnable) () -> {
while (true) {
try {
long threadSleep = doBulkDeleteIndex();
Thread.sleep(threadSleep);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
updateExecutorService.shutdown();
deleteExecutorService.shutdown();
}));
}
public void updateIndex(Integer id) {
try {
if (id == null) {
return;
}
idUpdateQueue.put(id);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public void deleteIndex(Integer id) {
try {
if (id == null) {
return;
}
idDeleteQueue.put(id);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private long doBulkUpdateIndex() {
try {
CostStatistics costStatistics = new CostStatistics();
List<PromotionIndexBO> promotionIndexBOList = new ArrayList<>();
// 1、根据id去取数据库对象
List<Integer> idList = this.getElementsFromArrayBlockingQueue(idUpdateQueue, 200);
if (!idList.isEmpty()) {
promotionIndexBOList = promotionIndexIndexBuilder.buildPromotionIndexBOList(idList);
logger.info("doBulkUpdatePromotionIndex buildPromotionIndexBOList, size is [{}], cost [{}]ms", promotionIndexBOList.size(), costStatistics.getCost());
}
// 2、没数据直接返回[并让线程休息1000ms]
if (promotionIndexBOList.isEmpty()) {
return THREAD_SLEEP_IDLE;
}
// 3、批量更新ES
this.updateIndexByList(promotionIndexBOList);
logger.info("doBulkUpdatePromotionIndex updateIndexByList, cost [{}]ms", costStatistics.getCost());
// 4、打印总耗时
logger.info("doBulkUpdatePromotionIndex end , costStatistics is [{}] ", costStatistics.getCostStatisticsString());
// 让线程休息50ms继续工作
return THREAD_SLEEP_WORK;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return THREAD_SLEEP_WORK;
}
}
private long doBulkDeleteIndex() {
try {
CostStatistics costStatistics = new CostStatistics();
// 1、根据id去取数据库对象
List<Integer> ids = this.getElementsFromArrayBlockingQueue(idDeleteQueue, 200);
// 2、没数据直接返回[并让线程休息1000ms]
if (ids.isEmpty()) {
return THREAD_SLEEP_IDLE;
}
// 3、批量更新ES
this.deleteIndexByList(ids);
logger.info("doBulkDeletePromotionIndex start, ids size is [{}], cost [{}]ms", ids.size(), costStatistics.getCost());
// 4、打印总耗时
logger.info("doBulkDeletePromotionIndex end , costStatistics is [{}] ", costStatistics.getCostStatisticsString());
return THREAD_SLEEP_WORK;// 让线程休息50ms继续工作
} catch (Exception e) {
logger.error(e.getMessage(), e);
return THREAD_SLEEP_WORK;
}
}
private void updateIndexByList(List<PromotionIndexBO> promotionIndexBOList) {
try {
if (CollectionUtils.isEmpty(promotionIndexBOList)) {
return;
}
List<ESBluk> results = new ArrayList<ESBluk>();
for (PromotionIndexBO promotionIndexBO : promotionIndexBOList) {
results.add(new ESBluk(JSON.toJSONString(promotionIndexBO), promotionIndexBO.getId().toString(), INDEX_NAME_PROMOTION_INDEX, INDEX_NAME_PROMOTION_INDEX, false));
}
indexService.bulk(results);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private void deleteIndexByList(List<Integer> ids) {
try {
if (CollectionUtils.isEmpty(ids)) {
return;
}
List<ESBluk> results = new ArrayList<ESBluk>();
for (Integer id : ids) {
results.add(new ESBluk(null, id.toString(), INDEX_NAME_PROMOTION_INDEX, INDEX_NAME_PROMOTION_INDEX, true));
}
indexService.bulk(results);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private <T> List<T> getElementsFromArrayBlockingQueue(ArrayBlockingQueue<T> queue, int limit) {
List<T> results = new ArrayList<T>();
while (results.size() < limit && (!queue.isEmpty())) {
results.add(queue.poll());
}
return results;
}
}
... ...
package com.yoho.search.consumer.index.increment.yhb_promotion;
import com.alibaba.fastjson.JSONObject;
import com.yoho.search.base.models.PromotionIndexBO;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.common.IYohoIndexService;
import com.yoho.search.consumer.index.fullbuild.promotion.PromotionIndexIndexBuilder;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.bulks.PromotionBulkService;
import com.yoho.search.consumer.service.daoService.PromotionInfoService;
import com.yoho.search.core.message.beans.SearchMqConsumerListerner;
import com.yoho.search.dal.PromotionParamsMapper;
import com.yoho.search.dal.model.PromotionInfo;
import com.yoho.search.dal.model.PromotionParams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@SearchMqConsumerListerner(dbName = "yhb_promotion",tableName = "promotion_info")
@SearchMqConsumerListerner(dbName = "yhb_promotion", tableName = "promotion_info")
public class PromotionInfoMqListener extends AbstractMqListener {
@Autowired
... ... @@ -27,7 +25,7 @@ public class PromotionInfoMqListener extends AbstractMqListener {
@Autowired
private IYohoIndexService indexService;
@Autowired
private PromotionIndexIndexBuilder promotionIndexIndexBuilder;
private PromotionBulkService promotionBulkService;
@Override
protected EventReportEnum getEventReportEnum() {
... ... @@ -38,6 +36,7 @@ public class PromotionInfoMqListener extends AbstractMqListener {
protected void deleteData(String id) throws Exception {
promotionInfoService.delete(Integer.valueOf(id));
indexService.deleteIndexData(ISearchConstants.INDEX_NAME_PROMOTIONINDEX, id);
promotionBulkService.deleteIndex(Integer.valueOf(id));
}
@Override
... ... @@ -49,9 +48,7 @@ public class PromotionInfoMqListener extends AbstractMqListener {
//更新DB
promotionInfoService.saveOrUpdate(promotionInfo);
//更新ES
PromotionParams promotionParams = promotionParamsMapper.selectByPrimaryKey(promotionInfo.getId());
PromotionIndexBO promotionIndexBO = promotionIndexIndexBuilder.buildPromotionIndexBO(promotionInfo, promotionParams);
indexService.updateIndexData(ISearchConstants.INDEX_NAME_PROMOTIONINDEX, promotionInfo.getId().toString(), promotionIndexBO);
promotionBulkService.updateIndex(promotionInfo.getId());
}
}
... ...
package com.yoho.search.consumer.index.increment.yhb_promotion;
import com.alibaba.fastjson.JSONObject;
import com.yoho.search.base.models.PromotionIndexBO;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.common.IYohoIndexService;
import com.yoho.search.consumer.index.fullbuild.promotion.PromotionIndexIndexBuilder;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.bulks.PromotionBulkService;
import com.yoho.search.consumer.service.daoService.PromotionParamsService;
import com.yoho.search.core.message.beans.SearchMqConsumerListerner;
import com.yoho.search.dal.PromotionInfoMapper;
import com.yoho.search.dal.model.PromotionInfo;
import com.yoho.search.dal.model.PromotionParams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@SearchMqConsumerListerner(dbName = "yhb_promotion",tableName = "promotion_params")
@SearchMqConsumerListerner(dbName = "yhb_promotion", tableName = "promotion_params")
public class PromotionParamsMqListener extends AbstractMqListener {
@Autowired
... ... @@ -25,9 +21,7 @@ public class PromotionParamsMqListener extends AbstractMqListener {
@Autowired
protected PromotionInfoMapper promotionInfoMapper;
@Autowired
private IYohoIndexService indexService;
@Autowired
private PromotionIndexIndexBuilder promotionIndexIndexBuilder;
private PromotionBulkService promotionBulkService;
@Override
protected EventReportEnum getEventReportEnum() {
... ... @@ -37,6 +31,7 @@ public class PromotionParamsMqListener extends AbstractMqListener {
@Override
protected void deleteData(String id) throws Exception {
promotionParamsService.delete(Integer.valueOf(id));
promotionBulkService.updateIndex(Integer.valueOf(id));
}
@Override
... ... @@ -45,15 +40,10 @@ public class PromotionParamsMqListener extends AbstractMqListener {
if (promotionParams == null || promotionParams.getPromotionId() == null) {
return;
}
//更新ES
promotionParamsService.saveOrUpdate(promotionParams);
//更新DB
PromotionInfo promotionInfo = promotionInfoMapper.selectByPrimaryKey(promotionParams.getPromotionId());
if(promotionInfo==null){
return;
}
PromotionIndexBO promotionIndexBO = promotionIndexIndexBuilder.buildPromotionIndexBO(promotionInfo,promotionParams);
indexService.updateIndexData(ISearchConstants.INDEX_NAME_PROMOTIONINDEX, promotionInfo.getId().toString(), promotionIndexBO);
promotionParamsService.saveOrUpdate(promotionParams);
//更新ES
promotionBulkService.updateIndex(promotionParams.getPromotionId());
}
}
... ...