Httpc.php 7.66 KB
<?php


namespace Hood\Utils\Rpc;


class Httpc implements IFace
{
    /**
     * Requests queued by exec() while in concurrent mode, waiting for loop().
     * Each entry holds: url, callback, format, method, param (already encoded),
     * timeout and errorCallback.
     *
     * @var array
     */
    protected static $_callbackList = array();

    /** @var string URL every call is sent to. */
    protected $_url;

    /** @var string Name of the remote method, sent as the "method" field. */
    protected $_method;

    /** @var bool When true, exec() queues the call for loop() instead of sending it. */
    protected $_concurrent = false;

    /** @var string Wire format: 'php' uses serialize()/unserialize(), anything else uses JSON. */
    protected $_format = 'php';

    /** @var callable|null Success callback; only loop() invokes it. */
    protected $_callback;

    /** @var callable|null Error callback; see errorCallback() for its arguments. */
    protected $_errorCallback;

    /** @var int Per-request timeout in seconds (passed as CURLOPT_TIMEOUT). */
    protected $_timeout = 5;

    /** @var int Maximum seconds curl_multi_select() may block per iteration of loop(). */
    protected static $loopTimeout = 5;


    /**
     * Create a client bound to one endpoint.
     *
     * A no-op error callback is installed so that one is always present.
     *
     * @param string $url endpoint URL; must not be empty
     * @throws \Exception when $url is empty
     */
    public function __construct($url)
    {
        if (empty($url)) {
            throw new \Exception('调用地址不能为空');
        }
        $this->_url = $url;
        $this->errorCallback(function ($errno, $error, $callinfo) {

        });
    }


    /**
     * Treat any undefined method call as a remote call: the method name becomes
     * the remote method and the arguments are forwarded to exec().
     *
     * @param string $method
     * @param array $args
     * @return array|bool|mixed whatever exec() returns
     */
    public function __call($method, $args)
    {
        $this->method($method);
        return call_user_func_array(array($this, 'exec'), $args);
    }

    /**
     * Set the request timeout in seconds. Values below 1 are ignored.
     *
     * @param int $second
     * @return $this
     */
    public function timeout($second)
    {
        if ((int)$second >= 1) {
            $this->_timeout = (int)$second;
        }
        return $this;
    }


    /**
     * Execute the configured remote method with the given arguments.
     *
     * In serial mode the request is sent immediately and its result returned.
     * In concurrent mode the request is only queued (to be sent by loop()) and
     * true is returned.
     *
     * @return array|bool|mixed
     */
    public function exec()
    {
        $args = func_get_args();
        if (!$this->_concurrent) {
            return $this->single($args);
        }

        self::$_callbackList[] = array(
            'url' => $this->_url,
            'callback' => $this->_callback,
            'format' => $this->_format,
            'method' => $this->_method,
            'param' => self::_encodeParam($this->_format, $args),
            'timeout' => $this->_timeout,
            'errorCallback' => $this->_errorCallback
        );
        return true;
    }


    /**
     * Send one request synchronously (serial mode).
     *
     * Returns an empty array when the transfer failed or the body is empty, the
     * decoded payload when decoding succeeds, and the raw response body when it
     * cannot be decoded. The error callback is invoked for transfer failures and
     * for undecodable responses.
     *
     * @param array $args arguments for the remote method
     * @return array|mixed
     */
    protected function single($args)
    {
        $ch = self::curl_handle(array(
            'url' => $this->_url,
            'method' => $this->_method,
            'format' => $this->_format,
            'param' => self::_encodeParam($this->_format, $args),
            'timeout' => $this->_timeout
        ));
        $output = curl_exec($ch);

        $callinfo = curl_getinfo($ch);
        $error = curl_error($ch);
        $errno = curl_errno($ch);

        curl_close($ch);

        if ($output === false) {
            // Transfer failure (timeout, DNS, refused connection...): report it
            // instead of silently returning an empty result.
            if (is_callable($this->_errorCallback)) {
                call_user_func($this->_errorCallback, $errno, $error, $callinfo);
            }
            return array();
        }
        if (empty($output)) {
            return array();
        }

        list($decoded, $result) = self::_decodeOutput($this->_format, $output);
        if (!$decoded) {
            if (is_callable($this->_errorCallback)) {
                call_user_func($this->_errorCallback, $errno, $output.$error, $callinfo);
            }
            return $output;
        }

        return $result;
    }


    /**
     * Set the remote method to call.
     *
     * @param string $method
     * @return $this
     * @throws \Exception when $method is empty
     */
    public function method($method)
    {
        if (empty($method)) {
            throw new \Exception('调用方法不能为空');
        }
        $this->_method = $method;
        return $this;
    }


    /**
     * Set the callback invoked by loop() when a request returns HTTP 200.
     * It receives the decoded result and the curl_getinfo() array.
     * Non-callable values are ignored.
     *
     * @param callable $callback
     * @return $this
     */
    public function callback($callback)
    {
        if (!empty($callback) && is_callable($callback)) {
            $this->_callback = $callback;
        }

        return $this;
    }


    /**
     * Set the callback invoked when a call fails. It receives three arguments:
     * the curl errno, the response body concatenated with the curl error
     * message, and the curl_getinfo() array. Non-callable values are ignored.
     *
     * @param callable $errorCallback
     * @return $this
     */
    public function errorCallback($errorCallback)
    {
        if (!empty($errorCallback) && is_callable($errorCallback)) {
            $this->_errorCallback = $errorCallback;
        }
        return $this;
    }


    /**
     * Switch between serial (false) and concurrent (true) mode.
     *
     * @param bool|false $concurrent
     * @return $this
     */
    public function concurrent($concurrent=false)
    {
        $this->_concurrent = (bool)$concurrent;
        return $this;
    }

