IndexController.java 15.5 KB
package com.yoho.search.consumer.restapi;

import com.alibaba.fastjson.JSON;
import com.yoho.error.event.SearchEvent;
import com.yoho.error.event.SearchLogsEvent;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.increment.rule.DynamicRuleGenerator;
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
import com.yoho.search.consumer.job.CleanExpiredProductVectorFeatureJob;
import com.yoho.search.consumer.job.IndexRebuildJob;
import com.yoho.search.consumer.job.SuggestionCounterJob;
import com.yoho.search.consumer.job.SuggestionDiscoveryJob;
import com.yoho.search.consumer.service.base.ProductIndexService;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logic.ProductIndexLogicService;
import com.yoho.search.consumer.suggests.counter.KeywordCounterService;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.stream.Collectors;

/**
 * 索引管理相关请求
 *
 * @author YOHO
 */
@Controller
public class IndexController implements ApplicationEventPublisherAware {

    private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
    private static final Logger SEARCH_EVENT_LOG = LoggerFactory.getLogger("SEARCH_EVENT_LOG");

    private static final String DEFAULT_CHARSET = "UTF-8";

    @Autowired
    private IYohoIndexService yohoIndexService;
    @Autowired
    private ProductIndexService productIndexService;
    @Autowired
    private ProductIndexLogicService productIndexLogicService;
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private RebuildFlagService rebuildFlagService;
    @Autowired
    private IndexRebuildJob indexRebuildJob;
    @Autowired
    private SuggestionCounterJob suggestionCounterJob;
    @Autowired
    private SuggestionDiscoveryJob suggestionDiscoveryJob;
    @Autowired
    private KeywordCounterService keywordCounterService;
    @Autowired
    private DynamicRuleGenerator dynamicRuleGenerator;
    @Autowired
    private CleanExpiredProductVectorFeatureJob cleanExpiredProductVectorFeatureJob;

    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    @RequestMapping(value = "/testevent")
    @ResponseBody
    public Map<String, Object> testevent() {
        publisher.publishEvent(new SearchLogsEvent("MqListener", EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "monitor", "test"));
        // 记录上报的日志
        SEARCH_EVENT_LOG.info("report to influxDb,EventName is [{}] ,MoudleName is [{}]", "MqListener_TEST", "consumer");
        return getResultMap(200, "success");
    }

    @RequestMapping(value = "/index/create/{indexName}")
    @ResponseBody
    public Map<String, Object> create(@PathVariable String indexName, HttpServletRequest request) {
        try {
            yohoIndexService.createIndex(indexName, true);
        } catch (Exception e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            return getResultMap(400, "create " + indexName + " error: " + e.getMessage());
        }
        return getResultMap(200, "create " + indexName + " success");
    }

    @RequestMapping(value = "/index/exist/{indexName}")
    @ResponseBody
    public Map<String, Object> exist(@PathVariable String indexName, HttpServletRequest request) {
        try {
            Boolean bool = yohoIndexService.indexExists(indexName);
            if (bool) {
                return getResultMap(200, indexName + "exist ");
            }
            return getResultMap(200, indexName + " not exist ");
        } catch (Exception e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_EXIST.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_EXIST.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_EXIST.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            return getResultMap(400, indexName + "not exist " + "error: " + e.getMessage());
        }
    }

    @RequestMapping(value = "/index/rebuild/{indexName}")
    @ResponseBody
    public Map<String, Object> rebuild(@PathVariable String indexName, HttpServletRequest request) {
        if (rebuildFlagService.isRebuilding()) {
            return getResultMap(400, "current has index rebuilding,please wait......");
        }
        try {
            boolean isExist = yohoIndexService.indexExists(indexName);
            // 如果不存在,则创建索引
            if (!isExist) {
                yohoIndexService.createIndex(indexName, true);
            }
            yohoIndexService.rebuild(indexName);
        } catch (Exception e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            return getResultMap(400, "rebuild " + indexName + " error: " + e.getMessage());
        }
        return getResultMap(200, "rebuild " + indexName + " success");
    }

