Yar.php
3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
<?php
namespace Hood\Utils\Rpc;
/**
 * Fluent wrapper around the Yar RPC extension.
 *
 * A call is either executed immediately through \Yar_Client (serial mode, the
 * default) or registered with \Yar_Concurrent_Client (concurrent mode) and
 * sent later by {@see Yar::loop()}.
 */
class Yar implements IFace
{
    /** @var array Not referenced inside this class; kept for subclasses. */
    protected static $_callbackList = array();

    /** @var string Address of the remote service. */
    protected $_url;

    /** @var string Name of the remote method to invoke. */
    protected $_method;

    /** @var bool Whether calls are registered for concurrent execution. */
    protected $_concurrent = false;

    /** @var string Not referenced inside this class; kept for subclasses. */
    protected $_format = 'php';

    /** @var callable|null Success callback handed to the concurrent client. */
    protected $_callback;

    /** @var callable|null Error callback, invoked as ($errno, $error, $callinfo). */
    protected $_errorCallback;

    /**
     * @var int Timeout in milliseconds, used for both the connect and the
     *          request timeout. Defaults to 5 seconds; timeout() stores
     *          milliseconds as well, so the two stay in the same unit.
     */
    protected $_timeout = 5000;

    /**
     * @param string $url Address of the remote service.
     * @throws \Exception When $url is empty.
     */
    public function __construct($url)
    {
        if (empty($url)) {
            throw new \Exception('地址不能为空!');
        }
        $this->_url = $url;
        // Install a no-op error handler so that one is always set.
        $this->errorCallback(function ($errno, $error, $callinfo) {
        });
    }

    /**
     * Set the name of the remote method to call.
     *
     * @param string $method
     * @return $this
     * @throws \Exception When $method is empty.
     */
    public function method($method)
    {
        if (empty($method)) {
            throw new \Exception('方法不能为空');
        }
        $this->_method = $method;
        return $this;
    }

    /**
     * Set the timeout in whole seconds.
     *
     * Values below one second (after integer conversion) are ignored and the
     * current timeout is kept.
     *
     * @param int $second
     * @return $this
     */
    public function timeout($second)
    {
        $seconds = (int)$second;
        if ($seconds >= 1) {
            // Stored in milliseconds, the unit the YAR_OPT_* timeouts use.
            $this->_timeout = $seconds * 1000;
        }
        return $this;
    }

    /**
     * Remote call shortcut: `$rpc->foo($a, $b)` selects remote method "foo"
     * and forwards the arguments to exec().
     *
     * @param string $method
     * @param array  $args
     * @return mixed Whatever exec() returns.
     * @throws \Exception
     */
    public function __call($method, $args)
    {
        $this->method($method);
        return call_user_func_array(array($this, 'exec'), $args);
    }

    /**
     * Set the success callback. Non-callable or empty values are ignored.
     *
     * @param callable $callback
     * @return $this
     */
    public function callback($callback)
    {
        if (!empty($callback) && is_callable($callback)) {
            $this->_callback = $callback;
        }
        return $this;
    }

    /**
     * Set the callback invoked on failure. Non-callable or empty values are
     * ignored.
     *
     * @param callable $errorCallback Invoked as ($errno, $error, $callinfo).
     * @return $this
     */
    public function errorCallback($errorCallback)
    {
        if (!empty($errorCallback) && is_callable($errorCallback)) {
            $this->_errorCallback = $errorCallback;
        }
        return $this;
    }

    /**
     * Switch between concurrent and serial execution.
     *
     * @param bool $concurrent
     * @return $this
     */
    public function concurrent($concurrent)
    {
        $this->_concurrent = (bool)$concurrent;
        return $this;
    }

    /**
     * Execute the configured remote method. The arguments passed to exec()
     * are forwarded to the remote method in the same order.
     *
     * @return bool|mixed True when the call was registered in concurrent mode;
     *                    otherwise the value returned by the remote method.
     * @throws \Exception When no method has been set, or when the serial call
     *                    fails (the original exception is attached as previous).
     */
    public function exec()
    {
        if (empty($this->_method)) {
            throw new \Exception('远程调用方法不能为空!');
        }

        $args = func_get_args();

        if ($this->_concurrent) {
            // Concurrent: only register the call here; loop() sends it.
            \Yar_Concurrent_Client::call(
                $this->_url,
                $this->_method,
                $args,
                $this->_callback,
                $this->_errorCallback,
                // Options must be keyed by the YAR_OPT_* constants, as in the
                // serial branch below, not by their names as strings.
                array(
                    YAR_OPT_TIMEOUT         => $this->_timeout,
                    YAR_OPT_CONNECT_TIMEOUT => $this->_timeout,
                )
            );
            return true;
        }

        // Serial: perform the call right away.
        try {
            $client = new \Yar_Client($this->_url);
            $client->setOpt(YAR_OPT_CONNECT_TIMEOUT, $this->_timeout);
            $client->setOpt(YAR_OPT_TIMEOUT, $this->_timeout);
            return call_user_func_array(array($client, $this->_method), $args);
        } catch (\Exception $e) {
            if (!empty($this->_errorCallback)) {
                // Follow the ($errno, $error, $callinfo) parameter list of the
                // handler installed by the constructor; the exception itself
                // stays reachable through $callinfo['exception'].
                call_user_func(
                    $this->_errorCallback,
                    $e->getCode(),
                    $e->getMessage(),
                    array(
                        'uri'       => $this->_url,
                        'method'    => $this->_method,
                        'exception' => $e,
                    )
                );
            }
            // Chain the original exception so its type and trace are not lost.
            throw new \Exception($e->getMessage(), (int)$e->getCode(), $e);
        }
    }

    /**
     * Run \Yar_Concurrent_Client::loop() with the given callbacks, then
     * \Yar_Concurrent_Client::reset().
     *
     * @param callable|null $callback
     * @param callable|null $errorCallback
     */
    public static function loop($callback = null, $errorCallback = null)
    {
        \Yar_Concurrent_Client::loop($callback, $errorCallback);
        \Yar_Concurrent_Client::reset();
    }
}