Relay22.php 11 KB
<?php

/**
 * Created by PhpStorm.
 * User: Ziy
 * Date: 14/9/5
 * Time: 6:50 AM
 */
/**
 * AMQP relay for instant messages.
 *
 * Wraps a php-amqp connection/channel/exchange/queue, consumes incoming
 * messages, records them in the "MessageLoggers" Mongo database (a talk
 * summary plus one log entry per recipient) and republishes each message to
 * the per-user routing key of the recipients.
 */
class YHMIm_Relay
{
    /** Number of unacknowledged messages the broker may push per channel. */
    const PREFETCH_COUNT = 10;

    /** Exchange used by delivery() to hand messages to users. */
    const DELIVERY_EXCHANGE = 'yhm.im';

    /** Routing key prefix used by delivery(); the recipient uid is appended. */
    const USER_ROUTE_PREFIX = 'yhm.route.user.';

    /** Lifetime, in seconds, of the cached "last product message" entry. */
    const PRODUCT_CACHE_TTL = 86400;

    private $conn;

    private $exchange;

    private $queueName;

    private $channel;

    private $queue;

    private $userRouteName;

    private $mongo = null;

    private $messageCollection = 'Message';

    private $talkCollection = 'Talk';

    /**
     * Open the AMQP connection unless one is already held.
     */
    public function connect()
    {
        if (empty($this->conn)) {
            $amqp = new Q_Message_Amqp_Connection();
            $this->conn = $amqp->connect();
        }
    }

    /**
     * Open a fresh channel on the current connection and create a durable
     * exchange object with the given name on it.
     *
     * @param string $exchangeName
     * @return $this
     */
    public function exchange($exchangeName)
    {
        $this->connect();
        $this->openChannel();
        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setFlags(AMQP_DURABLE);
        $this->exchange->setName($exchangeName);
        return $this;
    }

    /**
     * Set the exchange type (direct by default).
     *
     * @param string $type
     * @return $this
     */
    public function setType($type = AMQP_EX_TYPE_DIRECT)
    {
        $this->exchange->setType($type);
        return $this;
    }

    /**
     * Set the exchange flags (durable by default).
     *
     * @param int $flags
     * @return $this
     */
    public function setFlags($flags = AMQP_DURABLE)
    {
        $this->exchange->setFlags($flags);
        return $this;
    }

    /**
     * Declare the configured exchange on the broker.
     *
     * @return $this
     */
    public function declareExchange()
    {
        $this->exchange->declareExchange();
        return $this;
    }

    /**
     * Publish a message, JSON-encoded, to the current exchange.
     *
     * @param array $message
     * @param string $keyName routing key
     */
    public function publish(array $message, $keyName)
    {
        $this->exchange->publish(json_encode($message), $keyName);
    }

    /**
     * Close the AMQP connection. Does nothing when no connection was opened,
     * so it is safe to call from cleanup paths.
     */
    public function disconnect()
    {
        if (!empty($this->conn)) {
            $this->conn->disconnect();
        }
    }

    /**
     * Remember a queue name.
     *
     * @param string $queueName
     * @return $this
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;
        return $this;
    }

    /**
     * Create a queue object on the current channel. exchange() must have
     * been called first, because that is where the channel is opened.
     *
     * @param string $queueName
     * @param int $flags
     * @return AMQPQueue
     */
    public function queue($queueName, $flags = AMQP_DURABLE)
    {
        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($queueName);
        $this->queue->setFlags($flags);
        return $this->queue;
    }

    /**
     * Declare the current queue on the broker.
     *
     * @return mixed whatever AMQPQueue::declareQueue() returns (the message
     *               count, per the php-amqp documentation)
     */
    public function declareQueue()
    {
        return $this->queue->declareQueue();
    }

    /**
     * Bind the current queue to an exchange under the given routing key.
     *
     * @param string $exchangeName
     * @param string $routeName
     * @return mixed
     */
    public function bind($exchangeName, $routeName)
    {
        return $this->queue->bind($exchangeName, $routeName);
    }

    /**
     * Consume the current queue forever, handing every delivery to
     * processMessage().
     *
     * When the connection is found closed, the connection is reopened and the
     * channel, exchange and queue objects are recreated on it; the previous
     * objects belong to the dead connection and cannot be consumed from.
     * The loop never terminates normally, so there is no trailing disconnect.
     */
    public function run()
    {
        while (true) {
            if (!$this->conn->isConnected()) {
                echo "--> connected" . $this->conn->isConnected() . "\n";
                $this->reconnect();
            }
            $this->queue->consume(array($this, 'processMessage'));
        }
    }

    /**
     * Set the route name prefix shown in printLog() output.
     *
     * @param string $userRouteName
     * @return $this
     */
    public function userRouteName($userRouteName)
    {
        $this->userRouteName = $userRouteName;
        return $this;
    }

