// IndexController.java 10.3 KB
package com.yoho.search.consumer.restapi;

import com.alibaba.fastjson.JSON;
import com.yoho.error.event.SearchEvent;
import com.yoho.error.event.SearchLogsEvent;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
import com.yoho.search.consumer.job.IndexRebuildJob;
import com.yoho.search.consumer.job.TplAdaptorJob;
import com.yoho.search.consumer.service.base.ProductIndexService;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logic.ProductIndexLogicService;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

/**
 * REST endpoints for index administration: creating an index, checking
 * whether it exists, rebuilding it, and pushing updates for individual
 * documents.
 * <p>
 * Every handler answers with a map holding a numeric {@code code} (200 on
 * success, 400 on failure) and a human-readable {@code msg}.
 *
 * @author YOHO
 *
 */
@Controller
public class IndexController implements ApplicationEventPublisherAware {

	private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
	private static final Logger SEARCH_EVENT_LOG = LoggerFactory.getLogger("SEARCH_EVENT_LOG");

	private static final String DEFAULT_CHARSET = "UTF-8";

	@Autowired
	private IYohoIndexService yohoIndexService;
	@Autowired
	private ProductIndexService productIndexService;
	@Autowired
	private ProductIndexLogicService productIndexLogicService;
	@Autowired
	private AmqpTemplate amqpTemplate;
	@Autowired
	private RebuildFlagService rebuildFlagService;
	@Autowired
	private IndexRebuildJob indexRebuildJob;
	@Autowired
	private TplAdaptorJob tplAdaptorJob;

