// IndexController.java (7.76 KB)
package com.yoho.search.consumer.restapi;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.yoho.error.event.SearchEvent;
import com.yoho.error.event.SearchLogsEvent;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstans;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
import com.yoho.search.consumer.job.IndexRebuildJob;
import com.yoho.search.consumer.job.TplAdaptorJob;
import com.yoho.search.consumer.service.base.ProductIndexService;
import com.yoho.search.core.es.utils.IgnoreSomeException;

/**
 * REST endpoints for index management: creating an index, checking whether it
 * exists, rebuilding it and pushing update messages for it.
 * <p>
 * Every endpoint answers with a map holding a numeric {@code code} (200 on
 * success, 400 on failure) and a human-readable {@code msg}.
 *
 * @author YOHO
 *
 */
@Controller
public class IndexController implements ApplicationEventPublisherAware {

	private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
	private static final Logger SEARCH_EVENT_LOG = LoggerFactory.getLogger("SEARCH_EVENT_LOG");

	private static final String DEFAULT_CHARSET = "UTF-8";

	/** Reply sent when an operation is refused because {@link RebuildFlagService#isRebuilding()} is true. */
	private static final String REBUILDING_MESSAGE = "current has index rebuilding,please wait......";

	@Autowired
	private IYohoIndexService yohoIndexService;
	@Autowired
	private ProductIndexService productIndexService;
	@Autowired
	private AmqpTemplate amqpTemplate;
	@Autowired
	private RebuildFlagService rebuildFlagService;
	@Autowired
	private IndexRebuildJob indexRebuildJob;
	@Autowired
	private TplAdaptorJob tplAdaptorJob;

