Rabbitmq.php
1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?php
/**
* User: wuxiao
* Date: 2016-5-20
*/
class Rabbitmq
{
static $conn;
static $channel;
public static function handle($type, array $data,array $tags){
global $Config;
$e_name = $Config->rabbitmq_exchange_name.$type; //交换机名
$k_route = $Config->rabbitmq_route_key; //路由key
$channel = self::$channel;
if (!is_object($channel) || !$channel->isConnected()){
//创建连接
$conn = self::$conn;
if (!is_object($conn) || !$conn->isConnected()){
//配置信息
$conn_args = array(
'host' => $Config->rabbitmq_host,
'port' => $Config->rabbitmq_port,
'login' => $Config->rabbitmq_user,
'password' => $Config->rabbitmq_pass,
'vhost' => $Config->rabbitmq_vhost,
);
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
echo ("Cannot connect to the broker!\n");
return;
}
self::$conn = $conn;
}
//创建channel
self::$channel = $channel = new AMQPChannel($conn);
}
//创建交换机对象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declareExchange();
//消息内容
$message = json_encode(array(
'type'=>$type,
'data'=>$data,
'tags'=>$tags,
));
//发送消息
return $ex->publish($message, $k_route);
//$conn->disconnect();
}
}