    /**
     * Build a configured curl handle for one request description.
     *
     * @param array $resources needs the keys url, method, format, param and timeout
     * @return resource curl handle (a CurlHandle object on PHP 8+)
     */
    protected static function curl_handle($resources)
    {
        $data = array(
            'method' => $resources['method'],
            'format' => $resources['format'],
            'param' => $resources['param']
        );
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $resources['url']);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HEADER, 0);
        curl_setopt($ch, CURLOPT_HTTPHEADER, array('User-Agent:HTTPC'));
        curl_setopt($ch, CURLOPT_TIMEOUT, $resources['timeout']);
        curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($data, '', '&'));

        return $ch;
    }


    /**
     * Send every queued concurrent request in parallel and dispatch callbacks.
     *
     * A request that ends with HTTP 200 triggers its success callback with the
     * decoded result and the curl info; any other outcome triggers its error
     * callback with errno, body + curl error, and the curl info. The arguments
     * of this method act as fallbacks for requests that carry no callback of
     * their own. Because the constructor always installs a no-op error
     * callback, the $errorCallback fallback only applies if that property was
     * reset elsewhere.
     *
     * @param callable|null $callback fallback success callback
     * @param callable|null $errorCallback fallback error callback
     */
    public static function loop($callback=null, $errorCallback=null)
    {
        // Take ownership of the queue up front: if a callback throws, the same
        // requests are not sent again by the next loop() call.
        $queue = self::$_callbackList;
        self::$_callbackList = array();
        if (empty($queue)) {
            return;
        }

        $master = curl_multi_init();
        $requests = array();
        $requestMap = array();
        $handles = array();
        foreach ($queue as $key => $val) {
            $ch = self::curl_handle($val);
            curl_multi_add_handle($master, $ch);
            $requestMap[self::_handleKey($ch)] = $key;
            // Keep a reference so the identity used as map key stays valid.
            $handles[$key] = $ch;
            // Defaults cover requests that never complete (e.g. curl_multi_exec error).
            $requests[$key] = $val + array(
                'output' => false,
                'callinfo' => array('http_code' => 0),
                'error' => '',
                'errno' => 0
            );
        }

        do {

            while (($execrun = curl_multi_exec($master, $running)) == CURLM_CALL_MULTI_PERFORM) ;
            if ($execrun != CURLM_OK) {
                break;
            }
            // a request was just completed -- find out which one
            while ($done = curl_multi_info_read($master)) {
                $finished = $done['handle'];
                $id = self::_handleKey($finished);
                if (isset($requestMap[$id])) {
                    $key = $requestMap[$id];
                    // get the info and content returned on the request
                    $requests[$key]['callinfo'] = curl_getinfo($finished);
                    $requests[$key]['output'] = curl_multi_getcontent($finished);
                    $requests[$key]['error'] = curl_error($finished);
                    $requests[$key]['errno'] = curl_errno($finished);
                }

                // remove the curl handle that just completed
                curl_multi_remove_handle($master, $finished);
            }

            if ($running > 0) {
                if (curl_multi_select($master, self::$loopTimeout) === -1) {
                    // select could not wait on anything; pause briefly instead
                    // of spinning on curl_multi_exec.
                    usleep(1000);
                }
            }

        } while ($running);

        curl_multi_close($master);
        unset($handles);

        foreach ($requests as $val) {
            if ($val['callinfo']['http_code'] == 200) {
                if ($val['output'] !== false) {
                    // The decoded value is handed over even when decoding failed
                    // (false for 'php', null for JSON).
                    list(, $result) = self::_decodeOutput($val['format'], $val['output']);
                } else {
                    $result = $val['output'];
                }
                $onSuccess = (isset($val['callback']) && is_callable($val['callback']))
                    ? $val['callback']
                    : $callback;
                if (is_callable($onSuccess)) {
                    call_user_func($onSuccess, $result, $val['callinfo']);
                }
            } else {
                $onError = isset($val['errorCallback']) ? $val['errorCallback'] : $errorCallback;
                if (is_callable($onError)) {
                    call_user_func($onError, $val['errno'], $val['output'].$val['error'], $val['callinfo']);
                }
            }
        }
    }


    /**
     * Encode call arguments for the wire: serialize() for 'php', JSON otherwise.
     *
     * @param string $format
     * @param array $args
     * @return string
     */
    private static function _encodeParam($format, $args)
    {
        if ($format == 'php') {
            return serialize($args);
        }
        return json_encode($args);
    }


    /**
     * Decode a response body according to the wire format.
     *
     * @param string $format
     * @param string $output raw response body
     * @return array two elements: bool "decoded successfully", mixed decoded value
     */
    private static function _decodeOutput($format, $output)
    {
        if ($format == 'php') {
            // NOTE(review): unserialize() on a remote response can instantiate
            // arbitrary classes; only use this format with trusted endpoints.
            // "@" is kept because unserialize() raises a notice on malformed input.
            $value = @unserialize($output);
            // A serialized boolean false also yields false; it is not a failure.
            $decoded = ($value !== false || $output === serialize(false));
            return array($decoded, $value);
        }

        $value = json_decode($output, true);
        // json_decode() signals failure with null, which is also a valid
        // payload, so the error state is the only reliable indicator.
        return array(json_last_error() === JSON_ERROR_NONE, $value);
    }


    /**
     * Array key identifying a curl handle, for both the resource form
     * (PHP < 8) and the CurlHandle object form (PHP 8+), which cannot be cast
     * to string.
     *
     * @param resource|object $ch
     * @return string
     */
    private static function _handleKey($ch)
    {
        return is_object($ch) ? spl_object_hash($ch) : (string)$ch;
    }
}