init_config_worker.lua
8.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
local http = require "resty.http"
local cjson = require "cjson"
local rate_limit_conf_url="http://config.server.yohoops.org/nginx_limit_api/default"
local limit_ip_access_url="http://config.server.yohoops.org/nginx_limit_ip/default"
local nginx_common_conf_url="http://config.server.yohoops.org/nginx_common_conf/default"
local limit_ip_redis=lua_context["redis_limit_ip"]
function split_str_list(str,spliter)
local ips={}
if not str then
return ips
end
string.gsub(str,'[^' .. spliter ..']+',function(w) table.insert(ips, w) end )
return ips
end
function toboolean(str,default)
if str == nil then
return default
end
if str == "true" then
return true
else
return false
end
end
local query_common_conf=function()
local httpc = http:new()
local ok, code, headers, status, body = httpc:request{
url=nginx_common_conf_url,
method="GET",
timeout=120000
}
if ok then
local rate_limit_conf=cjson.decode(body)
local property=rate_limit_conf["propertySources"]
if property then
local property=property[1]
if property then
local source=property["source"]
if source then
local common_conf={}
common_conf.lua_golbal_switch=source["lua_golbal_switch"]
lua_context.lua_conf_cache:set("common_conf",cjson.encode(common_conf))
end
end
end
else
ngx.log(ngx.ERR, "request error:" .. tostring(code))
end
end
-->>begin: query rate limit config function definition
local query_rate_limit_conf=function()
local httpc = http:new()
local ok, code, headers, status, body = httpc:request{
url=rate_limit_conf_url,
method="GET",
timeout=120000
}
if ok then
local rate_limit_conf=cjson.decode(body)
local property=rate_limit_conf["propertySources"]
if property then
local property=property[1]
if property then
local api_rate_limit_conf={api_rate_limit={}}
local source=property["source"]
api_rate_limit_conf["is_open"]=source["open_limit_flow"]
for k,v in pairs(source) do
if string.find(k,"^api_rate_limit*") then
local t=split_str_list(k,".")
table.remove(t,1)
local key=table.concat(t,".")
local vals=split_str_list(v,",")
api_rate_limit_conf.api_rate_limit[key]={tonumber(vals[1]),tonumber(vals[2]),vals[3]}
end
end
lua_context.lua_conf_cache:set("api_rate_limit_conf",cjson.encode(api_rate_limit_conf))
end
end
else
ngx.log(ngx.ERR, "request error:" .. tostring(code))
end
end
--<<end: query rate limit config function definition
-->>begin: query limit ip access config function definition
local query_limit_ip_access_conf=function()
local httpc = http:new()
local ok, code, headers, status, body = httpc:request{
url=limit_ip_access_url,
method="GET",
timeout=120000
}
if ok then
--ngx.log(ngx.ERR, ">>>>>>>>>>>>>>" .. tostring(body))
local rate_limit_conf=cjson.decode(body)
local property=rate_limit_conf["propertySources"]
if property then
local property=property[1]
if property then
local source=property["source"]
if source then
local limit_ip_access={}
local is_open=source["is_open"]
limit_ip_access["is_open"]=is_open
local ip_qps_limit=source["ip_qps_limit"]
ip_qps_limit=split_str_list(ip_qps_limit,",")
local ip_qps_limit_table={}
table.insert(ip_qps_limit_table,tonumber(ip_qps_limit[1]))
table.insert(ip_qps_limit_table,tonumber(ip_qps_limit[2]))
limit_ip_access["ip_qps_limit"]=ip_qps_limit_table
local interface_ip_qps_limit={}
local white_ips={}
for k,v in pairs(source) do
if string.find(k,"^interface_ip_qps_limit%[*") then
local t=split_str_list(k,".")
table.remove(t,1)
local key=table.concat(t,".")
local vals=split_str_list(v,",")
interface_ip_qps_limit[key]={tonumber(vals[1]),tonumber(vals[2])}
end
if string.find(k,"^white_ips%[*") then
table.insert(white_ips,v)
end
end
limit_ip_access["white_ips"]=white_ips
limit_ip_access["interface_ip_qps_limit"]=interface_ip_qps_limit
lua_context.lua_conf_cache:set("limit_ip_access",cjson.encode(limit_ip_access))
end
end
end
else
ngx.log(ngx.ERR, "request error:" .. tostring(code))
end
end
--<<end: query limit ip access config function definition
-->>begin: subscribe ip blacklist event
local cache=lua_context.mal_ip_cache
local subscribe_mal_ips=function()
if ngx.worker.id() ~= 0 then
return false
end
local connect=limit_ip_redis:getConnect()
if not connect then
ngx.log(ngx.ERR,"subscribe blacklist ip get connection err" )
return false
end
local res, err=connect:subscribe("mal_ips")
if not res then
connect:close()
ngx.log(ngx.ERR,"subscribe blacklist ip connection subscribe err:" .. tostring(err))
return false
end
connect:set_timeout(86400000)
while true do
local res, err = connect:read_reply()
if res then
if res[3] then
local t=cjson.decode(res[3])
local ips=t.ips
local expire=(not t.expire) and 43200 or t.expire
if t.type == "add" then
for ip in string.gmatch(ips,"[^',']+") do
cache:set("yh:mip:" .. ip,"1",expire)
ngx.log(ngx.INFO,"nginx subscribe add mal ip:" .. tostring(ip) .. ":" .. tostring(expire))
end
elseif t.type == "del" then
for ip in string.gmatch(ips,"[^',']+") do
cache:delete("yh:mip:" .. ip)
ngx.log(ngx.INFO,"nginx subscribe del mal ip:" .. tostring(ip) .. ":" .. tostring(expire))
end
elseif t.type == "flush" then
cache:flush_all()
ngx.log(ngx.INFO,"nginx subscribe flush all mal ip")
end
end
elseif err ~= "timeout" then
connect:close()
ngx.log(ngx.ERR,"subscribe blacklist ip socket timeout")
return false
end
if ngx.worker.exiting() then
connect:close()
ngx.log(ngx.ERR,"subscribe blacklist ip ngx worker exit")
return false
end
end
return false
end
--<< end: subscribe ip blacklist event
function subscribe_mal_ips_loop()
if ngx.worker.id() ~= 0 then
return
end
local b = ture
while true do
local res=subscribe_mal_ips()
-- subscribe error sleep 10 seconds and then retry
ngx.sleep(10)
if ngx.worker.exiting() then
return
end
end
end
-->>begin: timer at fix rate call function.
local timer_handler
timer_handler=function(premature,t,f,id)
if id then
if ngx.worker.id() == id then
local b,errinfo=pcall(f)
if not b then
ngx.log(ngx.ERR, "task request error:" .. tostring(errinfo))
end
end
else
local b,errinfo=pcall(f)
if not b then
ngx.log(ngx.ERR, "task request error:" .. tostring(errinfo))
end
end
ngx.timer.at(t,timer_handler,t,f)
end
--<<end: timer at fix rate call function.
-- subscribe mal ips task
ngx.timer.at(2,subscribe_mal_ips_loop)
timer_handler(true,20,query_rate_limit_conf,0)
timer_handler(true,25,query_limit_ip_access_conf,0)
timer_handler(true,30,query_common_conf,0)
-- every worker timing schedule configs from share cache
function rate_limit_conf_to_worker()
local t=lua_context.lua_conf_cache:get("api_rate_limit_conf")
if t then
local r=cjson.decode(t)
if r then
lua_context.configs["api_rate_limit_conf"]=r
--ngx.log(ngx.INFO,"++++++++++++++" .. cjson.encode(r.api_rate_limit["web.passport.getUserVerifyInfo"]))
end
end
end
function limit_ip_access_conf_to_worker()
local t=lua_context.lua_conf_cache:get("limit_ip_access")
if t then
local r=cjson.decode(t)
if r then
r["white_method"]={"app.graphic.img","app.graphic.verify"}
lua_context.configs["limit_ip_access"]=r
--ngx.log(ngx.INFO,"++++++++++++++" .. cjson.encode(lua_context.configs["limit_ip_access"]))
end
end
end
function query_common_conf_to_worker()
local t=lua_context.lua_conf_cache:get("common_conf")
if t then
local r=cjson.decode(t)
if r then
lua_context.configs["common_conf"]=r
end
end
end
timer_handler(true,2,rate_limit_conf_to_worker)
timer_handler(true,2,limit_ip_access_conf_to_worker)
timer_handler(true,2,query_common_conf_to_worker)