CommonConsumerFactory.java 4.62 KB
package com.yoho.unions.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.Map;

/**
 * Consumer工厂 读取consumer的class,并且创建queue,绑定到exchange中。
 * <p>
 * Created by chang@yoho.cn on 2015/11/5.
 */
public class CommonConsumerFactory implements ApplicationContextAware {

    private final Logger logger = LoggerFactory.getLogger(CommonConsumerFactory.class);


    private final ConnectionFactory[] connectionFactorys;
    private final RabbitAdmin[] admins;
    private final SimpleMessageConverter simpleMessageConverter;
    private final String context;
    private final TopicExchange topicExchange;

    @Value("${rabbit_concurrent:1}")
    private int concurrentConsumers;

    public CommonConsumerFactory(ConnectionFactory[] connectionFactorys, RabbitAdmin[] admins, SimpleMessageConverter simpleMessageConverter, String context) {
        this.connectionFactorys = connectionFactorys;
        this.admins = admins;
        this.simpleMessageConverter = simpleMessageConverter;
        this.context = context;
        this.topicExchange = new TopicExchange("amq.topic");
    }


    /**
     * 获取所有实现了  {@link CommonMessageConsumer}接口的bean
     *
     * @param applicationContext context
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, CommonMessageConsumer> consumerBeans = applicationContext.getBeansOfType(CommonMessageConsumer.class);
        this.addMessageListener(consumerBeans);
        logger.info("Loaded rabbit consumers {} success.", consumerBeans);

    }

    private void addMessageListener(Map<String, CommonMessageConsumer> consumerBeans) {
        if (consumerBeans == null) {
            logger.info("can not load any rabbit consumer...");
            return;
        }

        try {
            for (CommonMessageConsumer consumer : consumerBeans.values()) {
                int size = connectionFactorys.length;
                for (int i = 0; i < size; i++) {
                    try {
                        String queueName = this.declareAndGetQueueName(consumer, admins[i]);
                        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
                        listenerContainer.setConnectionFactory(connectionFactorys[i]);
                        listenerContainer.setMessageConverter(this.simpleMessageConverter);
                        listenerContainer.setConcurrentConsumers(concurrentConsumers);
                        //linstener
                        MessageListenerAdapter adapter = new MessageListenerAdapter(consumer);
                        adapter.setMessageConverter(this.simpleMessageConverter);
                        listenerContainer.setMessageListener(adapter);
                        listenerContainer.setQueueNames(queueName);
                        listenerContainer.start();
                        logger.info("starting rabbitmq  success.container {},connectionFactory {}", consumer, connectionFactorys[i]);
                    }catch (Exception e){
                        logger.info("starting rabbitmq  fail.container {},connectionFactory {}", consumer, connectionFactorys[i]);
                    }
                }
            }
        } catch (Exception e) {
            logger.warn("starting rabbitmq container failed.", e);
        }
    }


    private String declareAndGetQueueName(CommonMessageConsumer consumer, RabbitAdmin admin) {
        //new queue
        String queueName = "yoho:" + this.context + ":" + consumer.getMessageTopic();
        Queue queue = new Queue(queueName);
        admin.declareQueue(queue);
        //binding
        Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(consumer.getMessageTopic());
        admin.declareBinding(binding);
        return queueName;
    }


}