    @RequestMapping(value = "/index/refresh/{indexName}")
    @ResponseBody
    public Map<String, Object> updateIndex(@PathVariable String indexName, @RequestParam String ids) {
        if (rebuildFlagService.isRebuilding()) {
            return getResultMap(400, "current has index rebuilding, please wait......");
        }
        if (!ISearchConstants.INDEX_NAME_PRODUCT_INDEX.equals(indexName)) {
            // 暂只支持productindex索引,后续可以考虑支持其他或所有索引
            return getResultMap(400, "only productindex is supported");
        }
        if (ids == null || ids.trim().isEmpty()) {
            return getResultMap(400, "the ids parameter is required");
        }
        try {
            boolean isExist = yohoIndexService.indexExists(indexName);
            // 如果不存在,则创建索引
            if (!isExist) {
                yohoIndexService.createIndex(indexName, true);
            }
            long begin = System.currentTimeMillis();
            List<ESBluk> bulkList = new ArrayList<ESBluk>();
            List<Integer> idList = Arrays.asList(ids.split(",")).stream().map(Integer::valueOf).collect(Collectors.toList());
            List<Integer> updateIdList = new ArrayList<>();
            List<ProductIndexBO> productIndexBOs = productIndexLogicService.getProductIndexBOs(idList);
            if (CollectionUtils.isNotEmpty(productIndexBOs)) {
                for (ProductIndexBO item : productIndexBOs) {
                    updateIdList.add(item.getProductId());
                    String jsonStr = JSON.toJSONString(productIndexService.beanToMap(item));
                    bulkList.add(new ESBluk(jsonStr, String.valueOf(item.getProductId()), indexName, indexName, false));
                }
            }
            List<Integer> deleteIdList = idList.stream().filter(id -> !updateIdList.contains(id)).collect(Collectors.toList());
            if (CollectionUtils.isNotEmpty(deleteIdList)) {
                for (Integer id : deleteIdList) {
                    bulkList.add(new ESBluk(null, String.valueOf(id), indexName, indexName, true));
                }
            }
            logger.info("[func=updateIndex][indexName={}][updateIdList={}][deleteIdList={}][bulkList={}]", indexName, updateIdList, deleteIdList, bulkList);
            yohoIndexService.bulk(bulkList);
            logger.info("[func=updateIndex][indexName={}][cost={}]", indexName, (System.currentTimeMillis() - begin));
        } catch (Exception e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            return getResultMap(400, "update " + indexName + " error: " + e.getMessage());
        }
        return getResultMap(200, "update " + indexName + " success");
    }

    @RequestMapping(value = "/index/rebuildAll")
    @ResponseBody
    public Map<String, Object> rebuildAll() {
        if (rebuildFlagService.isRebuilding()) {
            return getResultMap(400, "current has index rebuilding,please wait......");
        }
        indexRebuildJob.rebuildTblProductIndex();
        indexRebuildJob.execute();
        suggestionDiscoveryJob.execute();
        suggestionCounterJob.execute();
        indexRebuildJob.rebuildSuggestIndex();
        return getResultMap(200, "rebuildAll success");
    }

    @RequestMapping(method = RequestMethod.POST, value = "/index/update/{indexName}")
    @ResponseBody
    public Map<String, Object> update(@PathVariable String indexName, @RequestBody String json, HttpServletRequest request) {
        // 通过indexName得到消息地址,然后发消息
        // 使用这个接口需要了解消息的格式
        String channel = ISearchConstants.REDIS_CHANNEL_PRIFIX + indexName;
        try {
            amqpTemplate.convertAndSend(channel, createMessage(json));
        } catch (AmqpException e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            logger.error("[func=sendMessage][step=execption][e={}]", e.getMessage());
        }
        logger.info("[func=sendMessage][step=success][indexName={}][msg={}]", indexName, json);
        return getResultMap(200, "update " + indexName + " success");
    }

    @RequestMapping(value = "/index/suggestion/flow")
    @ResponseBody
    public Map<String, Object> runSuggestionFlow() {
        try {
            if (rebuildFlagService.isRebuilding()) {
                return getResultMap(400, "current has index rebuilding, please wait......");
            }

            suggestionDiscoveryJob.execute();
            suggestionCounterJob.execute();
            indexRebuildJob.rebuildSuggestIndex();

            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage());
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }

    @RequestMapping(value = "/index/suggestionCounter")
    @ResponseBody
    public Map<String, Object> suggestionCounter(@RequestParam String flowName) {
        try {
            if (rebuildFlagService.isRebuilding()) {
                return getResultMap(400, "current has index rebuilding, please wait......");
            }
            if ("all".equalsIgnoreCase(flowName)) {
                suggestionCounterJob.execute();
                return getResultMap(200, "success");
            }

            suggestionCounterJob.executeFlow(flowName);
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage());
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }

    @RequestMapping(value = "/index/suggestionDiscovery")
    @ResponseBody
    public Map<String, Object> suggestionDiscovery(@RequestParam String flowName) {
        try {
            if (rebuildFlagService.isRebuilding()) {
                return getResultMap(400, "current has index rebuilding, please wait......");
            }

            if ("all".equalsIgnoreCase(flowName)) {
                suggestionDiscoveryJob.execute();
            } else {
                suggestionDiscoveryJob.executeFlow(flowName);
            }

            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage());
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }

    @RequestMapping(value = "/index/generateRule")
    @ResponseBody
    public Map<String, Object> generateRule() {
        try {
            dynamicRuleGenerator.generate();
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }

    @RequestMapping(value = "/index/cleanExpiredProductVectorFeature")
    @ResponseBody
    public Map<String, Object> cleanExpiredProductVectorFeature() {
        try {
            cleanExpiredProductVectorFeatureJob.execute();
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }

    private Message createMessage(String json) {
        byte[] bytes = null;
        try {
            bytes = json.getBytes(DEFAULT_CHARSET);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(DEFAULT_CHARSET);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);
    }

    private Map<String, Object> getResultMap(final int code, final String message) {
        Map<String, Object> rtnMap = new HashMap<String, Object>();
        rtnMap.put("code", code);
        rtnMap.put("msg", message);
        return rtnMap;
    }
}