Yac.php 9.23 KB
<?php


namespace Hood\Utils\Rpc;


/**
 * HTTP RPC client built on cURL.
 *
 * A remote method is invoked by POSTing the encoded argument list to
 * "<url>/<method>". The response body is decoded according to the configured
 * format and is expected to be an envelope with "code", "message" and "data"
 * keys; a call counts as successful when both the HTTP status and the
 * envelope "code" equal 200.
 *
 * Two modes are supported:
 *  - serial (default): exec() sends the request immediately, returns the
 *    envelope "data" and throws \Exception on any failure;
 *  - concurrent: exec() only queues the request; the static loop() sends all
 *    queued requests in parallel and reports through callbacks.
 *
 * Undefined methods are routed through __call(), so `$client->foo(1, 2)`
 * calls the remote method "foo" with arguments [1, 2].
 */
class Yac implements IFace
{
    /**
     * Requests queued by exec() in concurrent mode, consumed by loop().
     * Shared by every instance of the class.
     *
     * @var array
     */
    protected static $_callbackList = array();

    /** @var string Service address (prefixed with $base when one is given). */
    protected $_url;

    /** @var string Remote method name, appended to the URL as the last path segment. */
    protected $_method;

    /** @var bool Whether exec() queues the request instead of sending it. */
    protected $_concurrent = false;

    /** @var string Payload format: 'php' selects serialize/unserialize, anything else JSON. */
    protected $_format = 'json';

    /** @var callable|null Success callback used in concurrent mode. */
    protected $_callback;

    /** @var callable|null Error callback used in concurrent mode. */
    protected $_errorCallback;

    /** @var int Total request timeout in seconds. */
    protected $_timeout = 10;

    /** @var int Connection timeout in seconds. */
    protected $_connectTimeout = 10;

    /** @var int Maximum number of seconds a single curl_multi_select() wait may block. */
    protected static $loopTimeout = 10;

    /** @var string Not referenced anywhere inside this class. */
    protected $_base = '/rest/';


    /**
     * @param string $url  Service address; must not be empty.
     * @param string $base Optional prefix; when given, the final address is
     *                     $base followed by $url without its leading slashes.
     * @throws \Exception When $url is empty.
     */
    public function __construct($url, $base='')
    {
        if (empty($url)) {
            throw new \Exception('调用地址不能为空');
        }
        $this->_url = $url;
        if (!empty($base)) {
            $this->_url = $base . ltrim($url, '/');
        }
        // Install a no-op error callback so concurrent calls always have one.
        $this->errorCallback(function ($errno, $error, $callinfo) {

        });
    }


    /**
     * Treats any undefined method call as a remote call of the same name.
     *
     * @param string $method Remote method name.
     * @param array  $args   Arguments forwarded to exec().
     * @return array|bool|mixed Whatever exec() returns.
     */
    public function __call($method, $args)
    {
        $this->method($method);
        return call_user_func_array(array($this, 'exec'), $args);
    }

    /**
     * Sets the total request timeout in seconds.
     * Values below 1 are ignored and the previous setting is kept.
     *
     * @param int $second
     * @return $this
     */
    public function timeout($second)
    {
        if ((int)$second >= 1) {
            $this->_timeout = (int)$second;
        }
        return $this;
    }


    /**
     * Sets the connection timeout in seconds.
     * Values below 1 are ignored and the previous setting is kept.
     *
     * @param int $second
     * @return $this
     */
    public function connectTimeout($second)
    {
        if ((int)$second >= 1) {
            $this->_connectTimeout = (int)$second;
        }
        return $this;
    }


    /**
     * Executes the configured remote method with the given arguments.
     *
     * In serial mode the request is sent immediately. In concurrent mode the
     * request is only queued; call Yac::loop() to actually send it.
     *
     * @return array|bool|mixed Envelope "data" in serial mode, true in concurrent mode.
     * @throws \Exception In serial mode, on any transport or service error.
     */
    public function exec()
    {
        $args = func_get_args();
        if (!$this->_concurrent) {
            return $this->single($args);
        }

        self::$_callbackList[] = array(
            'url' => rtrim($this->_url, '/') . '/' . $this->_method,
            'callback' => $this->_callback,
            'format' => $this->_format,
            'param' => self::encodeParams($args, $this->_format),
            'timeout' => $this->_timeout,
            'connecttimeout' => $this->_connectTimeout,
            'errorCallback' => $this->_errorCallback
        );
        return true;
    }