	private ApplicationEventPublisher publisher;

	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.publisher = applicationEventPublisher;
	}

	/**
	 * Publishes a test {@link SearchLogsEvent} and writes a matching line to the
	 * {@code SEARCH_EVENT_LOG} logger.
	 *
	 * @return a result map with code 200
	 */
	@RequestMapping(value = "/testevent")
	@ResponseBody
	public Map<String, Object> testevent() {
		publisher.publishEvent(new SearchLogsEvent("MqListener", EventReportEnum.INDEXCONTROLLER_INDEX_CREATE.getMoudleName(), "monitor", "test"));
		// Log the reported event.
		SEARCH_EVENT_LOG.info("report to influxDb,EventName is [{}] ,MoudleName is [{}]", "MqListener_TEST", "consumer");
		return getResultMap(200, "success");
	}

	/**
	 * Creates the index named by the path variable.
	 *
	 * @param indexName name of the index to create
	 * @param request   the current HTTP request (not used)
	 * @return code 200 on success; code 400 with the exception message on failure
	 */
	@RequestMapping(value = "/index/create/{indexName}")
	@ResponseBody
	public Map<String, Object> create(@PathVariable String indexName, HttpServletRequest request) {
		try {
			yohoIndexService.createIndex(indexName, true);
		} catch (Exception e) {
			reportException(EventReportEnum.INDEXCONTROLLER_INDEX_CREATE, e);
			return getResultMap(400, "create " + indexName + " error: " + e.getMessage());
		}
		return getResultMap(200, "create " + indexName + " success");
	}

	/**
	 * Reports whether the index named by the path variable exists.
	 *
	 * @param indexName name of the index to look up
	 * @param request   the current HTTP request (not used)
	 * @return code 200 with an "exist" / "not exist" message; code 400 when the
	 *         lookup itself throws
	 */
	@RequestMapping(value = "/index/exist/{indexName}")
	@ResponseBody
	public Map<String, Object> exist(@PathVariable String indexName, HttpServletRequest request) {
		try {
			boolean exists = yohoIndexService.indexExists(indexName);
			if (exists) {
				// A space separates the index name from the verdict; it was missing before.
				return getResultMap(200, indexName + " exist ");
			}
			return getResultMap(200, indexName + " not exist ");
		} catch (Exception e) {
			reportException(EventReportEnum.INDEXCONTROLLER_INDEX_EXIST, e);
			return getResultMap(400, indexName + " not exist " + "error: " + e.getMessage());
		}
	}

	/**
	 * Rebuilds the named index, creating it first when it does not exist yet.
	 * The request is refused while the rebuild flag is set.
	 *
	 * @param indexName name of the index to rebuild
	 * @param request   the current HTTP request (not used)
	 * @return code 200 on success; code 400 when the rebuild flag is set or the
	 *         rebuild throws
	 */
	@RequestMapping(value = "/index/rebuild/{indexName}")
	@ResponseBody
	public Map<String, Object> rebuild(@PathVariable String indexName, HttpServletRequest request) {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, REBUILDING_MESSAGE);
		}
		try {
			// Create the index first if it does not exist.
			if (!yohoIndexService.indexExists(indexName)) {
				yohoIndexService.createIndex(indexName, true);
			}
			yohoIndexService.rebuild(indexName);
		} catch (Exception e) {
			reportException(EventReportEnum.INDEXCONTROLLER_INDEX_REBUILD, e);
			return getResultMap(400, "rebuild " + indexName + " error: " + e.getMessage());
		}
		return getResultMap(200, "rebuild " + indexName + " success");
	}

	/**
	 * Runs {@link IndexRebuildJob#rebuildTblProductIndex()} followed by
	 * {@link IndexRebuildJob#execute()}, unless the rebuild flag is set.
	 *
	 * @return code 200 once both calls return; code 400 when the rebuild flag is set
	 */
	@RequestMapping(value = "/index/rebuildAll")
	@ResponseBody
	public Map<String, Object> rebuildAll() {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, REBUILDING_MESSAGE);
		}
		indexRebuildJob.rebuildTblProductIndex();
		indexRebuildJob.execute();
		return getResultMap(200, "rebuildAll success");
	}

	/**
	 * Runs {@link TplAdaptorJob#doTplAdaptor()}, unless the rebuild flag is set.
	 *
	 * @return code 200 once the job returns; code 400 when the rebuild flag is set
	 */
	@RequestMapping(value = "/index/tplAdaptor")
	@ResponseBody
	public Map<String, Object> tplAdaptor() {
		if (rebuildFlagService.isRebuilding()) {
			return getResultMap(400, REBUILDING_MESSAGE);
		}
		tplAdaptorJob.doTplAdaptor();
		return getResultMap(200, "tplAdaptor success");
	}

	/**
	 * Sends the request body as a JSON message through the {@link AmqpTemplate},
	 * using {@code ISearchConstans.REDIS_CHANNEL_PRIFIX + indexName} as the
	 * destination. Callers need to know the message format expected on that channel.
	 *
	 * @param indexName index whose channel receives the message
	 * @param json      raw request body, forwarded as the message payload
	 * @param request   the current HTTP request (not used)
	 * @return code 200 when the message was handed to the template; code 400
	 *         with the exception message when sending raised an {@link AmqpException}
	 */
	@RequestMapping(method = RequestMethod.POST, value = "/index/update/{indexName}")
	@ResponseBody
	public Map<String, Object> update(@PathVariable String indexName, @RequestBody String json, HttpServletRequest request) {
		// Derive the message destination from indexName, then send the message.
		String channel = ISearchConstans.REDIS_CHANNEL_PRIFIX + indexName;
		try {
			amqpTemplate.convertAndSend(channel, createMessage(json));
		} catch (AmqpException e) {
			reportException(EventReportEnum.INDEXCONTROLLER_INDEX_UPDATE, e);
			// The trailing throwable makes SLF4J log the stack trace as well.
			logger.error("[func=sendMessage][step=execption][e={}]", e.getMessage(), e);
			// Report the failure instead of falling through to the success reply.
			return getResultMap(400, "update " + indexName + " error: " + e.getMessage());
		}
		logger.info("[func=sendMessage][step=success][indexName={}][msg={}]", indexName, json);
		return getResultMap(200, "update " + indexName + " success");
	}

	/**
	 * Publishes a {@link SearchEvent} of type "exception" for a failed endpoint call.
	 *
	 * @param event supplies the event, function and module names for the report
	 * @param e     the exception that was caught
	 */
	private void reportException(EventReportEnum event, Exception e) {
		publisher.publishEvent(new SearchEvent(event.getEventName(), event.getFunctionName(), event.getMoudleName(), "exception",
				IgnoreSomeException.filterSomeException(e), null));
	}

	/**
	 * Wraps a JSON string in an AMQP {@link Message} with JSON content type,
	 * UTF-8 content encoding and the payload length set.
	 *
	 * @param json the payload text
	 * @return the message carrying the UTF-8 bytes of {@code json}
	 * @throws MessageConversionException if the charset is not supported
	 */
	private Message createMessage(String json) {
		byte[] bytes;
		try {
			bytes = json.getBytes(DEFAULT_CHARSET);
		} catch (UnsupportedEncodingException e) {
			throw new MessageConversionException("Failed to convert Message content", e);
		}
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
		messageProperties.setContentEncoding(DEFAULT_CHARSET);
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}

	/**
	 * Builds the response map shared by all endpoints.
	 *
	 * @param code    result code stored under the key {@code code}
	 * @param message text stored under the key {@code msg}
	 * @return a new mutable map with the two entries
	 */
	private Map<String, Object> getResultMap(final int code, final String message) {
		Map<String, Object> rtnMap = new HashMap<String, Object>();
		rtnMap.put("code", code);
		rtnMap.put("msg", message);
		return rtnMap;
	}
}