Relay.bak.php 9.27 KB
<?php

/**
 * Created by PhpStorm.
 * User: Ziy
 * Date: 14/9/5
 * Time: 6:50 AM
 */
/**
 * AMQP relay worker for IM messages.
 *
 * Wraps an AMQP connection/channel/exchange/queue, consumes JSON messages from
 * a queue, writes them to the "MessageLoggers" Mongo database (message log and
 * per-conversation "Talk" summary) and prints a trace of each message to stdout.
 */
class YHMIm_Relay
{
    /** Prefetch (QoS) count applied to every channel this class opens. */
    const PREFETCH_COUNT = 10;

    /**
     * Look-back window, in seconds, used when checking for duplicate product
     * messages. 84600 s is 23.5 hours.
     * NOTE(review): possibly a transposition of 86400 (24 h) - confirm before changing.
     */
    const DEDUPE_WINDOW_SECONDS = 84600;

    private $conn;

    private $exchange;

    private $queueName;

    private $channel;

    private $queue;

    private $userRouteName;

    private $mongo = null;

    private $messageCollection = 'Message';

    private $talkCollection = 'Talk';

    /**
     * Open the AMQP connection once; later calls are no-ops while a
     * connection object is held.
     */
    public function connect()
    {
        if (empty($this->conn)) {
            $amqp = new Q_Message_Amqp_Connection();
            $this->conn = $amqp->connect();
        }
    }

    /**
     * Open a channel and prepare a durable exchange handle with the given name.
     * The exchange is not declared on the broker until declareExchange() is called.
     *
     * @param string $exchangeName
     * @return $this
     */
    public function exchange($exchangeName)
    {
        $this->connect();
        $this->channel = new AMQPChannel($this->conn);
        $this->channel->setPrefetchCount(self::PREFETCH_COUNT);
        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setFlags(AMQP_DURABLE);
        $this->exchange->setName($exchangeName);
        return $this;
    }

    /**
     * Set the exchange type (direct by default).
     *
     * @param string $type
     * @return $this
     */
    public function setType($type = AMQP_EX_TYPE_DIRECT)
    {
        $this->exchange->setType($type);
        return $this;
    }

    /**
     * Set the exchange flags (durable by default).
     *
     * @param int $flags
     * @return $this
     */
    public function setFlags($flags = AMQP_DURABLE)
    {
        $this->exchange->setFlags($flags);
        return $this;
    }

    /**
     * Declare the prepared exchange on the broker.
     *
     * @return $this
     */
    public function declareExchange()
    {
        $this->exchange->declareExchange();
        return $this;
    }

    /**
     * Publish a message, JSON-encoded, to the prepared exchange.
     *
     * @param array  $message
     * @param string $keyName routing key
     */
    public function publish(array $message, $keyName)
    {
        $this->exchange->publish(json_encode($message), $keyName);
    }

    /**
     * Close the AMQP connection. Safe to call when connect() was never called.
     */
    public function disconnect()
    {
        if (!empty($this->conn)) {
            $this->conn->disconnect();
        }
    }

    /**
     * Store a queue name on the instance.
     *
     * @param string $queueName
     * @return $this
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;
        return $this;
    }

    /**
     * Create a queue handle on the current channel. exchange() must have been
     * called first, because that is where the channel is opened.
     *
     * @param string $queueName
     * @param int    $flags
     * @return AMQPQueue
     */
    public function queue($queueName, $flags = AMQP_DURABLE)
    {
        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($queueName);
        $this->queue->setFlags($flags);
        return $this->queue;
    }

    /**
     * Declare the queue on the broker.
     *
     * @return mixed result of AMQPQueue::declareQueue() (described by the
     *               original author as the message total)
     */
    public function declareQueue()
    {
        return $this->queue->declareQueue();
    }

    /**
     * Bind the queue to an exchange using the given routing key.
     *
     * @param string $exchangeName
     * @param string $routeName
     * @return mixed
     */
    public function bind($exchangeName, $routeName)
    {
        return $this->queue->bind($exchangeName, $routeName);
    }

    /**
     * Consume the queue forever, handing every message to processMessage().
     *
     * When the connection is found closed, the connection, channel and queue
     * handle are all rebuilt: a queue handle stays tied to the channel it was
     * created on, so swapping only the connection would leave consume()
     * talking to the dead one.
     */
    public function run()
    {
        while (true) {
            if (!$this->conn->isConnected()) {
                echo "--> connected" . $this->conn->isConnected() . "\n";
                $this->reconnect();
            }
            $this->queue->consume(array($this, 'processMessage'));
        }
    }

