Relay.php 13.1 KB
<?php

/**
 * Created by PhpStorm.
 * User: Ziy
 * Date: 14/9/5
 * Time: 上午6:50
 */
/**
 * AMQP relay worker for the YHM instant-messaging service.
 *
 * The class wraps a php-amqp connection/channel/exchange/queue, consumes
 * inbound messages (see run() / processMessage()), stores them through the
 * Q_Db mongo wrapper and republishes one copy per recipient on the
 * "yhm.im" exchange (see delivery()).
 */
class YHMIm_Relay
{
    /** AMQP connection as returned by Q_Message_Amqp_Connection::connect(); null while disconnected. */
    private $conn;

    /** AMQPExchange created by exchange(). */
    private $exchange;

    /** Queue name stored by setQueueName(); not read anywhere in this class. */
    private $queueName;

    /** AMQPChannel created by exchange(); also used by queue(). */
    private $channel;

    /** AMQPQueue created by queue(). */
    private $queue;

    /** Prefix printed in front of the recipient uid by printLog(). */
    private $userRouteName;

    /** Lazily created mongo handle, see mongo(). */
    private $mongo = null;

    /** Collection that receives one document per delivered message. */
    private $messageCollection = 'Message';

    /** Collection that holds one document per conversation. */
    private $talkCollection = 'Talk';

    /** Debug file that receives a line every time consume() returns a falsy value. */
    private $idleLogFile = '/Data/code/test/log.txt';

    /** Debug file that receives every raw message body. */
    private $payloadLogFile = '/Data/code/test/amqp.txt';

    /** Memcached key prefix of the last product/order card sent by a user. */
    private $cardCachePrefix = 'yhm_product_';

    /** Lifetime, in seconds, of a cached card. */
    private $cardCacheTtl = 86400;

    /**
     * Open the AMQP connection unless one is already held.
     */
    public function connect()
    {
        if (empty($this->conn)) {
            $amqp = new Q_Message_Amqp_Connection();
            $this->conn = $amqp->connect();
        }
    }

    /**
     * Create a channel (prefetch count 10) and a durable exchange object on it.
     *
     * The exchange is only configured here; call declareExchange() to declare it.
     *
     * @param string $exchangeName
     * @return $this
     */
    public function exchange($exchangeName)
    {
        $this->connect();
        $this->channel = new AMQPChannel($this->conn);
        $this->channel->setPrefetchCount(10);
        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setFlags(AMQP_DURABLE);
        $this->exchange->setName($exchangeName);
        return $this;
    }

    /**
     * Set the exchange type (direct by default).
     *
     * @param string $type
     * @return $this
     */
    public function setType($type = AMQP_EX_TYPE_DIRECT)
    {
        $this->exchange->setType($type);
        return $this;
    }

    /**
     * Set the exchange flags (durable by default).
     *
     * @param int $flags
     * @return $this
     */
    public function setFlags($flags = AMQP_DURABLE)
    {
        $this->exchange->setFlags($flags);
        return $this;
    }

    /**
     * Declare the exchange configured by exchange().
     *
     * @return $this
     */
    public function declareExchange()
    {
        $this->exchange->declareExchange();
        return $this;
    }

    /**
     * JSON-encode $message and publish it on the current exchange.
     *
     * @param array  $message
     * @param string $keyName routing key
     * @throws RuntimeException when the message cannot be JSON-encoded
     */
    public function publish(array $message, $keyName)
    {
        $payload = json_encode($message);
        if ($payload === false) {
            // Without this check a failed encode would be published as an empty body.
            throw new RuntimeException(
                'Unable to JSON-encode message for route ' . $keyName . ' (json error ' . json_last_error() . ')'
            );
        }
        $this->exchange->publish($payload, $keyName);
    }

    /**
     * Close the AMQP connection if one is open.
     *
     * Safe to call before connect(); the handle is dropped afterwards so that
     * a later connect() opens a fresh connection.
     */
    public function disconnect()
    {
        if (empty($this->conn)) {
            return;
        }
        $this->conn->disconnect();
        $this->conn = null;
    }