    /**
     * Lazily obtain the "MessageLoggers" Mongo handle.
     *
     * @return null|Q_Db_Mongo_Core_Base
     */
    public function mongo()
    {
        if (empty($this->mongo)) {
            $this->mongo = Q_Db::mongo('MessageLoggers');
        }
        return $this->mongo;
    }

    /**
     * Store a package in a Mongo collection, stamping it with the current
     * time. Failures are reported on stdout and never propagate.
     *
     * @param array $package
     * @param string $collection
     * @return array 'message_id' (the inserted id, '' when unavailable) and
     *               'time' (the Unix timestamp written, 0 on failure)
     */
    public function logs(array $package, $collection = 'from')
    {
        $logsResult = array(
            'message_id' => '',
            'time' => 0
        );
        try {
            $package['time'] = time();
            $package['create_time'] = new MongoDate();
            $this->mongo()->collection($collection)->insert($package);
            // The id is only readable when insert() populated it on our copy.
            if (isset($package['_id'])) {
                $logsResult['message_id'] = $package['_id']->{'$id'};
            }
            $logsResult['time'] = $package['time'];
        } catch (Exception $e) {
            echo 'mongo:' . $e->getMessage() . "\n";
        }
        return $logsResult;
    }

    /**
     * Consumer callback: decode one delivery, record it and relay it to every
     * recipient.
     *
     * Every path acknowledges the delivery. An unacknowledged delivery keeps
     * occupying one slot of the channel's prefetch window, so skipping the ack
     * would eventually stop the broker from sending new messages.
     *
     * @param AMQPEnvelope $envelope
     * @param AMQPQueue $queue
     */
    public function processMessage($envelope, $queue)
    {
        $msg = $envelope->getBody();
        $messageData = json_decode($msg, true);
        if (empty($messageData)) {
            echo $msg . " Error \n";
            $this->logs(array('msg' => $msg), 'MessageError');
            $queue->ack($envelope->getDeliveryTag());
            return;
        }

        $agreement = YHMIm_Agreement::factory($messageData);
        $queueData = $agreement->getQueueData();
        $type = isset($queueData['type']) ? $queueData['type'] : '';

        if ($type == "yhm-product") {
            if ($this->isRepeatedProduct($agreement, $queueData, $messageData)) {
                // Same product as last time for this pair: drop it, but still
                // acknowledge so it is not left pending on the channel.
                $queue->ack($envelope->getDeliveryTag());
                return;
            }
            $queueData['to'] = $agreement->from_uid;
        }

        // Acknowledged before relaying: a failure below does not redeliver.
        $queue->ack($envelope->getDeliveryTag());

        if ($agreement->relay == false) {
            $to = empty($queueData['to']) ? 0 : $queueData['to'];
            $this->printLog($queueData, $to, $type, $agreement->getErrorMessage());
            return;
        }

        // Store the talk summary, then tag the message with its talk id.
        $talkID = $this->setTalk($agreement);
        $agreement->setTalkID($talkID);

        // One log entry and one published copy per recipient.
        foreach ($agreement->to_uids as $toUid) {
            $agreement->setQueueDataVar('to', 'user:' . $toUid);
            $agreement->setQueueDataVar('to_uid', $toUid);
            $messagePackage = $agreement->getMessagePackageData();
            $resultLog = $this->logs($messagePackage, $this->messageCollection);
            $agreement->setMessageID($resultLog['message_id']);
            $queuePackageData = $agreement->getQueuePackageData();
            $this->printLog($queuePackageData, $toUid, $queuePackageData['type']);
            self::delivery($queuePackageData, $toUid);
        }
    }

    /**
     * Print a framed log entry for a package to stdout.
     *
     * @param array $package
     * @param mixed $toUid
     * @param string $logTitle
     * @param string $message optional error text appended to the entry
     */
    public function printLog(array $package, $toUid, $logTitle = 'Logs', $message = '')
    {
        $from = empty($package['from']) ? 0 : $package['from'];
        $logs = "\n\n++++++++++++++++++++++ Begin " . $logTitle . " ++++++++++++++++++++++\n";
        $logs .= $this->userRouteName . $toUid . ' from:' . $from . ' to:' . $toUid . "\n";
        $logs .= json_encode($package);
        if (!empty($message)) {
            $logs .= "\nError: " . $message;
        }
        $logs .= "\n++++++++++++++++++++++ End  " . $logTitle . " +++++++++++++++++++++++\n";
        echo $logs;
    }

