Authored by Gino Zhang

Change the way batch updates are performed

... ... @@ -17,7 +17,7 @@ public interface SuggestWordDefMapper {
     void insertBatch(List<SuggestWordDef> list);
-    void updateBatch(List<SuggestWordDef> list);
+    void updateBatch(@Param(value = "suggestWordDefList") List<SuggestWordDef> list);
     void deleteBatch(List<Integer> idList);
 }
\ No newline at end of file
... ...
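With a single unannotated collection argument, MyBatis only exposes the list under its default key, which is why the old XML referenced collection="list". Binding the parameter as @Param(value = "suggestWordDefList") lets the rewritten statement further down refer to the collection by name. A minimal call-site sketch, assuming a Spring service with the mapper injected (the wrapper method is illustrative, not part of this change):

    // Inside a Spring-managed service (illustrative, not in the commit):
    @Autowired
    private SuggestWordDefMapper suggestWordDefMapper;

    public void updateCountsAndWeights(List<SuggestWordDef> changed) {
        // The whole list becomes one UPDATE; it is bound as "suggestWordDefList" in the XML.
        if (changed != null && !changed.isEmpty()) {
            suggestWordDefMapper.updateBatch(changed);
        }
    }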
... ... @@ -18,9 +18,9 @@ public interface SuggestionKeywordsMapper {
     int updateByPrimaryKey(SuggestionKeywords record);
-    void updateBatch(List<SuggestionKeywords> list);
+    void updateBatch(@Param(value = "suggestionKeywordList") List<SuggestionKeywords> list);
-    List<SuggestionKeywords> getPageLists(@Param(value="offset")Integer offset, @Param(value="pageSize")Integer pageSize);
+    List<SuggestionKeywords> getPageLists(@Param(value = "offset") Integer offset, @Param(value = "pageSize") Integer pageSize);
     int count();
 }
\ No newline at end of file
... ...
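The same @Param treatment is applied here, and getPageLists/count back the page-by-page cache warm-up added later in this commit. A rough sketch of the paging loop they support, assuming an injected SuggestionKeywordsMapper and an arbitrary page size:

    // pageSize is an example value, not a constant from this commit.
    int pageSize = 1000;
    int total = suggestionKeywordsMapper.count();
    if (total > 0) {
        int totalPages = (total - 1) / pageSize + 1;                 // ceiling division
        for (int pageNo = 1; pageNo <= totalPages; pageNo++) {
            int offset = (pageNo - 1) * pageSize;                    // rows to skip
            List<SuggestionKeywords> page = suggestionKeywordsMapper.getPageLists(offset, pageSize);
            // process the page ...
        }
    }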
... ... @@ -25,17 +25,24 @@
         </foreach>
     </insert>
-    <update id="updateBatch" parameterType="java.util.List" timeout="20000">
-        insert into suggest_word_def (id, keyword, weight, count, type, status) values
-        <foreach collection="list" item="item" index="index" separator=",">
-            (#{item.id, jdbcType=INTEGER},
-            #{item.keyword, jdbcType=VARCHAR},
-            #{item.weight, jdbcType=INTEGER},
-            #{item.count, jdbcType=INTEGER},
-            #{item.type, jdbcType=INTEGER},
-            #{item.status, jdbcType=INTEGER})
+    <update id="updateBatch" parameterType="java.util.List">
+        update suggest_word_def
+        <trim prefix="set" suffixOverrides=",">
+            <trim prefix="count =case" suffix="end,">
+                <foreach collection="suggestWordDefList" item="item" index="index">
+                    when id = #{item.id,jdbcType=INTEGER} then #{item.count,jdbcType=INTEGER}
+                </foreach>
+            </trim>
+            <trim prefix="weight =case" suffix="end,">
+                <foreach collection="suggestWordDefList" item="item" index="index">
+                    when id = #{item.id,jdbcType=INTEGER} then #{item.weight,jdbcType=DECIMAL}
+                </foreach>
+            </trim>
+        </trim>
+        where
+        <foreach collection="suggestWordDefList" separator="or" item="item" index="index" >
+            id = #{item.id,jdbcType=INTEGER}
         </foreach>
-        on duplicate key update
     </update>
     <delete id="deleteBatch" parameterType="java.util.List">
... ...
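The old statement re-inserted every row and relied on ON DUPLICATE KEY UPDATE; the replacement issues one UPDATE whose SET clause is built from CASE expressions keyed by id and restricted by an OR list over the same ids. For a two-element list, MyBatis should render SQL roughly like the comment below (a sketch with invented values, not captured output):

    // suggestWordDefList = [ {id=1, count=10, weight=0.5}, {id=2, count=3, weight=1.2} ]
    // renders approximately as:
    //   update suggest_word_def
    //   set count = case when id = 1 then 10 when id = 2 then 3 end,
    //       weight = case when id = 1 then 0.5 when id = 2 then 1.2 end
    //   where id = 1 or id = 2
    suggestWordDefMapper.updateBatch(Arrays.asList(wordDef1, wordDef2));  // entities populated elsewhere

Because the WHERE list and the CASE branches are generated from the same collection, every matched row hits a branch; a CASE with no matching branch (and no ELSE) would otherwise write NULL into that column.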
... ... @@ -71,14 +71,19 @@
         where keyword = #{keyword,jdbcType=VARCHAR}
     </update>
-    <update id="updateBatch" parameterType="java.util.List" timeout="20000">
-        insert into suggestion_keywords (keyword,weight,count) values
-        <foreach collection="list" item="item" index="index" separator=",">
-            (#{item.keyword, jdbcType=VARCHAR},
-            #{item.weight, jdbcType=INTEGER},
-            #{item.count, jdbcType=INTEGER})
+    <update id="updateBatch" parameterType="java.util.List">
+        update suggestion_keywords
+        <trim prefix="set" suffixOverrides=",">
+            <trim prefix="count =case" suffix="end,">
+                <foreach collection="suggestionKeywordList" item="item" index="index">
+                    when keyword = #{item.keyword,jdbcType=VARCHAR} then #{item.count,jdbcType=INTEGER}
+                </foreach>
+            </trim>
+        </trim>
+        where
+        <foreach collection="suggestionKeywordList" separator="or" item="item" index="index" >
+            keyword = #{item.keyword,jdbcType=VARCHAR}
         </foreach>
-        on duplicate key update
     </update>
     <select id="count" resultType="java.lang.Integer" timeout="20000">
... ...
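suggestion_keywords gets the same rewrite, keyed by keyword and updating only count. Each element now adds one CASE branch and one OR predicate to a single statement, so callers may want to cap how many rows go into one call; a hedged sketch (the chunk size and the wrapper method are assumptions, not part of the commit):

    private static final int CHUNK_SIZE = 500;  // assumed cap on rows per UPDATE statement

    public void updateCounts(List<SuggestionKeywords> all) {
        // Split the full list into bounded sub-lists so the generated SQL stays small.
        for (int from = 0; from < all.size(); from += CHUNK_SIZE) {
            int to = Math.min(from + CHUNK_SIZE, all.size());
            suggestionKeywordsMapper.updateBatch(all.subList(from, to));
        }
    }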
... ... @@ -45,7 +45,7 @@ public class SuggestionDiscoveryJob implements ApplicationContextAware {
         try {
             long begin = System.currentTimeMillis();
             logger.info("SuggestionDiscoveryJob execute start----[begin={}]", begin);
-            suggestionCache.cleanup();
+            suggestionCache.init();
             boolean result = true;
             for (RetryBusinessFlowExecutor executor : flowExecutorList) {
                 boolean tempResult = executor.execute();
... ... @@ -64,7 +64,7 @@ public class SuggestionDiscoveryJob implements ApplicationContextAware {
         try {
             long begin = System.currentTimeMillis();
             logger.info("SuggestionDiscoveryJob executeFlow start----[begin={}]", begin);
-            suggestionCache.cleanup();
+            suggestionCache.init();
             Optional<RetryBusinessFlowExecutor> executor = flowExecutorList.stream().filter(item -> flowName.equals(item.getFlowName())).findAny();
             executor.ifPresent(RetryBusinessFlowExecutor::execute);
             logger.info("SuggestionDiscoveryJob executeFlow end----[cost={}]", (System.currentTimeMillis() - begin));
... ...
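Both entry points now call suggestionCache.init() instead of cleanup(), so the cache is cleared and repopulated from suggest_word_def before any flow executor runs, including single-flow runs through executeFlow. Reduced to its core, the intended ordering looks like this (a sketch, not the full method):

    suggestionCache.init();                          // clear + reload every persisted keyword
    boolean result = true;
    for (RetryBusinessFlowExecutor executor : flowExecutorList) {
        result = executor.execute() && result;       // discoverers see a warm cache
    }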
package com.yoho.search.consumer.suggests.common;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.service.base.SuggestWordDefService;
import com.yoho.search.dal.model.SuggestWordDef;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
... ... @@ -12,15 +19,39 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class SuggestionCache {
protected static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
@Autowired
private SuggestWordDefService suggestWordDefService;
private Map<String, String> suggestWordDefMap = new ConcurrentHashMap<>(10000);
-    public void cleanup()
-    {
+    public void init() {
long begin = System.currentTimeMillis();
logger.info("[SuggestionCache init start ...][begin={}]", begin);
cleanup();
int totalCount = suggestWordDefService.selectTotalCount();
if (totalCount == 0) {
return;
}
final int batchSize = ISearchConstants.SEARCH_INDEX_BATCH_LIMIT;
final int totalPageSize = (totalCount - 1) / batchSize + 1;
for (int pageNo = 1; pageNo <= totalPageSize; pageNo++) {
int start = (pageNo - 1) * batchSize;
List<SuggestWordDef> dataList = suggestWordDefService.selectPageList(start, batchSize);
if (dataList != null) {
dataList.forEach(suggestWordDef -> add(suggestWordDef.getKeyword()));
}
}
logger.info("[SuggestionCache init end ...][initSize={}][cost={}]", suggestWordDefMap.size(), System.currentTimeMillis() - begin);
}
public void cleanup() {
suggestWordDefMap.clear();
}
-    public boolean add(String keyword)
-    {
+    public boolean add(String keyword) {
String oldValue = suggestWordDefMap.putIfAbsent(generateIdentifier(keyword), keyword);
return oldValue == null;
}
... ...
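init() pages through suggest_word_def with the usual ceiling-division formula and funnels every keyword through add(), which deduplicates via ConcurrentHashMap.putIfAbsent. generateIdentifier() is not shown in this hunk, so the normalization in the demo below (trim plus lower-casing) is purely an assumption made to keep the example concrete:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class AddSemanticsDemo {
        private final Map<String, String> cache = new ConcurrentHashMap<>();

        // Mirrors SuggestionCache.add(): true only for the first keyword mapping to an identifier.
        public boolean add(String keyword) {
            String identifier = keyword.trim().toLowerCase();   // assumed identifier scheme
            return cache.putIfAbsent(identifier, keyword) == null;
        }

        public static void main(String[] args) {
            AddSemanticsDemo demo = new AddSemanticsDemo();
            System.out.println(demo.add("Nike"));    // true  -> first occurrence
            System.out.println(demo.add(" nike "));  // false -> duplicate under the same identifier
        }
    }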
... ... @@ -6,15 +6,12 @@ import com.yoho.search.consumer.service.base.SuggestWordDefService;
import com.yoho.search.consumer.suggests.common.KeywordType;
import com.yoho.search.consumer.suggests.common.RetryBusinessFlow;
import com.yoho.search.consumer.suggests.common.SuggestionCache;
import com.yoho.search.consumer.suggests.common.SuggestionConstants;
import com.yoho.search.dal.model.SuggestWordDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
... ... @@ -48,38 +45,38 @@ public abstract class AbstractSuggestionDiscoverer implements RetryBusinessFlow
rebuildFlagService.updateIsBuildingTrue();
// Delete the original data in the suggest_word_def table
Integer type = getKeywordType().getType();
int countByType = suggestWordDefService.selectCountByWordType(type);
if (countByType == 0) {
return;
}
final int limit = SuggestionConstants.SUGGESTION_DISCOVER_BATCH_LIMIT;
final int totalPageSize = (countByType - 1) / limit + 1;
logger.info("[{} init][countByType={}][totalPageSize={}]", flowName(), countByType, totalPageSize);
List<Integer> toDeletedIdList = new ArrayList<>();
for (int pageNo = 1; pageNo <= totalPageSize; pageNo++) {
final int start = (pageNo - 1) * limit;
List<SuggestWordDef> wordDefList = suggestWordDefService.selectByKeywordType(type, start, limit);
Map<Boolean, List<SuggestWordDef>> partitionList = wordDefList.stream().collect(Collectors.partitioningBy(word -> SuggestionConstants.VALID_STATUS.equals(word.getStatus())));
logger.info("[{} init][pageNo={}][validCount={}][invalidCount={}]", flowName(), pageNo, partitionList.get(Boolean.TRUE).size(), partitionList.get(Boolean.FALSE).size());
partitionList.get(Boolean.FALSE).stream().map(SuggestWordDef::getKeyword).forEach((keyword) -> suggestionCache.add(keyword));
toDeletedIdList.addAll(partitionList.get(Boolean.TRUE).stream().map(SuggestWordDef::getId).collect(Collectors.toList()));
}
int fromIndex = 0;
while (fromIndex < toDeletedIdList.size()) {
int endIndex = fromIndex + limit;
if (endIndex > toDeletedIdList.size()) {
endIndex = toDeletedIdList.size();
}
List<Integer> subIdList = toDeletedIdList.subList(fromIndex, endIndex);
logger.info("[{} init][deleteBatchSize={}]", flowName(), subIdList.size());
logger.trace("[{} init][deleteBatch={}]", flowName(), subIdList);
suggestWordDefService.deleteBatch(subIdList);
fromIndex += limit;
}
// Integer type = getKeywordType().getType();
// int countByType = suggestWordDefService.selectCountByWordType(type);
// if (countByType == 0) {
// return;
// }
//
// final int limit = SuggestionConstants.SUGGESTION_DISCOVER_BATCH_LIMIT;
// final int totalPageSize = (countByType - 1) / limit + 1;
// logger.info("[{} init][countByType={}][totalPageSize={}]", flowName(), countByType, totalPageSize);
// List<Integer> toDeletedIdList = new ArrayList<>();
// for (int pageNo = 1; pageNo <= totalPageSize; pageNo++) {
// final int start = (pageNo - 1) * limit;
// List<SuggestWordDef> wordDefList = suggestWordDefService.selectByKeywordType(type, start, limit);
// Map<Boolean, List<SuggestWordDef>> partitionList = wordDefList.stream().collect(Collectors.partitioningBy(word -> SuggestionConstants.VALID_STATUS.equals(word.getStatus())));
// logger.info("[{} init][pageNo={}][validCount={}][invalidCount={}]", flowName(), pageNo, partitionList.get(Boolean.TRUE).size(), partitionList.get(Boolean.FALSE).size());
// partitionList.get(Boolean.FALSE).stream().map(SuggestWordDef::getKeyword).forEach((keyword) -> suggestionCache.add(keyword));
// toDeletedIdList.addAll(partitionList.get(Boolean.TRUE).stream().map(SuggestWordDef::getId).collect(Collectors.toList()));
// }
//
// int fromIndex = 0;
// while (fromIndex < toDeletedIdList.size()) {
// int endIndex = fromIndex + limit;
// if (endIndex > toDeletedIdList.size()) {
// endIndex = toDeletedIdList.size();
// }
//
// List<Integer> subIdList = toDeletedIdList.subList(fromIndex, endIndex);
// logger.info("[{} init][deleteBatchSize={}]", flowName(), subIdList.size());
// logger.trace("[{} init][deleteBatch={}]", flowName(), subIdList);
// suggestWordDefService.deleteBatch(subIdList);
// fromIndex += limit;
// }
}
@Override
... ...
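The status-based purge is commented out rather than rewritten: it partitioned each page with Collectors.partitioningBy on the status flag, re-cached the invalid keywords, and deleted the valid rows in limit-sized chunks. With SuggestionCache.init() now preloading every persisted keyword, that bookkeeping appears redundant, which is presumably why it was disabled. For reference, a standalone illustration of the partitioning idiom (the statuses and the "valid means 1" reading are invented for the example):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PartitionDemo {
        public static void main(String[] args) {
            List<Integer> statuses = Arrays.asList(1, 0, 1, 1, 0);
            Map<Boolean, List<Integer>> parts =
                    statuses.stream().collect(Collectors.partitioningBy(status -> status == 1));
            System.out.println(parts.get(Boolean.TRUE));   // [1, 1, 1] -> the "valid" bucket
            System.out.println(parts.get(Boolean.FALSE));  // [0, 0]    -> the "invalid" bucket
        }
    }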