    /**
     * Remember a queue name.
     *
     * @param string $queueName
     * @return $this
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;
        return $this;
    }

    /**
     * Create the queue object on the current channel (exchange() must have run first).
     *
     * @param string $queueName
     * @param int    $flags
     * @return AMQPQueue
     */
    public function queue($queueName, $flags = AMQP_DURABLE)
    {
        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($queueName);
        $this->queue->setFlags($flags);
        return $this->queue;
    }

    /**
     * Declare the queue created by queue().
     *
     * @return mixed result of AMQPQueue::declareQueue() (described as the message total by the original author)
     */
    public function declareQueue()
    {
        return $this->queue->declareQueue();
    }

    /**
     * Bind the queue to an exchange with the given routing key.
     *
     * @param string $exchangeName
     * @param string $routeName
     * @return mixed
     */
    public function bind($exchangeName, $routeName)
    {
        return $this->queue->bind($exchangeName, $routeName);
    }

    /**
     * Consume the queue forever, handing every message to processMessage().
     *
     * This method never returns: when consume() returns or throws, it is
     * simply started again. NOTE(review): there is no back-off, so a broken
     * connection makes this loop spin; consider sleeping or exiting on
     * connection errors.
     */
    public function run()
    {
        while (true) {
            try {
                $result = $this->queue->consume(array($this, 'processMessage'));
                if (!$result) {
                    $now = microtime(true);
                    file_put_contents($this->idleLogFile, "我是在最后写进来的,时间:" . $now . "\n", FILE_APPEND);
                }
            } catch (Exception $e) {
                // processMessage() handles its own exceptions, so whatever
                // arrives here was raised by the consume call itself.
                echo 'consume:' . $e->getMessage() . "\n";
            }
        }
    }

    /**
     * Set the prefix that printLog() prints in front of the recipient uid.
     *
     * @param string $userRouteName
     * @return $this
     */
    public function userRouteName($userRouteName)
    {
        $this->userRouteName = $userRouteName;
        return $this;
    }

    /**
     * Lazily obtain the 'MessageLoggers' mongo handle.
     *
     * @return null|Q_Db_Mongo_Core_Base
     */
    public function mongo()
    {
        if (empty($this->mongo)) {
            $this->mongo = Q_Db::mongo('MessageLoggers');
        }
        return $this->mongo;
    }

    /**
     * Insert $package into a mongo collection.
     *
     * 'time' defaults to the current time and 'create_time' is always set.
     * Exceptions are printed and swallowed; the defaults below are returned
     * in that case.
     *
     * @param array  $package
     * @param string $collection
     * @return array array('message_id' => string, 'time' => int)
     */
    public function logs(array $package, $collection = 'from')
    {
        $logsResult = array(
            'message_id' => '',
            'time' => 0
        );
        try {
            if (!isset($package['time'])) {
                $package['time'] = time();
            }
            $package['create_time'] = new MongoDate();
            $this->mongo()->collection($collection)->insert($package);
            // The id is only available when the insert wrote it back into the array.
            if (isset($package['_id'])) {
                $logsResult['message_id'] = $package['_id']->{'$id'};
            }
            $logsResult['time'] = $package['time'];
        } catch (Exception $e) {
            echo 'mongo:' . $e->getMessage() . "\n";
        }
        return $logsResult;
    }

