MqClient.php.example 1.83 KB
<?php

error_reporting(E_ALL);
ini_set('display_errors',true);

require_once __DIR__.'/Config.php';
require_once __DIR__.'/InfluxdbLog.php';

/* * ***********************************
 * PHP amqp(RabbitMQ) Demo - consumer
 * Author: Linvo
 * Date: 2012/7/30
 * *********************************** */
//配置信息 
$conn_args = array(
    'host' => Config::rabbitmq_host,
    'port' => Config::rabbitmq_port,
    'login' => Config::rabbitmq_user,
    'password' => Config::rabbitmq_pass,
    'vhost' => Config::rabbitmq_vhost,
);

//交换机名 
$e_name = array(
    'yohoExchange_error',
    'yohoExchange_shutdown',
);
$q_name = 'phplog'; //队列名 
$k_route = 'phplog'; //路由key 

//创建连接和channel 
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建队列    
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化  
$q->declareQueue();

foreach ($e_name as $e_name){
    //绑定交换机与队列,并指定路由键 
    $q->bind($e_name, $k_route);
}

//阻塞模式接收消息 
while (True) {
    try{
        $q->consume('processMessage');
        //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  
    }catch(\Exception $e){
        echo $e->getMessage(),"\n";
    }
}
$conn->disconnect();

/**
 * 消费回调函数
 * 处理消息
 */

function processMessage($envelope, $queue) {
    static $n = 0;
    $msg = $envelope->getBody();
    if (!$msg){
        return;
    }
    echo "Message:\n";
    echo ++$n,' ',$msg . "\n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
    $args = unserialize($msg);
    if (!is_array($args)){
        return;
    }
    //var_dump($args);
    return InfluxdbLog::handle($args['type'],$args['data'],$args['tags']);
}