Rabbitmq.php 1.8 KB
<?php
/**
 * User: wuxiao
 * Date: 2016-5-20
 */

class Rabbitmq
{
    
    static $conn;
    
    static $channel;

    public static function handle($type, array $data,array $tags){
        global $Config;
        $e_name = $Config->rabbitmq_exchange_name.$type; //交换机名 
        $k_route = $Config->rabbitmq_route_key; //路由key 
        
        $channel = self::$channel;
        if (!is_object($channel) || !$channel->isConnected()){
            //创建连接
            $conn = self::$conn;
            if (!is_object($conn) || !$conn->isConnected()){
                //配置信息
                $conn_args = array(
                    'host' => $Config->rabbitmq_host,
                    'port' => $Config->rabbitmq_port,
                    'login' => $Config->rabbitmq_user,
                    'password' => $Config->rabbitmq_pass,
                    'vhost' => $Config->rabbitmq_vhost,
                );
                $conn = new AMQPConnection($conn_args);
                if (!$conn->connect()) {
                    echo ("Cannot connect to the broker!\n");
                    return;
                }
                self::$conn = $conn;
            }
            
            //创建channel 
            self::$channel = $channel = new AMQPChannel($conn);
        }

        //创建交换机对象    
        $ex = new AMQPExchange($channel);
        $ex->setName($e_name);
        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
        $ex->setFlags(AMQP_DURABLE); //持久化 
        $ex->declareExchange();        

        //消息内容 
        $message = json_encode(array(
            'type'=>$type,
            'data'=>$data,
            'tags'=>$tags,
        ));
        
        //发送消息 
        return $ex->publish($message, $k_route);

        //$conn->disconnect();
    }
}