    /**
     * Consumer callback: handle one inbound message.
     *
     * Steps: decode the body, ack it, apply the product/order card rules,
     * store the conversation, flush a pending card before a text message and
     * finally log + deliver one copy per recipient. Every exception is
     * printed and swallowed so that the consumer keeps running.
     *
     * @param mixed $envelope message envelope (getBody(), getDeliveryTag())
     * @param mixed $queue    queue the message came from (ack())
     */
    public function processMessage($envelope, $queue)
    {
        try {
            $msg = $envelope->getBody();
            $messageData = json_decode($msg, true);
            file_put_contents($this->payloadLogFile, $msg . "\n", FILE_APPEND);
            if (empty($messageData)) {
                echo $msg . " Error \n";
                $this->logs(array('msg' => $msg), 'MessageError');
                $queue->ack($envelope->getDeliveryTag());
                return;
            }
            $agreement = YHMIm_Agreement::factory($messageData);
            $queueData = $agreement->getQueueData();
            // NOTE(review): the message is acknowledged before it is processed,
            // so a failure below loses it instead of redelivering it.
            $queue->ack($envelope->getDeliveryTag());

            if ($queueData['type'] == "yhm-product") {
                if (!$this->stageCard($agreement, $messageData, 'product_skc')) {
                    return;
                }
            }
            if ($queueData['type'] == "yhm-order-info") {
                if (!$this->stageCard($agreement, $messageData, 'order_code')) {
                    return;
                }
            }

            if ($agreement->relay == false) {
                $queueData = $agreement->getQueueData();
                $to = empty($queueData['to']) ? 0 : $queueData['to'];
                $this->printLog($queueData, $to, $queueData['type'], $agreement->getErrorMessage());
                return;
            }

            // Store the conversation.
            $talkID = $this->setTalk($agreement);

            // Before the first text message, forward the card the sender staged earlier.
            if ($queueData['type'] == "yhm-text") {
                $this->deliverPendingCard($agreement, $queueData, $talkID);
            }

            // Log and deliver one copy per recipient.
            $agreement->setTalkID($talkID);
            foreach ($agreement->to_uids as $toUid) {
                $agreement->setQueueDataVar('to', 'user:' . $toUid);
                $agreement->setQueueDataVar('to_uid', $toUid);
                $packageData = $agreement->getMessagePackageData();
                $resultLog = $this->logs($packageData, $this->messageCollection);
                $agreement->setMessageID($resultLog['message_id']);
                $queuePackageData = $agreement->getQueuePackageData();
                $this->printLog($queuePackageData, $toUid, $queuePackageData['type']);
                self::delivery($queuePackageData, $toUid);
            }
        } catch (Exception $e) {
            echo 'process:' . $e->getMessage() . "\n";
        }
    }

    /**
     * Apply the card rules shared by "yhm-product" and "yhm-order-info" messages.
     *
     * If the sender's cached card carries the same $bodyField value the new
     * message is a duplicate. Otherwise the message is redirected to the
     * sender alone and cached as the sender's current card.
     *
     * @param mixed  $agreement   agreement built from the inbound message
     * @param array  $messageData decoded inbound message (the value that gets cached)
     * @param string $bodyField   body key identifying the card ('product_skc' or 'order_code')
     * @return bool false when the message is a duplicate and must be dropped
     */
    private function stageCard($agreement, array $messageData, $bodyField)
    {
        $queuePackageData = $agreement->getQueuePackageData();
        $cardValue = isset($queuePackageData['body'][$bodyField]) ? $queuePackageData['body'][$bodyField] : null;

        $cacheKey = $this->cardCachePrefix . $agreement->from_uid;
        $cache = Q_Cache::factory('Memcached');
        $cached = $cache->get($cacheKey);
        if ($cached) {
            $cachedPackage = YHMIm_Agreement::factory($cached)->getQueuePackageData();
            $cachedValue = isset($cachedPackage['body'][$bodyField]) ? $cachedPackage['body'][$bodyField] : null;
            if ($cardValue == $cachedValue) {
                return false;
            }
        }

        // Only the sender receives the card for now.
        $agreement->to_uids = array($agreement->from_uid);
        $cache->set($cacheKey, $messageData, $this->cardCacheTtl);
        return true;
    }

    /**
     * Forward the sender's cached card to the recipient of a text message.
     *
     * Nothing happens when no card is cached, when the card was already
     * forwarded (it carries 'pro_time'), or when the recipient cannot be
     * read from $queueData['to'] (expected form "<prefix>:<uid>").
     *
     * @param mixed $agreement agreement of the text message
     * @param array $queueData queue data of the text message
     * @param mixed $talkID    conversation id returned by setTalk()
     */
    private function deliverPendingCard($agreement, array $queueData, $talkID)
    {
        if (!isset($queueData['to'])) {
            return;
        }
        $toParts = explode(":", $queueData['to']);
        if (!isset($toParts[1])) {
            return;
        }
        $toUid = $toParts[1];

        $cacheKey = $this->cardCachePrefix . $agreement->from_uid;
        $cache = Q_Cache::factory('Memcached');
        $cached = $cache->get($cacheKey);
        if (!$cached || isset($cached['pro_time'])) {
            return;
        }

        $cardAgreement = YHMIm_Agreement::factory($cached);
        $cardAgreement->setTalkID($talkID);
        $cardAgreement->setQueueDataVar('to', 'user:' . $toUid);
        // One second earlier than now, presumably so the card sorts before the text message.
        $cardAgreement->setQueueDataVar('time', time() - 1);
        $cardAgreement->setQueueDataVar('to_uid', $toUid);
        $cardAgreement->setQueueDataVar('delete_list', array((int) $toUid));

        $cardLog = $this->logs($cardAgreement->getMessagePackageData(), $this->messageCollection);
        $cardAgreement->setMessageID($cardLog['message_id']);
        self::delivery($cardAgreement->getQueuePackageData(), $toUid);

        // Mark the card as forwarded so it is sent only once.
        $cached['pro_time'] = time();
        $cache->set($cacheKey, $cached, $this->cardCacheTtl);
    }

