CommonConsumerFactory.java
4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.yoho.unions.message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.Map;
/**
 * Consumer factory: looks up every {@link CommonMessageConsumer} bean, declares a queue for it,
 * binds that queue to the topic exchange and starts a listener container on each configured
 * connection factory.
 * <p>
 * NOTE(review): the started listener containers are not retained anywhere, so this class cannot
 * stop them later - confirm whether shutdown is handled elsewhere.
 * <p>
 * Created by chang@yoho.cn on 2015/11/5.
 */
public class CommonConsumerFactory implements ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(CommonConsumerFactory.class);

    /** Name of the topic exchange every consumer queue is bound to. */
    private static final String TOPIC_EXCHANGE_NAME = "amq.topic";

    /** Prefix of every queue name declared by this factory. */
    private static final String QUEUE_NAME_PREFIX = "yoho:";

    private final ConnectionFactory[] connectionFactorys;
    private final RabbitAdmin[] admins;
    private final SimpleMessageConverter simpleMessageConverter;
    private final String context;
    private final TopicExchange topicExchange;

    /** Concurrent consumers per listener container; read from {@code rabbit_concurrent}, default 1. */
    @Value("${rabbit_concurrent:1}")
    private int concurrentConsumers;

    /**
     * Creates the factory.
     *
     * @param connectionFactorys     connection factories; one listener container is started per
     *                               consumer on each of them
     * @param admins                 admins used to declare queues and bindings; {@code admins[i]}
     *                               is used together with {@code connectionFactorys[i]}
     * @param simpleMessageConverter converter set on both the container and the listener adapter
     * @param context                middle segment of the generated queue names
     */
    public CommonConsumerFactory(ConnectionFactory[] connectionFactorys, RabbitAdmin[] admins, SimpleMessageConverter simpleMessageConverter, String context) {
        this.connectionFactorys = connectionFactorys;
        this.admins = admins;
        this.simpleMessageConverter = simpleMessageConverter;
        this.context = context;
        this.topicExchange = new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    /**
     * Fetches all beans implementing {@link CommonMessageConsumer} and starts listeners for them.
     *
     * @param applicationContext context the consumer beans are looked up in
     * @throws BeansException declared by {@link ApplicationContextAware}
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, CommonMessageConsumer> consumerBeans = applicationContext.getBeansOfType(CommonMessageConsumer.class);
        this.addMessageListener(consumerBeans);
        logger.info("Loaded rabbit consumers {} success.", consumerBeans);
    }

    /**
     * Starts one listener container per (consumer, connection factory) pair.
     * A failure for one pair is logged and does not prevent the remaining pairs from starting.
     *
     * @param consumerBeans consumer beans keyed by bean name; may be {@code null} or empty
     */
    private void addMessageListener(Map<String, CommonMessageConsumer> consumerBeans) {
        if (consumerBeans == null || consumerBeans.isEmpty()) {
            logger.info("can not load any rabbit consumer...");
            return;
        }
        try {
            // Each connection factory needs the admin at the same index; never index past either array.
            int pairCount = Math.min(connectionFactorys.length, admins.length);
            if (admins.length < connectionFactorys.length) {
                logger.warn("only {} rabbit admins configured for {} connection factories, the remaining connection factories are skipped.",
                        admins.length, connectionFactorys.length);
            }
            for (CommonMessageConsumer consumer : consumerBeans.values()) {
                for (int i = 0; i < pairCount; i++) {
                    this.startListenerContainer(consumer, connectionFactorys[i], admins[i]);
                }
            }
        } catch (Exception e) {
            // Reached e.g. when one of the arrays is null; keep startup best-effort.
            logger.warn("starting rabbitmq container failed.", e);
        }
    }

    /**
     * Declares the consumer's queue and binding, then creates and starts its listener container.
     * Any exception is logged together with its stack trace and not rethrown.
     *
     * @param consumer          consumer that receives the messages
     * @param connectionFactory connection factory the container listens on
     * @param admin             admin used to declare the queue and the binding
     */
    private void startListenerContainer(CommonMessageConsumer consumer, ConnectionFactory connectionFactory, RabbitAdmin admin) {
        try {
            String queueName = this.declareAndGetQueueName(consumer, admin);

            SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
            listenerContainer.setConnectionFactory(connectionFactory);
            listenerContainer.setMessageConverter(this.simpleMessageConverter);
            listenerContainer.setConcurrentConsumers(concurrentConsumers);

            // listener
            MessageListenerAdapter adapter = new MessageListenerAdapter(consumer);
            adapter.setMessageConverter(this.simpleMessageConverter);
            listenerContainer.setMessageListener(adapter);
            listenerContainer.setQueueNames(queueName);
            listenerContainer.start();
            logger.info("starting rabbitmq success.container {},connectionFactory {}", consumer, connectionFactory);
        } catch (Exception e) {
            // The trailing throwable is logged with its stack trace so the failure can be diagnosed.
            logger.warn("starting rabbitmq fail.container {},connectionFactory {}", consumer, connectionFactory, e);
        }
    }

    /**
     * Declares the queue {@code yoho:<context>:<topic>} and binds it to the topic exchange using
     * the consumer's topic as routing key.
     *
     * @param consumer consumer whose message topic determines the queue name and routing key
     * @param admin    admin used to declare the queue and the binding
     * @return name of the declared queue
     */
    private String declareAndGetQueueName(CommonMessageConsumer consumer, RabbitAdmin admin) {
        // new queue
        String queueName = QUEUE_NAME_PREFIX + this.context + ":" + consumer.getMessageTopic();
        Queue queue = new Queue(queueName);
        admin.declareQueue(queue);
        // binding
        Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(consumer.getMessageTopic());
        admin.declareBinding(binding);
        return queueName;
    }
}