    /**
     * Sends one request synchronously (serial mode).
     *
     * @param array $args Arguments of the remote method.
     * @return array|mixed Envelope "data" (null when the envelope has none).
     * @throws \Exception When the body is empty, the envelope reports a code
     *                    other than 200, or the response cannot be interpreted.
     */
    protected function single($args)
    {
        // Encode the body with the same format that the Content-Type header
        // announces, exactly like the concurrent path does.
        $ch = self::curl_handle(array(
            'url' => rtrim($this->_url, '/') . '/' . $this->_method,
            'format' => $this->_format,
            'param' => self::encodeParams($args, $this->_format),
            'timeout' => $this->_timeout,
            'connecttimeout' => $this->_connectTimeout,
        ));
        $output = curl_exec($ch);

        $callinfo = curl_getinfo($ch);
        $error = curl_error($ch);
        $errno = curl_errno($ch);

        self::closeHandle($ch);

        if (empty($output)) {
            // Exception takes (message, code); keep the transport error, if
            // any, so connection failures remain diagnosable.
            $message = '服务端返回结果为空';
            if ($errno !== 0) {
                $message .= ': ' . $error;
            }
            throw new \Exception($message, 400);
        }

        $result = self::decodeBody($output, $this->_format);
        $httpCode = $callinfo['http_code'];

        if (isset($result['code'])) {
            if ($httpCode == 200 && $result['code'] == 200) {
                return isset($result['data']) ? $result['data'] : null;
            }
            $message = (isset($result['message']) && is_scalar($result['message']))
                ? (string)$result['message']
                : '';
            // Exception codes must be integers.
            throw new \Exception($message, (int)$result['code']);
        }

        // No usable envelope: report the raw body, plus the cURL error text
        // when the HTTP status itself signals a failure.
        if ($httpCode == 200) {
            throw new \Exception($output, $httpCode);
        }
        throw new \Exception($error. ":" .$output, $httpCode);
    }


    /**
     * Sets the remote method to call.
     *
     * @param string $method
     * @return $this
     * @throws \Exception When $method is empty.
     */
    public function method($method)
    {
        if (empty($method)) {
            throw new \Exception('调用方法不能为空');
        }
        $this->_method = $method;
        return $this;
    }


    /**
     * Sets the callback invoked with the envelope "data" when a concurrent
     * call succeeds. Non-callable values are ignored.
     *
     * @param callable $callback
     * @return $this
     */
    public function callback($callback)
    {
        if (!empty($callback) && is_callable($callback)) {
            $this->_callback = $callback;
        }

        return $this;
    }


    /**
     * Sets the callback invoked when a concurrent call fails. It receives
     * three arguments: an error number (the envelope "code", or the cURL
     * errno when there is no envelope), an error message (the envelope
     * "message", or the cURL error text followed by the raw body) and the
     * curl_getinfo() array. Non-callable values are ignored.
     *
     * @param callable $errorCallback
     * @return $this
     */
    public function errorCallback($errorCallback)
    {
        if (!empty($errorCallback) && is_callable($errorCallback)) {
            $this->_errorCallback = $errorCallback;
        }
        return $this;
    }


    /**
     * Switches between serial (false) and concurrent (true) mode.
     *
     * @param bool|false $concurrent
     * @return $this
     */
    public function concurrent($concurrent=false)
    {
        $this->_concurrent = (bool)$concurrent;
        return $this;
    }

