Blame view

index/src/main/java/com/yoho/search/consumer/restapi/IndexController.java 17.4 KB
hugufei authored
1
package com.yoho.search.consumer.restapi;
gemingdan authored
2
3 4 5 6 7
import com.alibaba.fastjson.JSON;
import com.yoho.error.event.SearchEvent;
import com.yoho.error.event.SearchLogsEvent;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
8
import com.yoho.search.base.utils.MD5Util;
9 10
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
Gino Zhang authored
11
import com.yoho.search.consumer.job.CleanExpiredProductVectorFeatureJob;
12
import com.yoho.search.consumer.job.IndexRebuildJob;
13
import com.yoho.search.consumer.job.SuggestionCounterJob;
Gino Zhang authored
14
import com.yoho.search.consumer.job.SuggestionDiscoveryJob;
15
import com.yoho.search.consumer.service.base.ProductIndexService;
16
import com.yoho.search.consumer.service.base.SuggestWordDefService;
17 18
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logic.ProductIndexLogicService;
19 20
import com.yoho.search.consumer.suggests.common.KeywordType;
import com.yoho.search.consumer.suggests.counter.KeywordCounterService;
21 22
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
23
import com.yoho.search.dal.model.SuggestWordDef;
Gino Zhang authored
24 25 26 27 28 29 30 31 32 33 34 35
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Controller;
36
import org.springframework.util.Assert;
Gino Zhang authored
37 38 39 40 41 42
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.stream.Collectors;
43 44 45 46

/**
 * 索引管理相关请求
 *
Gino Zhang authored
47
 * @author YOHO
48 49
 */
@Controller
50
public class IndexController implements ApplicationEventPublisherAware {
51
Gino Zhang authored
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
    // General-purpose logger for this controller.
    private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
    // Separate logger, obtained by the name "SEARCH_EVENT_LOG", used to record reported events.
    private static final Logger SEARCH_EVENT_LOG = LoggerFactory.getLogger("SEARCH_EVENT_LOG");

    // Charset name used to encode outgoing message bodies and declared as their content encoding.
    private static final String DEFAULT_CHARSET = "UTF-8";

    // Index-level operations used by the handlers: exists/create/rebuild/bulk and single-document add/delete.
    @Autowired
    private IYohoIndexService yohoIndexService;
    // Converts a ProductIndexBO into the map that is serialized as the index document.
    @Autowired
    private ProductIndexService productIndexService;
    // Loads ProductIndexBO objects for a list of product ids.
    @Autowired
    private ProductIndexLogicService productIndexLogicService;
    // Used to send raw JSON update messages.
    @Autowired
    private AmqpTemplate amqpTemplate;
    // Tells whether an index rebuild is currently running; most handlers refuse to run while it is.
    @Autowired
    private RebuildFlagService rebuildFlagService;
    // Rebuild job triggered manually through the rebuildAll and suggestion-flow endpoints.
    @Autowired
    private IndexRebuildJob indexRebuildJob;
    // Suggestion counter job, runnable as a whole or per named flow.
    @Autowired
    private SuggestionCounterJob suggestionCounterJob;
    // Suggestion discovery job, runnable as a whole or per named flow.
    @Autowired
    private SuggestionDiscoveryJob suggestionDiscoveryJob;
    @Autowired
Gino Zhang authored
74
    private CleanExpiredProductVectorFeatureJob cleanExpiredProductVectorFeatureJob;
75 76 77 78
    @Autowired
    private SuggestWordDefService suggestWordDefService;
    @Autowired
    private KeywordCounterService keywordCounterService;
Gino Zhang authored
79 80 81 82 83 84 85

    private ApplicationEventPublisher publisher;