    /**
     * Create or refresh the talk summary document for a message.
     *
     * A new talk is inserted as-is. For an existing talk the latest
     * type/body/time are stored, delete_list is cleared and unread_number is
     * incremented atomically on the server, so concurrent relays do not lose
     * counts and a document without the field does not raise a notice.
     *
     * @param YHMIm_Agreement_Abstract $agreement
     * @return mixed the talk id
     */
    public function setTalk(YHMIm_Agreement_Abstract $agreement)
    {
        $participator = $agreement->getParticipator();
        $queueData = $agreement->getQueueData();
        $talkID = YHMIm_Utils::getTalkID($participator);

        $talkPackage = new YHMIm_Package_Talk();
        $talkPackage->body = $agreement->getQueueDataBody();
        $talkPackage->participator = $participator;
        $talkPackage->talk_id = $talkID;
        $talkPackage->time = time();
        $talkPackage->type = $agreement->getQueueDataType();
        $talkPackage->from = $queueData['from'];
        $talkPackage->from_uid = $agreement->from_uid;

        $existing = $this->talks()->findOne(array('talk_id' => $talkID));
        if ($existing == null) {
            $this->talks()->insert((array)$talkPackage);
            return $talkID;
        }

        $this->talks()->update(array('_id' => $existing['_id']), array(
            '$inc' => array('unread_number' => 1),
            '$set' => array(
                'type' => $talkPackage->type,
                'body' => $talkPackage->body,
                'time' => $talkPackage->time,
                'delete_list' => array()
            )
        ));
        return $talkID;
    }

    /**
     * Publish a message to one user's routing key over a short-lived
     * connection. The connection is closed even when publishing fails; the
     * original exception is rethrown afterwards.
     *
     * NOTE(review): this opens one connection per recipient; reusing a
     * publisher connection would be cheaper, but is left as-is here.
     *
     * @param array $msgData
     * @param mixed $uid
     */
    public static function delivery(array $msgData, $uid)
    {
        $amqp = new self();
        try {
            // exchange() opens the connection itself.
            $amqp->exchange(self::DELIVERY_EXCHANGE);
            $amqp->publish($msgData, self::USER_ROUTE_PREFIX . $uid);
        } catch (Exception $e) {
            $amqp->disconnect();
            throw $e;
        }
        $amqp->disconnect();
    }

    /**
     * Open a new channel on the current connection with the prefetch limit.
     */
    private function openChannel()
    {
        $this->channel = new AMQPChannel($this->conn);
        $this->channel->setPrefetchCount(self::PREFETCH_COUNT);
    }

    /**
     * Replace a dead connection and recreate channel, exchange and queue on
     * the new one, copying name/flags/type/arguments from the old objects.
     *
     * Nothing is re-declared or re-bound: this presumes the queue still
     * exists on the broker (true for the durable default) — confirm if
     * non-durable queues are ever consumed through run().
     */
    private function reconnect()
    {
        $staleExchange = $this->exchange;
        $staleQueue = $this->queue;

        $this->conn = null;
        $this->connect();
        $this->openChannel();

        if (!empty($staleExchange)) {
            $this->exchange = new AMQPExchange($this->channel);
            $this->exchange->setName($staleExchange->getName());
            $this->exchange->setFlags($staleExchange->getFlags());
            $exchangeType = $staleExchange->getType();
            if (!empty($exchangeType)) {
                $this->exchange->setType($exchangeType);
            }
        }

        if (!empty($staleQueue)) {
            $this->queue = new AMQPQueue($this->channel);
            $this->queue->setName($staleQueue->getName());
            $this->queue->setFlags($staleQueue->getFlags());
            $this->queue->setArguments($staleQueue->getArguments());
        }
    }

    /**
     * Handle the per-pair cache for a "yhm-product" message.
     *
     * The last product message per (sender, target) pair is cached. When the
     * cached one carries the same product_skc the message is a repeat and
     * nothing is changed. Otherwise the message is redirected to its sender,
     * the target uid is put on delete_list and the cache entry is replaced.
     *
     * @param object $agreement agreement built from the message
     * @param mixed $queueData queue data of the agreement ('to' is "prefix:uid")
     * @param array $messageData decoded message, stored as the new cache entry
     * @return bool true when the message repeats the cached product
     */
    private function isRepeatedProduct($agreement, $queueData, array $messageData)
    {
        $skc = $this->productSkc($agreement->getQueuePackageData());
        $fromUid = $agreement->from_uid;

        // A 'to' value without a colon yields an empty target uid.
        $toParts = explode(':', isset($queueData['to']) ? $queueData['to'] : '');
        $targetUid = isset($toParts[1]) ? $toParts[1] : '';

        $key = "yhm_product_" . $fromUid . "_" . $targetUid;
        $cache = Q_Cache::factory('Memcached');
        $cached = $cache->get($key);
        if ($cached) {
            $cachedPackage = YHMIm_Agreement::factory($cached)->getQueuePackageData();
            if ($skc == $this->productSkc($cachedPackage)) {
                return true;
            }
        }

        $agreement->to_uids = array($fromUid);
        $agreement->setQueueDataVar('delete_list', array((int)$targetUid));
        $cache->set($key, $messageData, self::PRODUCT_CACHE_TTL);
        return false;
    }

    /**
     * Read body.product_skc from a queue package, or null when absent.
     *
     * @param mixed $package
     * @return mixed
     */
    private function productSkc($package)
    {
        return isset($package['body']['product_skc']) ? $package['body']['product_skc'] : null;
    }

    /**
     * Talk collection accessor.
     *
     * @return mixed
     */
    private function talks()
    {
        return $this->mongo()->collection($this->talkCollection);
    }
}