package com.yoho.search.consumer.restapi; import com.alibaba.fastjson.JSON; import com.yoho.error.event.SearchEvent; import com.yoho.error.event.SearchLogsEvent; import com.yoho.search.base.utils.EventReportEnum; import com.yoho.search.base.utils.ISearchConstants; import com.yoho.search.base.utils.MD5Util; import com.yoho.search.consumer.index.common.IYohoIndexService; import com.yoho.search.consumer.index.rebuild.RebuildFlagService; import com.yoho.search.consumer.job.CleanExpiredProductVectorFeatureJob; import com.yoho.search.consumer.job.IndexRebuildJob; import com.yoho.search.consumer.job.SuggestionCounterJob; import com.yoho.search.consumer.job.SuggestionDiscoveryJob; import com.yoho.search.consumer.service.base.ProductIndexService; import com.yoho.search.consumer.service.base.SuggestWordDefService; import com.yoho.search.consumer.service.bo.ProductIndexBO; import com.yoho.search.consumer.service.logic.ProductIndexLogicService; import com.yoho.search.consumer.suggests.common.KeywordType; import com.yoho.search.consumer.suggests.counter.KeywordCounterService; import com.yoho.search.core.es.model.ESBluk; import com.yoho.search.core.es.utils.IgnoreSomeException; import com.yoho.search.dal.model.SuggestWordDef; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.stereotype.Controller; import org.springframework.util.Assert; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.io.UnsupportedEncodingException; import java.util.*; import java.util.stream.Collectors; /** * 索引管理相关请求 * * @author YOHO */ @Controller public class IndexController implements ApplicationEventPublisherAware { private static final Logger logger = LoggerFactory.getLogger(IndexController.class); private static final Logger SEARCH_EVENT_LOG = LoggerFactory.getLogger("SEARCH_EVENT_LOG"); private static final String DEFAULT_CHARSET = "UTF-8"; @Autowired private IYohoIndexService yohoIndexService; @Autowired private ProductIndexService productIndexService; @Autowired private ProductIndexLogicService productIndexLogicService; @Autowired private AmqpTemplate amqpTemplate; @Autowired private RebuildFlagService rebuildFlagService; @Autowired private IndexRebuildJob indexRebuildJob; @Autowired private SuggestionCounterJob suggestionCounterJob; @Autowired private SuggestionDiscoveryJob suggestionDiscoveryJob; @Autowired private CleanExpiredProductVectorFeatureJob cleanExpiredProductVectorFeatureJob; @Autowired private SuggestWordDefService suggestWordDefService; @Autowired private KeywordCounterService keywordCounterService; private ApplicationEventPublisher publisher; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } @RequestMapping(value = "/testevent") @ResponseBody public Map<String, Object> testevent() { publisher.publishEvent(new SearchLogsEvent("MqListener", EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "monitor", "test")); // 记录上报的日志 SEARCH_EVENT_LOG.info("report to influxDb,EventName is [{}] ,MoudleName is [{}]", "MqListener_TEST", "consumer"); return getResultMap(200, "success"); } @RequestMapping(value = "/index/create/{indexName}") @ResponseBody public Map<String, Object> create(@PathVariable String indexName, HttpServletRequest request) { try { yohoIndexService.createIndex(indexName, true); } catch (Exception e) { publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getFunctionName(), EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null)); return getResultMap(400, "create " + indexName + " error: " + e.getMessage()); } return getResultMap(200, "create " + indexName + " success"); } @RequestMapping(value = "/index/exist/{indexName}") @ResponseBody public Map<String, Object> exist(@PathVariable String indexName, HttpServletRequest request) { try { Boolean bool = yohoIndexService.indexExists(indexName); if (bool) { return getResultMap(200, indexName + "exist "); } return getResultMap(200, indexName + " not exist "); } catch (Exception e) { publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_EXIST.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_EXIST.getFunctionName(), EventReportEnum.INDEXCONTROLLER_INDEX_EXIST.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null)); return getResultMap(400, indexName + "not exist " + "error: " + e.getMessage()); } } @RequestMapping(value = "/index/rebuild/{indexName}") @ResponseBody public Map<String, Object> rebuild(@PathVariable String indexName, HttpServletRequest request) { if (rebuildFlagService.isRebuilding()) { return getResultMap(400, "current has index rebuilding,please wait......"); } try { boolean isExist = yohoIndexService.indexExists(indexName); // 如果不存在,则创建索引 if (!isExist) { yohoIndexService.createIndex(indexName, true); } yohoIndexService.rebuild(indexName); } catch (Exception e) { publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getFunctionName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null)); return getResultMap(400, "rebuild " + indexName + " error: " + e.getMessage()); } return getResultMap(200, "rebuild " + indexName + " success"); } @RequestMapping(value = "/index/refresh/{indexName}") @ResponseBody public Map<String, Object> updateIndex(@PathVariable String indexName, @RequestParam String ids) { if (rebuildFlagService.isRebuilding()) { return getResultMap(400, "current has index rebuilding, please wait......"); } if (!ISearchConstants.INDEX_NAME_PRODUCT_INDEX.equals(indexName)) { // 暂只支持productindex索引,后续可以考虑支持其他或所有索引 return getResultMap(400, "only productindex is supported"); } if (ids == null || ids.trim().isEmpty()) { return getResultMap(400, "the ids parameter is required"); } try { boolean isExist = yohoIndexService.indexExists(indexName); // 如果不存在,则创建索引 if (!isExist) { yohoIndexService.createIndex(indexName, true); } long begin = System.currentTimeMillis(); List<ESBluk> bulkList = new ArrayList<ESBluk>(); List<Integer> idList = Arrays.asList(ids.split(",")).stream().map(Integer::valueOf).collect(Collectors.toList()); List<Integer> updateIdList = new ArrayList<>(); List<ProductIndexBO> productIndexBOs = productIndexLogicService.getProductIndexBOs(idList); if (CollectionUtils.isNotEmpty(productIndexBOs)) { for (ProductIndexBO item : productIndexBOs) { updateIdList.add(item.getProductId()); String jsonStr = JSON.toJSONString(productIndexService.beanToMap(item)); bulkList.add(new ESBluk(jsonStr, String.valueOf(item.getProductId()), indexName, indexName, false)); } } List<Integer> deleteIdList = idList.stream().filter(id -> !updateIdList.contains(id)).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(deleteIdList)) { for (Integer id : deleteIdList) { bulkList.add(new ESBluk(null, String.valueOf(id), indexName, indexName, true)); } } logger.info("[func=updateIndex][indexName={}][updateIdList={}][deleteIdList={}][bulkList={}]", indexName, updateIdList, deleteIdList, bulkList); yohoIndexService.bulk(bulkList); logger.info("[func=updateIndex][indexName={}][cost={}]", indexName, (System.currentTimeMillis() - begin)); } catch (Exception e) { publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getFunctionName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null)); return getResultMap(400, "update " + indexName + " error: " + e.getMessage()); } return getResultMap(200, "update " + indexName + " success"); } @RequestMapping(value = "/index/rebuildAll") @ResponseBody public Map<String, Object> rebuildAll() { if (rebuildFlagService.isRebuilding()) { return getResultMap(400, "current has index rebuilding,please wait......"); } indexRebuildJob.rebuildTblProductIndex(); indexRebuildJob.execute(); return getResultMap(200, "rebuildAll success"); } @RequestMapping(method = RequestMethod.POST, value = "/index/update/{indexName}") @ResponseBody public Map<String, Object> update(@PathVariable String indexName, @RequestBody String json, HttpServletRequest request) { // 通过indexName得到消息地址,然后发消息 // 使用这个接口需要了解消息的格式 String channel = ISearchConstants.REDIS_CHANNEL_PRIFIX + indexName; try { amqpTemplate.convertAndSend(channel, createMessage(json)); } catch (AmqpException e) { publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getFunctionName(), EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null)); logger.error("[func=sendMessage][step=execption][e={}]", e.getMessage()); } logger.info("[func=sendMessage][step=success][indexName={}][msg={}]", indexName, json); return getResultMap(200, "update " + indexName + " success"); } @RequestMapping(value = "/index/suggestion/flow") @ResponseBody public Map<String, Object> runSuggestionFlow() { try { if (rebuildFlagService.isRebuilding()) { return getResultMap(400, "current has index rebuilding, please wait......"); } suggestionDiscoveryJob.execute(); suggestionCounterJob.execute(); indexRebuildJob.rebuildSuggestIndex(); return getResultMap(200, "success"); } catch (Exception e) { logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage()); Map<String, Object> rtnMap = new HashMap<String, Object>(); rtnMap.put("code", 400); rtnMap.put("msg", e.getMessage()); return rtnMap; } } @RequestMapping(value = "/index/suggestionCounter") @ResponseBody public Map<String, Object> suggestionCounter(@RequestParam String flowName) { try { if (rebuildFlagService.isRebuilding()) { return getResultMap(400, "current has index rebuilding, please wait......"); } if ("all".equalsIgnoreCase(flowName)) { suggestionCounterJob.execute(); return getResultMap(200, "success"); } suggestionCounterJob.executeFlow(flowName); return getResultMap(200, "success"); } catch (Exception e) { logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage()); Map<String, Object> rtnMap = new HashMap<String, Object>(); rtnMap.put("code", 400); rtnMap.put("msg", e.getMessage()); return rtnMap; } } @RequestMapping(value = "/index/suggestionDiscovery") @ResponseBody public Map<String, Object> suggestionDiscovery(@RequestParam String flowName) { try { if (rebuildFlagService.isRebuilding()) { return getResultMap(400, "current has index rebuilding, please wait......"); } if ("all".equalsIgnoreCase(flowName)) { suggestionDiscoveryJob.execute(); } else { suggestionDiscoveryJob.executeFlow(flowName); } return getResultMap(200, "success"); } catch (Exception e) { logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage()); Map<String, Object> rtnMap = new HashMap<String, Object>(); rtnMap.put("code", 400); rtnMap.put("msg", e.getMessage()); return rtnMap; } } @RequestMapping(value = "/index/suggestWordMgr") @ResponseBody public Map<String, Object> suggestWordMgr(@RequestParam String keyword, @RequestParam(defaultValue = "false") boolean isDelete) { Assert.notNull(keyword); try { SuggestWordDef suggestWordDef = suggestWordDefService.selectByKeyword(keyword); if (isDelete) { if (suggestWordDef == null) { throw new RuntimeException("The keyword is not found!"); } if (!Integer.valueOf(1).equals(suggestWordDef.getStatus())) { throw new RuntimeException("The keyword has been deleted already!"); } suggestWordDefService.updateStatusByPrimaryKey(suggestWordDef.getId(), 0); yohoIndexService.deleteIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(keyword.trim().toLowerCase())); } else { if (suggestWordDef != null) { throw new RuntimeException("The keyword has been added!"); } suggestWordDef = keywordCounterService.countKeyword(keyword); suggestWordDef.setStatus(1); suggestWordDef.setType(KeywordType.Customized.getType()); suggestWordDef.setWeight(KeywordType.Customized.getWeightValue()); suggestWordDefService.insertBatch(Arrays.asList(suggestWordDef)); Map<String, Object> dataMap = new HashMap(); dataMap.put("keyword", keyword); dataMap.put("type", suggestWordDef.getType()); dataMap.put("weight", suggestWordDef.getWeight()); dataMap.put("count", suggestWordDef.getCount()); dataMap.put("countForApp", suggestWordDef.getCountForApp()); dataMap.put("countForBlk", suggestWordDef.getCountForBlk()); yohoIndexService.addIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(keyword.trim().toLowerCase()), dataMap); } return getResultMap(200, "success"); } catch (Exception e) { logger.error(e.getMessage(), e); Map<String, Object> rtnMap = new HashMap<String, Object>(); rtnMap.put("code", 400); rtnMap.put("msg", e.getMessage()); return rtnMap; } } @RequestMapping(value = "/index/cleanExpiredProductVectorFeature") @ResponseBody public Map<String, Object> cleanExpiredProductVectorFeature() { try { cleanExpiredProductVectorFeatureJob.execute(); return getResultMap(200, "success"); } catch (Exception e) { logger.error(e.getMessage(), e); Map<String, Object> rtnMap = new HashMap<String, Object>(); rtnMap.put("code", 400); rtnMap.put("msg", e.getMessage()); return rtnMap; } } private Message createMessage(String json) { byte[] bytes = null; try { bytes = json.getBytes(DEFAULT_CHARSET); } catch (UnsupportedEncodingException e) { throw new MessageConversionException("Failed to convert Message content", e); } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(DEFAULT_CHARSET); if (bytes != null) { messageProperties.setContentLength(bytes.length); } return new Message(bytes, messageProperties); } private Map<String, Object> getResultMap(final int code, final String message) { Map<String, Object> rtnMap = new HashMap<String, Object>(); rtnMap.put("code", code); rtnMap.put("msg", message); return rtnMap; } }