    /**
     * Keeps the publisher supplied through {@link ApplicationEventPublisherAware} so that the
     * request handlers of this controller can publish events with it.
     *
     * @param applicationEventPublisher publisher stored for later {@code publishEvent} calls
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        publisher = applicationEventPublisher;
    }
Gino Zhang authored
86
Gino Zhang authored
87 88 89 90 91 92 93 94
    /**
     * Publishes a test {@link SearchLogsEvent} and writes one line to the event log.
     *
     * @return result map with code 200 and message "success"
     */
    @RequestMapping(value = "/testevent")
    @ResponseBody
    public Map<String, Object> testevent() {
        SearchLogsEvent testEvent = new SearchLogsEvent("MqListener", EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "monitor",
                "test");
        publisher.publishEvent(testEvent);

        // Record the reported event in the dedicated event log.
        SEARCH_EVENT_LOG.info("report to influxDb,EventName is [{}] ,MoudleName is [{}]", "MqListener_TEST", "consumer");
        return getResultMap(200, "success");
    }
Gino Zhang authored
95
Gino Zhang authored
96 97 98 99 100 101 102 103 104 105 106 107
    /**
     * Creates the index with the given name.
     *
     * @param indexName name of the index to create
     * @param request   current HTTP request (not used by this handler)
     * @return result map with code 200 on success, or code 400 and the exception message when
     *         creation throws; in that case a {@link SearchEvent} is published as well
     */
    @RequestMapping(value = "/index/create/{indexName}")
    @ResponseBody
    public Map<String, Object> create(@PathVariable String indexName, HttpServletRequest request) {
        try {
            yohoIndexService.createIndex(indexName, true);
            return getResultMap(200, "create " + indexName + " success");
        } catch (Exception e) {
            EventReportEnum report = EventReportEnum.INDEXCONTROLLER_INDEX_CREATE;
            SearchEvent failure = new SearchEvent(report.getEventName(), report.getFunctionName(), report.getMoudleName(), "exception",
                    IgnoreSomeException.filterSomeException(e), null);
            publisher.publishEvent(failure);
            return getResultMap(400, "create " + indexName + " error: " + e.getMessage());
        }
    }
Gino Zhang authored
108
Gino Zhang authored
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
    /**
     * Reports whether the index with the given name exists.
     *
     * @param indexName name of the index to look up
     * @param request   current HTTP request (not used by this handler)
     * @return result map with code 200 and a message saying whether the index exists, or code 400
     *         and the exception message when the check itself fails; in that case a
     *         {@link SearchEvent} is published as well
     */
    @RequestMapping(value = "/index/exist/{indexName}")
    @ResponseBody
    public Map<String, Object> exist(@PathVariable String indexName, HttpServletRequest request) {
        try {
            boolean exists = yohoIndexService.indexExists(indexName);
            // The index name and the verdict are separated by a space so the message stays readable.
            return getResultMap(200, indexName + (exists ? " exist" : " not exist"));
        } catch (Exception e) {
            EventReportEnum report = EventReportEnum.INDEXCONTROLLER_INDEX_EXIST;
            publisher.publishEvent(new SearchEvent(report.getEventName(), report.getFunctionName(), report.getMoudleName(), "exception",
                    IgnoreSomeException.filterSomeException(e), null));
            // A failed check says nothing about existence, so report it as an error of the check,
            // using the same "<action> <index> error: <cause>" shape as the other handlers.
            return getResultMap(400, "exist " + indexName + " error: " + e.getMessage());
        }
    }
124
Gino Zhang authored
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
    /**
     * Rebuilds a single index, creating it first when it does not exist yet.
     *
     * @param indexName name of the index to rebuild
     * @param request   current HTTP request (not used by this handler)
     * @return result map with code 200 on success; code 400 when another rebuild is in progress or
     *         when the rebuild throws (a {@link SearchEvent} is published in the latter case)
     */
    @RequestMapping(value = "/index/rebuild/{indexName}")
    @ResponseBody
    public Map<String, Object> rebuild(@PathVariable String indexName, HttpServletRequest request) {
        if (rebuildFlagService.isRebuilding()) {
            return getResultMap(400, "current has index rebuilding,please wait......");
        }
        try {
            // Create the index when it is missing.
            if (!yohoIndexService.indexExists(indexName)) {
                yohoIndexService.createIndex(indexName, true);
            }
            yohoIndexService.rebuild(indexName);
            return getResultMap(200, "rebuild " + indexName + " success");
        } catch (Exception e) {
            EventReportEnum report = EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD;
            SearchEvent failure = new SearchEvent(report.getEventName(), report.getFunctionName(), report.getMoudleName(), "exception",
                    IgnoreSomeException.filterSomeException(e), null);
            publisher.publishEvent(failure);
            return getResultMap(400, "rebuild " + indexName + " error: " + e.getMessage());
        }
    }
145
Gino Zhang authored
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
    @RequestMapping(value = "/index/refresh/{indexName}")
    @ResponseBody
    public Map<String, Object> updateIndex(@PathVariable String indexName, @RequestParam String ids) {
        if (rebuildFlagService.isRebuilding()) {
            return getResultMap(400, "current has index rebuilding, please wait......");
        }
        if (!ISearchConstants.INDEX_NAME_PRODUCT_INDEX.equals(indexName)) {
            // 暂只支持productindex索引,后续可以考虑支持其他或所有索引
            return getResultMap(400, "only productindex is supported");
        }
        if (ids == null || ids.trim().isEmpty()) {
            return getResultMap(400, "the ids parameter is required");
        }
        try {
            boolean isExist = yohoIndexService.indexExists(indexName);
            // 如果不存在,则创建索引
            if (!isExist) {
                yohoIndexService.createIndex(indexName, true);
            }
            long begin = System.currentTimeMillis();
            List<ESBluk> bulkList = new ArrayList<ESBluk>();
            List<Integer> idList = Arrays.asList(ids.split(",")).stream().map(Integer::valueOf).collect(Collectors.toList());
            List<Integer> updateIdList = new ArrayList<>();
wangnan authored
169
            List<ProductIndexBO> productIndexBOs = productIndexLogicService.getProductIndexBOs(idList);
Gino Zhang authored
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
            if (CollectionUtils.isNotEmpty(productIndexBOs)) {
                for (ProductIndexBO item : productIndexBOs) {
                    updateIdList.add(item.getProductId());
                    String jsonStr = JSON.toJSONString(productIndexService.beanToMap(item));
                    bulkList.add(new ESBluk(jsonStr, String.valueOf(item.getProductId()), indexName, indexName, false));
                }
            }
            List<Integer> deleteIdList = idList.stream().filter(id -> !updateIdList.contains(id)).collect(Collectors.toList());
            if (CollectionUtils.isNotEmpty(deleteIdList)) {
                for (Integer id : deleteIdList) {
                    bulkList.add(new ESBluk(null, String.valueOf(id), indexName, indexName, true));
                }
            }
            logger.info("[func=updateIndex][indexName={}][updateIdList={}][deleteIdList={}][bulkList={}]", indexName, updateIdList, deleteIdList, bulkList);
            yohoIndexService.bulk(bulkList);
            logger.info("[func=updateIndex][indexName={}][cost={}]", indexName, (System.currentTimeMillis() - begin));
        } catch (Exception e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            return getResultMap(400, "update " + indexName + " error: " + e.getMessage());
        }
        return getResultMap(200, "update " + indexName + " success");
    }
193
Gino Zhang authored
194 195 196 197 198 199
    @RequestMapping(value = "/index/rebuildAll")
    @ResponseBody
    public Map<String, Object> rebuildAll() {
        if (rebuildFlagService.isRebuilding()) {
            return getResultMap(400, "current has index rebuilding,please wait......");
        }
胡古飞 authored
200
        indexRebuildJob.rebuildTblProductIndex();
Gino Zhang authored
201 202 203
        indexRebuildJob.execute();
        return getResultMap(200, "rebuildAll success");
    }
gemingdan authored
204
Gino Zhang authored
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
    /**
     * Sends a raw JSON update message for the given index through the AMQP template.
     *
     * <p>The routing key is derived from the index name; callers must know the message format
     * expected by the consumer of that key.
     *
     * @param indexName name of the index the message is about
     * @param json      message body, sent as UTF-8 encoded JSON
     * @param request   current HTTP request (not used by this handler)
     * @return result map with code 200 when the message was handed to the template, or code 400
     *         and the exception message when sending failed
     */
    @RequestMapping(method = RequestMethod.POST, value = "/index/update/{indexName}")
    @ResponseBody
    public Map<String, Object> update(@PathVariable String indexName, @RequestBody String json, HttpServletRequest request) {
        String channel = ISearchConstants.REDIS_CHANNEL_PRIFIX + indexName;
        try {
            amqpTemplate.convertAndSend(channel, createMessage(json));
        } catch (AmqpException e) {
            publisher.publishEvent(new SearchEvent(EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getEventName(), EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getFunctionName(),
                    EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
            // The trailing throwable makes the logger print the stack trace as well.
            logger.error("[func=sendMessage][step=execption][indexName={}][e={}]", indexName, e.getMessage(), e);
            // A failed send must not be reported to the caller as a success.
            return getResultMap(400, "update " + indexName + " error: " + e.getMessage());
        }
        logger.info("[func=sendMessage][step=success][indexName={}][msg={}]", indexName, json);
        return getResultMap(200, "update " + indexName + " success");
    }
221
Gino Zhang authored
222 223 224 225 226 227 228
    @RequestMapping(value = "/index/suggestion/flow")
    @ResponseBody
    public Map<String, Object> runSuggestionFlow() {
        try {
            if (rebuildFlagService.isRebuilding()) {
                return getResultMap(400, "current has index rebuilding, please wait......");
            }
gemingdan authored
229
Gino Zhang authored
230 231 232
            suggestionDiscoveryJob.execute();
            suggestionCounterJob.execute();
            indexRebuildJob.rebuildSuggestIndex();
233
Gino Zhang authored
234 235 236 237 238 239 240 241 242
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage());
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }
243
Gino Zhang authored
244 245
    @RequestMapping(value = "/index/suggestionCounter")
    @ResponseBody
Gino Zhang authored
246
    public Map<String, Object> suggestionCounter(@RequestParam String flowName) {
Gino Zhang authored
247 248 249 250
        try {
            if (rebuildFlagService.isRebuilding()) {
                return getResultMap(400, "current has index rebuilding, please wait......");
            }
Gino Zhang authored
251
            if ("all".equalsIgnoreCase(flowName)) {
Gino Zhang authored
252 253 254
                suggestionCounterJob.execute();
                return getResultMap(200, "success");
            }
Gino Zhang authored
255 256

            suggestionCounterJob.executeFlow(flowName);
Gino Zhang authored
257 258 259 260 261 262 263 264 265
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage());
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }
hugufei authored
266
Gino Zhang authored
267 268
    @RequestMapping(value = "/index/suggestionDiscovery")
    @ResponseBody
Gino Zhang authored
269
    public Map<String, Object> suggestionDiscovery(@RequestParam String flowName) {
Gino Zhang authored
270 271 272 273
        try {
            if (rebuildFlagService.isRebuilding()) {
                return getResultMap(400, "current has index rebuilding, please wait......");
            }
274
Gino Zhang authored
275
            if ("all".equalsIgnoreCase(flowName)) {
Gino Zhang authored
276 277
                suggestionDiscoveryJob.execute();
            } else {
Gino Zhang authored
278
                suggestionDiscoveryJob.executeFlow(flowName);
Gino Zhang authored
279
            }
280
Gino Zhang authored
281 282 283 284 285 286 287 288 289
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error("[func=suggestionCounter][step=execption][e={}]", e.getMessage());
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }
290
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
    @RequestMapping(value = "/index/suggestWordMgr")
    @ResponseBody
    public Map<String, Object> suggestWordMgr(@RequestParam String keyword, @RequestParam(defaultValue = "false") boolean isDelete) {
        Assert.notNull(keyword);
        try {
            SuggestWordDef suggestWordDef = suggestWordDefService.selectByKeyword(keyword);
            if (isDelete) {
                if (suggestWordDef == null) {
                    throw new RuntimeException("The keyword is not found!");
                }

                if (!Integer.valueOf(1).equals(suggestWordDef.getStatus())) {
                    throw new RuntimeException("The keyword has been deleted already!");
                }

                suggestWordDefService.updateStatusByPrimaryKey(suggestWordDef.getId(), 0);
                yohoIndexService.deleteIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(keyword.trim().toLowerCase()));
            } else {
                if (suggestWordDef != null) {
                    throw new RuntimeException("The keyword has been added!");
                }

                suggestWordDef = keywordCounterService.countKeyword(keyword);
                suggestWordDef.setStatus(1);
                suggestWordDef.setType(KeywordType.Customized.getType());
                suggestWordDef.setWeight(KeywordType.Customized.getWeightValue());
                suggestWordDefService.insertBatch(Arrays.asList(suggestWordDef));
319 320 321 322 323 324 325
                Map<String, Object> dataMap = new HashMap();
                dataMap.put("keyword", keyword);
                dataMap.put("type", suggestWordDef.getType());
                dataMap.put("weight", suggestWordDef.getWeight());
                dataMap.put("count", suggestWordDef.getCount());
                dataMap.put("countForApp", suggestWordDef.getCountForApp());
                dataMap.put("countForBlk", suggestWordDef.getCountForBlk());
326
327
                yohoIndexService.addIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(keyword.trim().toLowerCase()), dataMap);
Gino Zhang authored
328
            }
329 330 331

            return getResultMap(200, "success");
        } catch (Exception e) {
Gino Zhang authored
332
            logger.error(e.getMessage(), e);
333 334 335 336 337 338 339
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }
Gino Zhang authored
340 341 342 343 344 345 346 347 348 349
    @RequestMapping(value = "/index/cleanExpiredProductVectorFeature")
    @ResponseBody
    public Map<String, Object> cleanExpiredProductVectorFeature() {
        try {
            cleanExpiredProductVectorFeatureJob.execute();
            return getResultMap(200, "success");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            Map<String, Object> rtnMap = new HashMap<String, Object>();
            rtnMap.put("code", 400);
Gino Zhang authored
350 351 352 353
            rtnMap.put("msg", e.getMessage());
            return rtnMap;
        }
    }
354
Gino Zhang authored
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
    /**
     * Wraps a JSON string in an AMQP {@link Message} whose body is the string encoded with
     * {@code DEFAULT_CHARSET}, with content type JSON and matching content encoding and length.
     *
     * @param json message body
     * @return the message carrying the encoded body and its properties
     * @throws MessageConversionException if the charset is not supported
     */
    private Message createMessage(String json) {
        final byte[] payload;
        try {
            payload = json.getBytes(DEFAULT_CHARSET);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }

        MessageProperties properties = new MessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        properties.setContentEncoding(DEFAULT_CHARSET);
        // The payload is always set here: encoding either succeeded or the method has thrown.
        properties.setContentLength(payload.length);
        return new Message(payload, properties);
    }
370
Gino Zhang authored
371 372 373 374 375 376
    /**
     * Builds the response body shared by all handlers of this controller.
     *
     * @param code    status code stored under the key "code"
     * @param message text stored under the key "msg"
     * @return a new mutable map holding both entries
     */
    private Map<String, Object> getResultMap(final int code, final String message) {
        final Map<String, Object> result = new HashMap<>();
        result.put("code", code);
        result.put("msg", message);
        return result;
    }
377
}