MqClient.php.example
1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
<?php
// Have PHP print errors as they occur, and report every class of error.
ini_set('display_errors', true);
error_reporting(E_ALL);
require_once __DIR__.'/Config.php';
require_once __DIR__.'/InfluxdbLog.php';
/* * ***********************************
* PHP amqp(RabbitMQ) Demo - consumer
* Author: Linvo
* Date: 2012/7/30
* *********************************** */
// Name of the queue this worker consumes from.
$q_name = 'phplog';

// Routing key used when binding the queue to each exchange.
$k_route = 'phplog';

// Names of the exchanges the queue gets bound to.
$e_name = array(
    'yohoExchange_error',
    'yohoExchange_shutdown',
);

// Broker connection settings, taken from the Config class constants.
$conn_args = array(
    'host'     => Config::rabbitmq_host,
    'port'     => Config::rabbitmq_port,
    'login'    => Config::rabbitmq_user,
    'password' => Config::rabbitmq_pass,
    'vhost'    => Config::rabbitmq_vhost,
);
// Open the broker connection and a channel on it.
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    // Same message as before, but exit with a non-zero status: die() with a
    // string argument exits with status 0, which makes a failed start look
    // like a clean shutdown to whatever launched the worker.
    echo "Cannot connect to the broker!\n";
    exit(1);
}
$channel = new AMQPChannel($conn);

// Declare the queue; AMQP_DURABLE marks it as durable.
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE);
$q->declareQueue();

// Bind the queue to every configured exchange under the routing key.
// A dedicated loop variable is used so that $e_name keeps holding the full
// list instead of being overwritten with its last element.
foreach ($e_name as $exchange) {
    $q->bind($exchange, $k_route);
}
// Receive messages in blocking mode: consume() hands each delivery to
// processMessage(). (Passing AMQP_AUTOACK as a second argument would switch
// to automatic acknowledgement instead of the manual ack done in the callback.)
while (true) {
    try {
        $q->consume('processMessage');
    } catch (\Exception $e) {
        echo $e->getMessage(), "\n";
        // Errors raised while handling a single message are logged and the
        // loop carries on. Once the broker connection itself is gone, further
        // consume() calls cannot succeed, so retrying would only spin and
        // flood the output: leave the loop instead.
        if ($e instanceof \AMQPConnectionException || !$conn->isConnected()) {
            break;
        }
    }
}

// Only reached after the connection was lost: release it if it is still
// open, then exit non-zero so a process supervisor can restart the worker.
if ($conn->isConnected()) {
    $conn->disconnect();
}
exit(1);
/**
 * Consumer callback: acknowledges one delivery and forwards its payload to
 * InfluxdbLog::handle().
 *
 * The message body is decoded with unserialize() and is expected to yield an
 * array with the keys 'type', 'data' and 'tags'. Empty bodies and bodies that
 * do not decode to an array are acknowledged and dropped.
 *
 * @param object $envelope Delivery; must provide getBody() and getDeliveryTag().
 * @param object $queue    Queue the delivery came from; must provide ack().
 *
 * @return mixed The result of InfluxdbLog::handle(), or null when the message
 *               is dropped.
 */
function processMessage($envelope, $queue) {
    static $n = 0; // running count of non-empty messages printed so far

    $msg = $envelope->getBody();

    // Acknowledge manually, before any early return. Previously an empty body
    // returned without an ack, leaving that delivery unacknowledged.
    $queue->ack($envelope->getDeliveryTag());

    if (!$msg) {
        return;
    }

    echo "Message:\n";
    echo ++$n, ' ', $msg . "\n";

    // NOTE(security): unserialize() on queue data can instantiate arbitrary
    // classes if any producer is untrusted. Consider a JSON payload, or
    // unserialize($msg, array('allowed_classes' => false)) on PHP 7+.
    $args = unserialize($msg);
    if (!is_array($args)) {
        return;
    }

    // Read each field defensively: a missing key is passed on as null (the
    // value it evaluated to before) without raising an undefined-index notice.
    $type = isset($args['type']) ? $args['type'] : null;
    $data = isset($args['data']) ? $args['data'] : null;
    $tags = isset($args['tags']) ? $args['tags'] : null;

    return InfluxdbLog::handle($type, $data, $tags);
}