	private ApplicationEventPublisher publisher;

	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.publisher = applicationEventPublisher;
	}

	/**
	 * Publishes a test {@link SearchLogsEvent} and writes a line to the event
	 * log, so the event-reporting path can be exercised by hand.
	 *
	 * @return a 200 result
	 */
	@RequestMapping(value = "/testevent")
	@ResponseBody
	public Map<String, Object> testevent() {
		publisher.publishEvent(new SearchLogsEvent("MqListener", EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "monitor", "test"));
		// Record what was reported.
		SEARCH_EVENT_LOG.info("report to influxDb,EventName is [{}] ,MoudleName is [{}]", "MqListener_TEST", "consumer");
		return getResultMap(200, "success");
	}

	/**
	 * Creates the index named in the path.
	 *
	 * @param indexName name of the index to create
	 * @param request   the current request (unused)
	 * @return a 200 result on success, a 400 result carrying the exception message otherwise
	 */
	@RequestMapping(value = "/index/create/{indexName}")
	@ResponseBody
	public Map<String, Object> create(@PathVariable String indexName, HttpServletRequest request) {
		try {
			yohoIndexService.createIndex(indexName, true);
		} catch (Exception e) {
			publishExceptionEvent(EventReportEnum.INDEXCONTROLLER_INDEX_CREATE, e);
			return getResultMap(400, "create " + indexName + " error: " + e.getMessage());
		}
		return getResultMap(200, "create " + indexName + " success");
	}

	/**
	 * Reports whether the index named in the path exists.
	 *
	 * @param indexName name of the index to look up
	 * @param request   the current request (unused)
	 * @return a 200 result stating whether the index exists, or a 400 result
	 *         if the lookup itself failed
	 */
	@RequestMapping(value = "/index/exist/{indexName}")
	@ResponseBody
	public Map<String, Object> exist(@PathVariable String indexName, HttpServletRequest request) {
		try {
			boolean exists = yohoIndexService.indexExists(indexName);
			if (exists) {
				return getResultMap(200, indexName + " exist ");
			}
			return getResultMap(200, indexName + " not exist ");
		} catch (Exception e) {
			publishExceptionEvent(EventReportEnum.INDEXCONTROLLER_INDEX_EXIST, e);
			// The lookup failed, so nothing is known about the index; report
			// the failure instead of claiming the index does not exist.
			return getResultMap(400, "exist " + indexName + " error: " + e.getMessage());
		}
	}

	/**
	 * Rebuilds the index named in the path, creating it first when it does
	 * not exist. Refused while the rebuild flag is set.
	 *
	 * @param indexName name of the index to rebuild
	 * @param request   the current request (unused)
	 * @return a 200 result on success, a 400 result otherwise
	 */
	@RequestMapping(value = "/index/rebuild/{indexName}")
	@ResponseBody
	public Map<String, Object> rebuild(@PathVariable String indexName, HttpServletRequest request) {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, "current has index rebuilding,please wait......");
		}
		try {
			// Create the index when it does not exist yet.
			if (!yohoIndexService.indexExists(indexName)) {
				yohoIndexService.createIndex(indexName, true);
			}
			yohoIndexService.rebuild(indexName);
		} catch (Exception e) {
			publishExceptionEvent(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD, e);
			return getResultMap(400, "rebuild " + indexName + " error: " + e.getMessage());
		}
		return getResultMap(200, "rebuild " + indexName + " success");
	}

	/**
	 * Refreshes the given product ids in the index with a single bulk
	 * request: ids for which {@code productIndexLogicService} returns a
	 * product are written, the remaining requested ids are submitted as
	 * deletes. Refused while the rebuild flag is set.
	 *
	 * @param indexName name of the index; only the product index is accepted
	 * @param ids       comma separated product ids; surrounding whitespace,
	 *                  empty entries and duplicates are ignored
	 * @return a 200 result on success, a 400 result otherwise
	 */
	@RequestMapping(value = "/index/update/{indexName}")
	@ResponseBody
	public Map<String, Object> updateIndex(@PathVariable String indexName, @RequestParam String ids) {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, "current has index rebuilding, please wait......");
		}

		if (!ISearchConstants.INDEX_NAME_PRODUCT_INDEX.equals(indexName)) {
			// Only the product index is supported for now; other (or all)
			// indexes may be supported later.
			return getResultMap(400, "only productindex is supported");
		}

		if (ids == null || ids.trim().isEmpty()) {
			return getResultMap(400, "the ids parameter is required");
		}

		try {
			// A malformed id raises NumberFormatException, which is handled
			// by the catch below like any other failure.
			List<Integer> idList = parseProductIds(ids);
			if (idList.isEmpty()) {
				// e.g. ids="," - nothing to do, so do not send an empty bulk request.
				return getResultMap(400, "the ids parameter is required");
			}

			// Create the index when it does not exist yet.
			if (!yohoIndexService.indexExists(indexName)) {
				yohoIndexService.createIndex(indexName, true);
			}

			long begin = System.currentTimeMillis();
			List<ESBluk> bulkList = new ArrayList<>();
			// Insertion-ordered set: constant-time lookups for the delete
			// filter below while keeping the log output in retrieval order.
			Set<Integer> updateIdList = new LinkedHashSet<>();
			List<ProductIndexBO> productIndexBOs = productIndexLogicService.getProductIndexs(idList);
			if (CollectionUtils.isNotEmpty(productIndexBOs)) {
				for (ProductIndexBO item : productIndexBOs) {
					updateIdList.add(item.getProductId());
					bulkList.add(new ESBluk(JSON.toJSONString(item), String.valueOf(item.getProductId()), indexName, indexName, false));
				}
			}

			// Requested ids that produced no product are submitted as deletes.
			List<Integer> deleteIdList = idList.stream().filter(id -> !updateIdList.contains(id)).collect(Collectors.toList());
			for (Integer id : deleteIdList) {
				bulkList.add(new ESBluk(null, String.valueOf(id), indexName, indexName, true));
			}

			logger.info("[func=updateIndex][indexName={}][updateIdList={}][deleteIdList={}][bulkList={}]", indexName, updateIdList, deleteIdList, bulkList);
			yohoIndexService.bulk(bulkList);
			logger.info("[func=updateIndex][indexName={}][cost={}]", indexName, (System.currentTimeMillis() - begin));
		} catch (Exception e) {
			// Reported under the UPDATE event: this endpoint updates documents,
			// it does not rebuild the index.
			publishExceptionEvent(EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE, e);
			return getResultMap(400, "update " + indexName + " error: " + e.getMessage());
		}

		return getResultMap(200, "update " + indexName + " success");
	}

	/**
	 * Runs the full rebuild job. Refused while the rebuild flag is set.
	 *
	 * @return a 200 result once both job calls have returned, a 400 result
	 *         if a rebuild is already flagged
	 */
	@RequestMapping(value = "/index/rebuildAll")
	@ResponseBody
	public Map<String, Object> rebuildAll() {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, "current has index rebuilding,please wait......");
		}
		indexRebuildJob.rebuildTblProductIndex();
		indexRebuildJob.execute();
		return getResultMap(200, "rebuildAll success");
	}

	/**
	 * Runs the template adaptor job. Refused while the rebuild flag is set.
	 *
	 * @return a 200 result once the job call has returned, a 400 result if a
	 *         rebuild is already flagged
	 */
	@RequestMapping(value = "/index/tplAdaptor")
	@ResponseBody
	public Map<String, Object> tplAdaptor() {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, "current has index rebuilding,please wait......");
		}
		tplAdaptorJob.doTplAdaptor();
		return getResultMap(200, "tplAdaptor success");
	}

	/**
	 * Forwards the raw request body as a JSON message to the destination
	 * derived from the index name. Callers must know the message format
	 * expected by whatever consumes that destination.
	 *
	 * @param indexName name of the index the message is meant for
	 * @param json      message payload, sent as-is
	 * @param request   the current request (unused)
	 * @return a 200 result when the message was handed to the template, a
	 *         400 result when sending failed
	 */
	@RequestMapping(method = RequestMethod.POST, value = "/index/update/{indexName}")
	@ResponseBody
	public Map<String, Object> update(@PathVariable String indexName, @RequestBody String json, HttpServletRequest request) {
		// The destination is the configured prefix followed by the index name.
		String channel = ISearchConstants.REDIS_CHANNEL_PRIFIX + indexName;
		try {
			amqpTemplate.convertAndSend(channel, createMessage(json));
		} catch (AmqpException e) {
			publishExceptionEvent(EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE, e);
			// Trailing throwable argument makes SLF4J log the stack trace too.
			logger.error("[func=sendMessage][step=execption][e={}]", e.getMessage(), e);
			// Do not fall through: the message was not sent, so the caller
			// must not be told the update succeeded.
			return getResultMap(400, "update " + indexName + " error: " + e.getMessage());
		}
		logger.info("[func=sendMessage][step=success][indexName={}][msg={}]", indexName, json);
		return getResultMap(200, "update " + indexName + " success");
	}

	/**
	 * Wraps a JSON string in an AMQP {@link Message} with JSON content type,
	 * UTF-8 content encoding and the matching content length.
	 *
	 * @param json payload text; must not be null
	 * @return the message carrying the UTF-8 bytes of {@code json}
	 */
	private Message createMessage(String json) {
		byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
		messageProperties.setContentEncoding(DEFAULT_CHARSET);
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}

	/**
	 * Parses a comma separated id list into distinct integers, keeping the
	 * order of first appearance.
	 *
	 * @param ids comma separated ids; whitespace around entries and empty entries are ignored
	 * @return the parsed ids, possibly empty
	 * @throws NumberFormatException if an entry is not a valid integer
	 */
	private List<Integer> parseProductIds(String ids) {
		return Arrays.stream(ids.split(","))
				.map(String::trim)
				.filter(token -> !token.isEmpty())
				.map(Integer::valueOf)
				.distinct()
				.collect(Collectors.toList());
	}

	/**
	 * Publishes a {@link SearchEvent} of type "exception" for the given
	 * report entry, passing the exception through
	 * {@link IgnoreSomeException#filterSomeException} first.
	 *
	 * @param event report entry supplying event, function and module names
	 * @param e     the exception being reported
	 */
	private void publishExceptionEvent(EventReportEnum event, Exception e) {
		publisher.publishEvent(new SearchEvent(event.getEventName(), event.getFunctionName(),
				event.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
	}

	/**
	 * Builds the response map returned by every handler.
	 *
	 * @param code    result code, stored under "code"
	 * @param message result text, stored under "msg"
	 * @return a new mutable map with the two entries
	 */
	private Map<String, Object> getResultMap(final int code, final String message) {
		Map<String, Object> rtnMap = new HashMap<>();
		rtnMap.put("code", code);
		rtnMap.put("msg", message);
		return rtnMap;
	}
}