Yar.php 3.63 KB
<?php

namespace Hood\Utils\Rpc;

/**
 * Fluent wrapper around the Yar RPC extension.
 *
 * A call can be executed in two modes:
 *  - serial (default): exec() performs the request through \Yar_Client and
 *    returns the remote method's result;
 *  - concurrent: exec() only registers the call on \Yar_Concurrent_Client;
 *    the registered calls are sent when the static loop() is invoked.
 *
 * Any method name that does not exist on this class is forwarded to the
 * remote side through __call().
 */
class Yar implements IFace
{
    // Not referenced anywhere inside this class.
    protected static $_callbackList = array();


    // Remote service URL handed to the Yar client.
    protected $_url;

    // Name of the remote method to invoke.
    protected $_method;

    // true = register the call on \Yar_Concurrent_Client, false = call through \Yar_Client.
    protected $_concurrent = false;

    // Not referenced anywhere inside this class.
    protected $_format = 'php';

    // Success callback; only handed to \Yar_Concurrent_Client::call().
    protected $_callback;

    // Error callback; used in both modes.
    protected $_errorCallback;

    // Timeout in milliseconds (the unit of the YAR_OPT_*TIMEOUT options).
    // Defaults to 5 seconds, expressed in the same unit timeout() stores.
    protected $_timeout = 5000;


    /**
     * @param string $url remote service URL, must not be empty
     * @throws \Exception when $url is empty
     */
    public function __construct($url)
    {
        if (empty($url)) {
            throw new \Exception('地址不能为空!');
        }
        $this->_url = $url;
        // Install a no-op error callback so one is always set until the
        // caller provides their own via errorCallback().
        $this->errorCallback(function ($errno, $error, $callinfo) {

        });
    }

    /**
     * Set the name of the remote method to call.
     *
     * @param string $method
     * @return $this
     * @throws \Exception when $method is empty
     */
    public function method($method)
    {
        if (empty($method)) {
            throw new \Exception('方法不能为空');
        }
        $this->_method = $method;
        return $this;
    }


    /**
     * Set the timeout, given in seconds.
     *
     * The value is stored in milliseconds. Values that cast to an integer
     * below 1 are ignored and the current timeout is kept.
     *
     * @param int $second
     * @return $this
     */
    public function timeout($second)
    {
        if ((int)$second >= 1) {
            $this->_timeout = (int)$second * 1000;
        }
        return $this;
    }


    /**
     * Remote call shortcut: $rpc->someMethod($a, $b) selects "someMethod"
     * as the remote method and runs exec($a, $b).
     *
     * @param string $method
     * @param array $args
     * @return mixed whatever exec() returns
     * @throws \Exception see exec()
     */
    public function __call($method, $args)
    {
        $this->method($method);
        return call_user_func_array(array($this, 'exec'), $args);
    }


    /**
     * Set the success callback.
     *
     * A value that is empty or not callable is silently ignored.
     *
     * @param callable $callback
     * @return $this
     */
    public function callback($callback)
    {
        if (!empty($callback) && is_callable($callback)) {
            $this->_callback = $callback;
        }

        return $this;
    }


    /**
     * Set the callback invoked when a call fails.
     *
     * A value that is empty or not callable is silently ignored.
     *
     * @param callable $errorCallback
     * @return $this
     */
    public function errorCallback($errorCallback)
    {
        if (!empty($errorCallback) && is_callable($errorCallback)) {
            $this->_errorCallback = $errorCallback;
        }

        return $this;
    }

    /**
     * Choose between concurrent (true) and serial (false) execution.
     *
     * @param bool $concurrent
     * @return $this
     */
    public function concurrent($concurrent)
    {
        $this->_concurrent = (bool)$concurrent;
        return $this;
    }


    /**
     * Execute the configured remote method. The arguments passed to exec()
     * are forwarded to the remote method in the same order.
     *
     * @return mixed true in concurrent mode (the call is only registered),
     *               otherwise the result of the remote method
     * @throws \Exception when no method has been set, or when the serial
     *                    call fails (the original exception is attached as
     *                    the previous exception)
     */
    public function exec()
    {
        if (empty($this->_method)) {
            throw new \Exception('远程调用方法不能为空!');
        }
        $arguments = func_get_args();

        if ($this->_concurrent) {
            // Concurrent mode. The options array has to be keyed by the
            // YAR_OPT_* constants; string keys such as 'YAR_OPT_TIMEOUT'
            // do not match any option.
            $options = array(
                \YAR_OPT_TIMEOUT => $this->_timeout,
                \YAR_OPT_CONNECT_TIMEOUT => $this->_timeout,
            );
            \Yar_Concurrent_Client::call(
                $this->_url,
                $this->_method,
                $arguments,
                $this->_callback,
                $this->_errorCallback,
                $options
            );
            return true;
        }

        // Serial mode.
        try {
            $client = new \Yar_Client($this->_url);
            $client->SetOpt(\YAR_OPT_CONNECT_TIMEOUT, $this->_timeout);
            $client->SetOpt(\YAR_OPT_TIMEOUT, $this->_timeout);
            return call_user_func_array(array($client, $this->_method), $arguments);
        } catch (\Exception $e) {
            if (!empty($this->_errorCallback)) {
                // NOTE(review): the exception object is passed for all three
                // callback arguments ($errno, $error, $callinfo). Kept as-is
                // because existing callbacks may rely on it — confirm.
                call_user_func_array($this->_errorCallback, array($e, $e, $e));
            }
            // Same type, message and code as before; chaining $e keeps the
            // original stack trace available through getPrevious().
            throw new \Exception($e->getMessage(), $e->getCode(), $e);
        }
    }


    /**
     * Send the calls registered in concurrent mode, then reset the
     * concurrent client.
     *
     * @param callable|null $callback passed through to \Yar_Concurrent_Client::loop()
     * @param callable|null $errorCallback passed through to \Yar_Concurrent_Client::loop()
     */
    public static function loop($callback = null, $errorCallback = null)
    {
        \Yar_Concurrent_Client::loop($callback, $errorCallback);
        \Yar_Concurrent_Client::reset();
    }

}