    /**
     * Echo a framed, human-readable log entry for a package.
     *
     * @param array  $package  package to dump as JSON
     * @param mixed  $toUid    recipient uid
     * @param string $logTitle title shown in the frame
     * @param string $message  optional error text
     */
    public function printLog(array $package, $toUid, $logTitle = 'Logs', $message = '')
    {
        $from = empty($package['from']) ? 0 : $package['from'];
        $logs = "\n\n++++++++++++++++++++++ Begin " . $logTitle . " ++++++++++++++++++++++\n";
        $logs .= $this->userRouteName . $toUid . ' from:' . $from . ' to:' . $toUid . "\n";
        $logs .= json_encode($package);
        if (!empty($message)) {
            $logs .= "\nError: " . $message;
        }
        $logs .= "\n++++++++++++++++++++++ End  " . $logTitle . " +++++++++++++++++++++++\n";
        echo $logs;
    }

    /**
     * Create or refresh the conversation document of a message.
     *
     * A new conversation is inserted as a whole; an existing one gets its
     * unread counter increased and its type/body/time replaced, and its
     * delete list cleared.
     *
     * @param YHMIm_Agreement_Abstract $agreement
     * @return mixed conversation id
     */
    public function setTalk(YHMIm_Agreement_Abstract $agreement)
    {
        $participator = $agreement->getParticipator();
        $queueData = $agreement->getQueueData();
        $talkID = YHMIm_Utils::getTalkID($participator);

        $talkPackage = new YHMIm_Package_Talk();
        $talkPackage->body = $agreement->getQueueDataBody();
        $talkPackage->participator = $participator;
        $talkPackage->talk_id = $talkID;
        $talkPackage->time = time();
        $talkPackage->type = $agreement->getQueueDataType();
        $talkPackage->from = $queueData['from'];
        $talkPackage->from_uid = $agreement->from_uid;

        $talks = $this->mongo()->collection($this->talkCollection);
        $existing = $talks->findOne(array('talk_id' => $talkID));
        if ($existing == null) {
            $talks->insert((array)$talkPackage);
            return $talkID;
        }

        // A document without the counter is treated as having no unread messages.
        $unread = isset($existing['unread_number']) ? $existing['unread_number'] : 0;
        $documentID = $existing['_id']->{'$id'};
        // NOTE(review): read-then-$set is not atomic; '$inc' would be if
        // several workers can touch the same conversation.
        $talks->update(array('_id' => new MongoID($documentID)), array(
            '$set' => array(
                'unread_number' => $unread + 1,
                'type' => $talkPackage->type,
                'body' => $talkPackage->body,
                'time' => $talkPackage->time,
                'delete_list' => array()
            )
        ));
        return $talkID;
    }

    /**
     * Publish a package to a user's route on the "yhm.im" exchange.
     *
     * A dedicated connection is opened for the call and is closed again even
     * when publishing fails; the original exception is rethrown.
     *
     * @param array $msgData
     * @param mixed $uid recipient uid, appended to "yhm.route.user."
     * @throws Exception whatever exchange()/publish() raised
     */
    public static function delivery(array $msgData, $uid)
    {
        $amqp = new YHMIm_Relay();
        $amqp->connect();
        try {
            $amqp->exchange('yhm.im');
            $amqp->publish($msgData, 'yhm.route.user.' . $uid);
        } catch (Exception $e) {
            try {
                $amqp->disconnect();
            } catch (Exception $ignored) {
                // The publish failure is the error worth reporting.
            }
            throw $e;
        }
        $amqp->disconnect();
    }
}