    /**
     * Open a fresh connection and channel and re-create the queue handle with
     * the same name and flags as the stale one.
     *
     * The queue is not re-declared or re-bound, and the exchange handle is not
     * rebuilt; call exchange() again before publishing through this instance
     * after a reconnect.
     */
    private function reconnect()
    {
        $staleQueue = $this->queue;

        $amqp = new Q_Message_Amqp_Connection();
        $this->conn = $amqp->connect();

        $this->channel = new AMQPChannel($this->conn);
        $this->channel->setPrefetchCount(self::PREFETCH_COUNT);

        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($staleQueue->getName());
        $this->queue->setFlags($staleQueue->getFlags());
    }

    /**
     * Set the route-name prefix printed in front of the recipient id by printLog().
     *
     * @param string $userRouteName
     * @return $this
     */
    public function userRouteName($userRouteName)
    {
        $this->userRouteName = $userRouteName;
        return $this;
    }

    /**
     * Lazily obtain the "MessageLoggers" Mongo handle.
     *
     * @return null|Q_Db_Mongo_Core_Base
     */
    public function mongo()
    {
        if (empty($this->mongo)) {
            $this->mongo = Q_Db::mongo('MessageLoggers');
        }
        return $this->mongo;
    }

    /**
     * Insert a package into a Mongo collection, stamping it with the current time.
     *
     * Failures are printed to stdout and not rethrown; in that case the
     * returned message_id is '' and time is 0.
     *
     * @param array  $package
     * @param string $collection
     * @return array array('message_id' => string, 'time' => int)
     */
    public function logs(array $package, $collection = 'from')
    {
        $logsResult = array(
            'message_id' => '',
            'time' => 0
        );
        try {
            $package['time'] = time();
            $package['create_time'] = new MongoDate();
            $this->mongo()->collection($collection)->insert($package);
            $logsResult['message_id'] = $package['_id']->{'$id'};
            $logsResult['time'] = $package['time'];
        } catch (Exception $e) {
            echo 'mongo:' . $e->getMessage() . "\n";
        }
        return $logsResult;
    }

    /**
     * Consumer callback: handle one message taken from the queue.
     *
     * Every message is acknowledged exactly once, including undecodable ones
     * and duplicates that are dropped. Leaving a dropped message unacked would
     * keep it counted against the channel's prefetch limit and eventually
     * stall the consumer.
     *
     * @param AMQPEnvelope $envelope
     * @param AMQPQueue    $queue
     */
    public function processMessage($envelope, $queue)
    {
        $msg = $envelope->getBody();
        $messageData = json_decode($msg, true);
        if (empty($messageData)) {
            echo $msg . " Error \n";
            $this->logs(array('msg' => $msg), 'MessageError');
            $queue->ack($envelope->getDeliveryTag());
            return;
        }

        $agreement = YHMIm_Agreement::factory($messageData);
        $queueData = $agreement->getQueueData();
        $type = isset($queueData['type']) ? $queueData['type'] : null;

        // Incoming type => "type" value searched for in the message log.
        // NOTE(review): 'yhm-order-info' is checked against stored type
        // 'yhm-info'; kept as in the original - confirm this is intended.
        $dedupeTypes = array(
            'yhm-product' => 'yhm-product',
            'yhm-order-info' => 'yhm-info',
        );
        $isDuplicate = is_string($type)
            && isset($dedupeTypes[$type])
            && $this->isDuplicateProductMessage($agreement, $dedupeTypes[$type]);

        $queue->ack($envelope->getDeliveryTag());
        if ($isDuplicate) {
            return;
        }

        if (!$agreement->relay) {
            $queueData = $agreement->getQueueData();
            $to = empty($queueData['to']) ? 0 : $queueData['to'];
            $this->printLog($queueData, $to, $queueData['type'], $agreement->getErrorMessage());
            return;
        }

        // Store/refresh the conversation summary, then log one message per recipient.
        $talkID = $this->setTalk($agreement);
        $agreement->setTalkID($talkID);
        foreach ($agreement->to_uids as $toUid) {
            $agreement->setQueueDataVar('to', 'user:' . $toUid);
            $agreement->setQueueDataVar('to_uid', $toUid);
            $messagePackage = $agreement->getMessagePackageData();
            $resultLog = $this->logs($messagePackage, $this->messageCollection);
            $agreement->setMessageID($resultLog['message_id']);
            $queuePackageData = $agreement->getQueuePackageData();
            $this->printLog($queuePackageData, $toUid, $queuePackageData['type']);
        }
    }

