MessageProducerTemplate.java 4.37 KB
package com.yoho.message.sdk.common.mq;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import javax.annotation.PostConstruct;

import org.apache.commons.codec.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import com.alibaba.fastjson.JSON;

/**
 * 发送消息到Message broker中 当前是发送到topic中 Created by chang@yoho.cn on 2015/11/4.
 */
public class MessageProducerTemplate {

	private final Logger logger = LoggerFactory.getLogger(MessageProducerTemplate.class);

	private final Logger asynclogger = LoggerFactory.getLogger("asyncproducer");

	private final int DEFAULT_QUEUE_SIZE = 8000;

	private final LinkedBlockingQueue<Entry> asyncSendQueues = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);

	private boolean asyncEnable = false;// 针对单个bean的开关,可以在配置文件里面注入,默认true

	private AmqpTemplate amqpTemplate;

//	private final String TOPIC_EXCHAGE = "amq.topic";

	private final String FEDERATION_TOPIC_EXCHANGE = "yoho.federation.topic";



	@PostConstruct
	void init() {
		new Thread("MessageProducerTemplate-thread") {
			@Override
			public void run() {
				try {
					Entry e;
					while ((e = asyncSendQueues.take()) != null) {
						try {
							amqpTemplate.send(e.topicExchange, e.topic, e.amqpMsg);
						} catch (Exception exception) {
							asynclogger.error("mq send exception,{}", exception);
						}
					}
				} catch (Exception e) {
					asynclogger.error("YhProducerTemplate-thread error ,{}", e);
				}
			}
		}.start();
	}

	/**
	 * 延迟发送消息
	 *
	 * @param topic
	 *            消息的主题
	 * @param object
	 *            消息对象,框架会自动转化为JSON
	 * @param attributes
	 *            消息属性
	 * @param delayInMinutes
	 *            延迟多少分钟,必须和consumers配置的保持一致
	 */
	public void send(String topic, Object object, Map<String, Object> attributes, int delayInMinutes) {
		// yoho_delay.2m.topic
		String sent_topic = "yoho_delay." + delayInMinutes + "m." + topic;
		this.send(sent_topic, object, attributes);
	}

	/**
	 * 发送消息
	 *
	 * @param topic
	 *            消息主题
	 * @param object
	 *            消息体,java对象
	 */
	public void send(String topic, Object object) {
		this.send(topic, object, null);
	}

	/**
	 * 发送消息
	 *
	 * @param topic
	 *            消息主题
	 * @param object
	 *            消息体 会转化为JSON,然后发送
	 * @param attributes
	 *            消息属性
	 */
	public void send(String topic, Object object, Map<String, Object> attributes) {
		// 消息的属性
		MessageProperties properties = new MessageProperties();
		properties.setContentType("text");
		if (attributes != null) {
			for (Map.Entry<String, Object> entry : attributes.entrySet()) {
				properties.setHeader(entry.getKey(), entry.getValue());
			}
		}
		// 消息体
		byte[] body = JSON.toJSONString(object).getBytes(Charsets.toCharset("UTF-8"));
		Message amqpMsg = new Message(body, properties);
		if (asyncEnable) {
			asycnSend(FEDERATION_TOPIC_EXCHANGE, topic, amqpMsg);
		} else {
			this.amqpTemplate.send(FEDERATION_TOPIC_EXCHANGE, topic, amqpMsg);
		}
		logger.debug("send mq message success. exchange:{}, topic:{}, message:{}", FEDERATION_TOPIC_EXCHANGE, topic, object);
	}

	private void asycnSend(String topicExchange, String topic, Message amqpMsg) {
		if (!asyncSendQueues.offer(new Entry(topicExchange, topic, amqpMsg))) {
			try {
				asynclogger.error("mq queue is full ,and topicExchange={} ,topic={},amqpMsg={}", topicExchange, topic, new String(amqpMsg.getBody(), "UTF-8"));
			} catch (UnsupportedEncodingException e) {
				asynclogger.error(e.getMessage());
			}
		}
	}

	private class Entry {
		private String topicExchange;
		private String topic;
		private Message amqpMsg;

		public Entry(String topicExchange, String topic, Message amqpMsg) {
			this.amqpMsg = amqpMsg;
			this.topic = topic;
			this.topicExchange = topicExchange;
		}
	}
	
	public boolean isAsyncEnable() {
		return asyncEnable;
	}

	public void setAsyncEnable(boolean asyncEnable) {
		this.asyncEnable = asyncEnable;
	}

	// spring setting, do not call
	public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
		this.amqpTemplate = amqpTemplate;
	}
}