    /**
     * Creates a configured cURL easy handle for one request.
     *
     * @param array $resources Request description with the keys 'url',
     *                         'format', 'param', 'timeout' and 'connecttimeout'.
     * @return resource cURL handle (a CurlHandle object on PHP 8+).
     */
    protected static function curl_handle($resources)
    {
        $headers = array('User-Agent:Yac');
        if ($resources['format'] == 'json') {
            $headers[] = 'Content-Type:application/json';
        } else if ($resources['format'] == 'php') {
            $headers[] = 'Content-Type:application/php';
        }

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_ENCODING, "gzip");
        curl_setopt($ch, CURLOPT_URL, $resources['url']);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HEADER, 0);
        // Send the full header list, including Content-Type.
        curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        curl_setopt($ch, CURLOPT_TIMEOUT, $resources['timeout']);
        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $resources['connecttimeout']);
        // Setting a request body makes cURL issue a POST.
        curl_setopt($ch, CURLOPT_POSTFIELDS, $resources['param']);

        return $ch;
    }


    /**
     * Sends every queued concurrent request in parallel and dispatches the
     * results to the callbacks.
     *
     * The queue is emptied before anything is sent, so requests queued from
     * inside a callback are kept for the next loop() call and a throwing
     * callback cannot cause already-sent requests to be sent again.
     *
     * @param callable|null $callback      Fallback success callback for
     *                                     requests that registered none.
     * @param callable|null $errorCallback Fallback error callback for
     *                                     requests that registered none.
     */
    public static function loop($callback=null, $errorCallback=null)
    {
        $pending = self::$_callbackList;
        self::$_callbackList = array();
        if (empty($pending)) {
            return;
        }

        $master = curl_multi_init();
        $requestMap = array();
        $handles = array();
        foreach ($pending as $key => $val) {
            $ch = self::curl_handle($val);
            curl_multi_add_handle($master, $ch);
            // An int cast yields a stable id both for resources (PHP < 8) and
            // for CurlHandle objects (PHP 8+); a string cast throws on PHP 8.
            $id = (int)$ch;
            $requestMap[$id] = $key;
            $handles[$id] = $ch;
        }

        $running = 0;
        do {
            while (($execrun = curl_multi_exec($master, $running)) == CURLM_CALL_MULTI_PERFORM) ;
            if ($execrun != CURLM_OK) {
                break;
            }

            // Collect every transfer that has just completed.
            while ($done = curl_multi_info_read($master)) {
                $handle = $done['handle'];
                $id = (int)$handle;
                $key = $requestMap[$id];

                $pending[$key]['output'] = curl_multi_getcontent($handle);
                $pending[$key]['callinfo'] = curl_getinfo($handle);
                $pending[$key]['error'] = curl_error($handle);
                $pending[$key]['errno'] = curl_errno($handle);

                // Detach and release the finished handle.
                curl_multi_remove_handle($master, $handle);
                self::closeHandle($handle);
                unset($handles[$id]);
            }

            if ($running > 0 && curl_multi_select($master, self::$loopTimeout) === -1) {
                // select() failed; pause briefly instead of spinning.
                usleep(1000);
            }

        } while ($running);

        // Release handles that never completed (the transfer loop was aborted).
        foreach ($handles as $handle) {
            curl_multi_remove_handle($master, $handle);
            self::closeHandle($handle);
        }
        curl_multi_close($master);

        foreach ($pending as $val) {
            // These keys are missing when the request never completed.
            $output = isset($val['output']) ? $val['output'] : '';
            $callinfo = isset($val['callinfo']) ? $val['callinfo'] : array();
            $error = isset($val['error']) ? $val['error'] : '';
            $errno = isset($val['errno']) ? $val['errno'] : 0;
            $httpCode = isset($callinfo['http_code']) ? $callinfo['http_code'] : 0;

            $result = self::decodeBody($output, $val['format']);

            if ($httpCode == 200 && isset($result['code']) && $result['code'] == 200) {
                $onSuccess = !empty($val['callback']) ? $val['callback'] : $callback;
                if (is_callable($onSuccess)) {
                    call_user_func($onSuccess, isset($result['data']) ? $result['data'] : null);
                }
                continue;
            }

            $onError = !empty($val['errorCallback']) ? $val['errorCallback'] : $errorCallback;
            if (!is_callable($onError)) {
                continue;
            }
            if (isset($result['code'])) {
                $message = isset($result['message']) ? $result['message'] : null;
                call_user_func($onError, $result['code'], $message, $callinfo);
            } else {
                call_user_func($onError, $errno, $error.$output, $callinfo);
            }
        }
    }

    /**
     * Encodes the argument list for the request body.
     *
     * @param array  $args
     * @param string $format 'php' selects serialize(); anything else JSON.
     * @return string|false Encoded body (false if json_encode() fails).
     */
    private static function encodeParams($args, $format)
    {
        if ($format == 'php') {
            return serialize($args);
        }
        return json_encode($args);
    }

    /**
     * Decodes a response body into the envelope array.
     *
     * @param mixed  $body   Raw response body.
     * @param string $format 'php' selects unserialize(); anything else JSON.
     * @return array|null The decoded array, or null when the body does not
     *                    decode to an array.
     */
    private static function decodeBody($body, $format)
    {
        if (!is_string($body) || $body === '') {
            return null;
        }
        if ($format == 'php') {
            // NOTE(security): unserialize() on data received from a remote
            // service can instantiate arbitrary classes; only use the 'php'
            // format with endpoints that are fully trusted.
            $result = @unserialize($body);
        } else {
            $result = @json_decode($body, true);
        }
        return is_array($result) ? $result : null;
    }

    /**
     * Releases a cURL easy handle.
     *
     * Handles are resources before PHP 8 and must be closed explicitly; on
     * PHP 8+ they are objects that are freed automatically.
     *
     * @param resource $ch
     */
    private static function closeHandle($ch)
    {
        if (is_resource($ch)) {
            curl_close($ch);
        }
    }
}