Authored by 沈括号

rpc

<?php
namespace Hood\Utils\Rpc;
class Factory
{
private static $type = null;
/**
* 返回一个rpc
* @param $url
* @param string $type
* @return Httpc|Yar
*/
public static function getInstance($url, $type='yar')
{
if (strtolower($type) == 'yar') {
self::$type = 'yar';
return new Yar($url);
} else {
self::$type = 'httpc';
return new Httpc($url);
}
}
public static function loop($callback=null, $errorCallback=null)
{
if (self::$type == 'yar') {
Yar::loop($callback, $errorCallback);
} else {
Httpc::loop($callback, $errorCallback);
}
}
}
\ No newline at end of file
... ...
<?php
namespace Hood\Utils\Rpc;
class Httpc implements IFace
{
protected static $_callbackList = array();
protected $_url;
protected $_method;
protected $_concurrent = false;
protected $_format = 'php';
protected $_callback;
protected $_errorCallback;
protected $_timeout = 5;
protected static $loopTimeout = 5;
public function __construct($url)
{
if (empty($url)) {
throw new \Exception('调用地址不能为空');
}
$this->_url = $url;
$this->errorCallback(function ($errno, $error, $callinfo) {
});
}
public function __call($method, $args)
{
$this->method($method);
return call_user_func_array(array($this, 'exec'), $args);
}
/**
* 设置超时时间(秒)
* @param $second
* @return $this
*/
public function timeout($second)
{
if ((int)$second >= 1) {
$this->_timeout = (int)$second;
}
return $this;
}
/**
* 执行
* @return array|bool|mixed
*/
public function exec()
{
if (!$this->_concurrent) {
return $this->single(func_get_args());
} else {
if ($this->_format == 'php') {
$args = serialize(func_get_args());
} else {
$args = json_encode(func_get_args());
}
Httpc::$_callbackList[] = array(
'url' => $this->_url,
'callback' => $this->_callback,
'format' => $this->_format,
'method' => $this->_method,
'param' => $args,
'timeout' => $this->_timeout,
'errorCallback' => $this->_errorCallback
);
return true;
}
}
/**
* 串行时调用
* @param $args
* @return array|mixed
*/
protected function single($args)
{
if ($this->_format == 'php') {
$args = serialize($args);
} else {
$args = json_encode($args);
}
$data = array(
'method' => $this->_method,
'format' => $this->_format,
'param' => $args
);
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $this->_url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_HTTPHEADER, array('User-Agent:HTTPC'));
curl_setopt($ch, CURLOPT_TIMEOUT, $this->_timeout);
curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($data, '', '&'));
$output = curl_exec($ch);
$callinfo = curl_getinfo($ch);
$error = curl_error($ch);
$errno = curl_errno($ch);
curl_close($ch);
if (empty($output)) {
return array();
}
if ($this->_format == 'php') {
$result = @unserialize($output);
} else {
$result = @json_decode($output, true);
}
if ($result === false) {
if (!empty($this->_errorCallback)) {
call_user_func($this->_errorCallback, $errno, $output.$error, $callinfo);
}
return $output;
}
return $result;
}
/**
* 设置调用的方法
* @param $method
* @return $this
* @throws \Exception
*/
public function method($method)
{
if (empty($method)) {
throw new \Exception('调用方法不能为空');
}
$this->_method = $method;
return $this;
}
/**
* 正常返回结果时回调
* @param $callback
* @return $this
*/
public function callback($callback)
{
if (!empty($callback) && is_callable($callback)) {
$this->_callback = $callback;
}
return $this;
}
/**
* 调用错误时回调, 回调时传入3个参数,curl的errno, 返回结果+curl错误信息,curl调用的信息
* @param $errorCallback
* @return $this
*/
public function errorCallback($errorCallback)
{
if (!empty($errorCallback) && is_callable($errorCallback)) {
$this->_errorCallback = $errorCallback;
}
return $this;
}
/**
* 是否并行
* @param bool|false $concurrent
* @return $this
*/
public function concurrent($concurrent=false)
{
$this->_concurrent = (bool)$concurrent;
return $this;
}
/**
* 返回一个curl资源
* @param $resources
* @return resource
*/
protected static function curl_handle($resources)
{
$data = array(
'method' => $resources['method'],
'format' => $resources['format'],
'param' => $resources['param']
);
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $resources['url']);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_HTTPHEADER, array('User-Agent:HTTPC'));
curl_setopt($ch, CURLOPT_TIMEOUT, $resources['timeout']);
curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($data, '', '&'));
return $ch;
}
/**
* 发起并行请求
*/
public static function loop($callback=null, $errorCallback=null)
{
$master = curl_multi_init();
$requestMap = array();
foreach (self::$_callbackList as $key=>$val) {
$ch = self::curl_handle($val);
curl_multi_add_handle($master, $ch);
$requestMap[(string)$ch] = $key;
}
do {
while (($execrun = curl_multi_exec($master, $running)) == CURLM_CALL_MULTI_PERFORM) ;
if ($execrun != CURLM_OK) {
break;
}
// a request was just completed -- find out which one
while ($done = curl_multi_info_read($master)) {
// get the info and content returned on the request
$callinfo = curl_getinfo($done['handle']);
$output = curl_multi_getcontent($done['handle']);
$error = curl_error($done['handle']);
$errno = curl_errno($done['handle']);
$key = (string) $done['handle'];
self::$_callbackList[$requestMap[$key]]['output'] = $output;
self::$_callbackList[$requestMap[$key]]['callinfo'] = $callinfo;
self::$_callbackList[$requestMap[$key]]['error'] = $error;
self::$_callbackList[$requestMap[$key]]['errno'] = $errno;
// remove the curl handle that just completed
curl_multi_remove_handle($master, $done['handle']);
}
if ($running > 0) {
curl_multi_select($master, self::$loopTimeout);
}
} while ($running);
curl_multi_close($master);
foreach (self::$_callbackList as $val) {
if ($val['format'] == 'php' && $val['output'] !== false && $val['callinfo']['http_code'] == 200) {
$result = @unserialize($val['output']);
} else if ($val['output'] !== false && $val['callinfo']['http_code'] == 200) {
$result = @json_decode($val['output'], true);
} else {
$result = $val['output'];
}
if ($val['callinfo']['http_code'] != 200 && isset($val['errorCallback'])) {
call_user_func($val['errorCallback'], $val['errno'], $val['output'].$val['error'], $val['callinfo']);
} else if ($val['callinfo']['http_code'] == 200) {
call_user_func($val['callback'], $result, $val['callinfo']);
}
}
self::$_callbackList = array();
}
}
... ...
<?php
namespace Hood\Utils\Rpc;
interface IFace
{
public function __call($method, $args);
public function timeout($second);
public function exec();
public function method($method);
public function callback($callback);
public function errorCallback($errorCallback);
public function concurrent($concurrent);
public static function loop($callback, $errorCallback);
}
\ No newline at end of file
... ...
<?php
namespace Hood\Utils\Rpc;
class Yar implements IFace
{
protected static $_callbackList = array();
protected $_url;
protected $_method;
protected $_concurrent = false;
protected $_format = 'php';
protected $_callback;
protected $_errorCallback;
protected $_timeout = 5;
public function __construct($url)
{
if (empty($url)) {
throw new \Exception('地址不能为空!');
}
$this->_url = $url;
$this->errorCallback(function ($errno, $error, $callinfo) {
});
}
/**
* 设置要调用的远程方法名称
* @param $method
* @return $this
* @throws \Exception
*/
public function method($method)
{
if (empty($method)) {
throw new \Exception('方法不能为空');
}
$this->_method = $method;
return $this;
}
/**
* 设置超时时间(秒)
* @param $second
* @return $this
*/
public function timeout($second)
{
if ((int)$second >= 1) {
$this->_timeout = (int)$second * 1000;
}
return $this;
}
/**
* 远程调用
* @param $method
* @param $args
*/
public function __call($method, $args)
{
$this->method($method);
return call_user_func_array(array($this, 'exec'), $args);
}
/**
* 设置回调方法
* @param $callback
* @return $this
* @throws \Exception
*/
public function callback($callback)
{
if (!empty($callback) && is_callable($callback)) {
$this->_callback = $callback;
}
return $this;
}
/**
* 异常时的回调方法
* @param $errorCallback
* @return $this
* @throws \Exception
*/
public function errorCallback($errorCallback)
{
if (!empty($errorCallback) && is_callable($errorCallback)) {
$this->_errorCallback = $errorCallback;
}
return $this;
}
/**
* 设置是否并行
* @param $concurrent
* @return $this
*/
public function concurrent($concurrent)
{
$this->_concurrent = (bool)$concurrent;
return $this;
}
/**
* 串行时使用的执行,参数按接口的参数顺序
* @return bool
*/
public function exec()
{
if (empty($this->_method)) {
throw new \Exception('远程调用方法不能为空!');
}
if ($this->_concurrent) {//并行
\Yar_Concurrent_Client::call($this->_url, $this->_method, func_get_args(), $this->_callback, $this->_errorCallback, array('YAR_OPT_TIMEOUT'=>$this->_timeout, 'YAR_OPT_CONNECT_TIMEOUT'=>$this->_timeout));
return true;
} else {//串行
$client = new \Yar_Client($this->_url);
$client->SetOpt(YAR_OPT_CONNECT_TIMEOUT, $this->_timeout);
$client->SetOpt(YAR_OPT_TIMEOUT, $this->_timeout);
try {
return call_user_func_array(array($client, $this->_method), func_get_args());
} catch (\Exception $e) {
if (!empty($this->_errorCallback)) {
call_user_func_array($this->_errorCallback, array($e, $e, $e));
}
return $e->getMessage();
}
}
}
/**
* 发起请求
* @param null $callback
* @param null $errorCallback
*/
public static function loop($callback=null, $errorCallback=null)
{
\Yar_Concurrent_Client::loop($callback, $errorCallback);
}
}
... ...