    /**
     * Tell whether the sender already has more than one logged message of
     * $storedType for the same product SKC inside the de-duplication window.
     *
     * @param object $agreement  object returned by YHMIm_Agreement::factory()
     * @param string $storedType "type" value to look for in the message log
     * @return bool
     */
    private function isDuplicateProductMessage($agreement, $storedType)
    {
        $queuePackageData = $agreement->getQueuePackageData();
        $criteria = array(
            'type' => $storedType,
            'body.product_skc' => (int) $queuePackageData['body']['product_skc'],
            'from_uid' => (int) $agreement->from_uid,
            'time' => array(
                '$gt' => time() - self::DEDUPE_WINDOW_SECONDS
            )
        );
        $count = $this->mongo()->collection($this->messageCollection)->count($criteria);
        return $count > 1;
    }

    /**
     * Print a framed trace of a package to stdout.
     *
     * @param array  $package  data to dump as JSON
     * @param mixed  $toUid    recipient id
     * @param string $logTitle title shown in the frame
     * @param string $message  optional error text appended to the trace
     */
    public function printLog(array $package, $toUid, $logTitle = 'Logs', $message = '')
    {
        $from = empty($package['from']) ? 0 : $package['from'];
        $logs = "\n\n++++++++++++++++++++++ Begin " . $logTitle . " ++++++++++++++++++++++\n";
        $logs .= $this->userRouteName . $toUid . ' from:' . $from . ' to:' . $toUid . "\n";
        $logs .= json_encode($package);
        if (!empty($message)) {
            $logs .= "\nError: " . $message;
        }
        $logs .= "\n++++++++++++++++++++++ End  " . $logTitle . " +++++++++++++++++++++++\n";
        echo $logs;
    }

    /**
     * Create or refresh the conversation ("Talk") summary for a message.
     *
     * A new conversation is inserted as-is. An existing one has its latest
     * type/body/time replaced, its delete list cleared and its unread counter
     * incremented. The increment uses $inc so that concurrent consumers cannot
     * overwrite each other's count.
     *
     * @param YHMIm_Agreement_Abstract $agreement
     * @return mixed the talk id
     */
    public function setTalk(YHMIm_Agreement_Abstract $agreement)
    {
        $participator = $agreement->getParticipator();
        $queueData = $agreement->getQueueData();
        $talkID = YHMIm_Utils::getTalkID($participator);

        $talkPackage = new YHMIm_Package_Talk();
        $talkPackage->body = $agreement->getQueueDataBody();
        $talkPackage->participator = $participator;
        $talkPackage->talk_id = $talkID;
        $talkPackage->time = time();
        $talkPackage->type = $agreement->getQueueDataType();
        $talkPackage->from = $queueData['from'];
        $talkPackage->from_uid = $agreement->from_uid;

        $talks = $this->mongo()->collection($this->talkCollection);
        $existing = $talks->findOne(array('talk_id' => $talkID));
        if ($existing == null) {
            $talks->insert((array)$talkPackage);
            return $talkID;
        }

        $documentID = $existing['_id']->{'$id'};
        $talks->update(array('_id' => new MongoID($documentID)), array(
            '$inc' => array(
                'unread_number' => 1
            ),
            '$set' => array(
                'type' => $talkPackage->type,
                'body' => $talkPackage->body,
                'time' => $talkPackage->time,
                'delete_list' => array()
            )
        ));
        return $talkID;
    }

    /**
     * Publish a message to a single user's route on the 'yhm.im' exchange,
     * using a short-lived connection.
     *
     * @param array $msgData
     * @param mixed $uid
     */
    public static function delivery(array $msgData, $uid)
    {
        $amqp = new YHMIm_Relay();
        $amqp->connect();
        $amqp->exchange('yhm.im');
        $amqp->publish($msgData, 'yhm.route.user.' . $uid);
        $amqp->disconnect();
    }
}