GoMqClient.py
4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date: 2016-05-24 13:37:45
from __init__ import *
import traceback
# ---------------------------------------------------------------------------
# RabbitMQ log consumer.
#
# Pulls JSON log records from the `q_name` queue one at a time, decides from
# the record's 'group' field which backend (mongodb / influxdb), database and
# measurement it belongs to, and appends it to an in-memory batch queue.  A
# batch is flushed via data_pull() once it holds Config.multi_insert_limit
# records; a timer created together with each batch queue calls
# data_pull_timer (defined elsewhere -- presumably a time-based flush).
#
# Names such as Config, Queue, Timer, time, json, sys, write_log,
# write_performance, data_pull, data_pull_timer, rabbitmq_connect and
# connect_close are not defined in this file; they are expected to come from
# `from __init__ import *`.
#
# NOTE: print is written in the single-argument parenthesised form, which
# prints exactly the same text under the Python 2 print statement.
# ---------------------------------------------------------------------------


def _merged_document(data):
    """Return one flat dict: data['data'] overlaid with data['tags'].

    On a key collision the value from 'tags' wins, the same result the
    previous ``dict(data['data'], **data['tags'])`` expression produced.
    """
    document = dict(data['data'])
    document.update(data['tags'])
    return document


def _decode_context(document):
    """Replace a JSON-encoded 'context' field, if present, by its decoded value.

    Raises whatever json.loads raises when 'context' is not valid JSON.
    """
    if 'context' in document:
        document['context'] = json.loads(document['context'])
    return document


def _route_message(data):
    """Choose the storage target for one decoded log record.

    data -- the decoded record; must contain 'group', 'data' and 'tags'.

    Returns a tuple (middleware, db, measurement, json_body):
      group 'error'    -> mongodb, db 'error', Config.debug_channel
      group 'shutdown' -> mongodb, db 'error', Config.shutdown_channel
      group 'info'     -> mongodb, db 'info',  Config.info_channel
      group 'api'      -> influxdb, db 'api',  Config.log_channel
      anything else    -> influxdb, db 'log',  Config.log_channel
    Every measurement name gets a '_YYYYMMDD' suffix taken from the local
    clock at processing time.

    Raises KeyError when 'group', 'data' or 'tags' is missing.
    The 'performance' group is handled by the caller and never reaches here.
    """
    group = data['group']
    day_suffix = time.strftime('_%Y%m%d')

    if group == 'error' or group == 'shutdown':
        # Error logs and forced-script-exit logs share the 'error' database
        # and differ only in the channel used for the measurement name.
        if group == 'error':
            channel = Config.debug_channel
        else:
            channel = Config.shutdown_channel
        document = _decode_context(_merged_document(data))
        return 'mongodb', 'error', channel + day_suffix, document

    if group == 'info':
        # Logs with user-defined fields; the shape may vary a lot, so they
        # are stored in mongodb.
        document = _merged_document(data)
        # Debug trace for one specific marker message.  .get() is used
        # because 'message' is not guaranteed to exist in a free-form
        # record; indexing it directly raised KeyError and the record was
        # thrown away.
        if document.get('message') == 'sdhfsodifjoisdfsoi':
            print(document)
        return 'mongodb', 'info', Config.info_channel + day_suffix, document

    # API-call logs go to db 'api'; every remaining group is a plain
    # information log and goes to db 'log'.  Both use the same point layout.
    if group == 'api':
        db = 'api'
    else:
        db = 'log'
    measurement = Config.log_channel + day_suffix
    point = {
        "measurement": measurement,
        "tags": data['tags'],
        "fields": data['data'],
    }
    return 'influxdb', db, measurement, point


def _batch_queue(middleware, db, measurement):
    """Return the batch queue for a target, creating it on first use.

    Queues live in the module-level ``insert_data`` dict, nested as
    insert_data[middleware][db][measurement].  A new queue is tagged with
    its target and a Timer is started for it before it is registered.
    """
    per_measurement = insert_data.setdefault(middleware, {}).setdefault(db, {})
    if measurement not in per_measurement:
        q = Queue()
        q.middleware = middleware
        q.db = db
        q.measurement = measurement
        # Presumably threading.Timer: runs data_pull_timer(q) once
        # Config.multi_insert_timeout has elapsed -- confirm in __init__.
        Timer(Config.multi_insert_timeout, data_pull_timer, [q]).start()
        per_measurement[measurement] = q
    return per_measurement[measurement]


q_name = 'log'  # queue name

# Connect to RabbitMQ.
rabbitmq = rabbitmq_connect()
print("GoMqClient:Start.\n")

# Pending batches: insert_data[middleware][db][measurement] -> Queue.
insert_data = {}

# Start consuming messages.
while True:
    # Step 1: fetch one message and acknowledge in bulk.
    try:
        method_frame, header_frame, body = rabbitmq.basic_get(q_name)
        if not body:
            # Nothing delivered; back off briefly instead of spinning.
            time.sleep(0.05)
            continue
        # Acknowledge only on every Config.rabbitmq_ack_interval-th delivery
        # tag, with multiple=True and no explicit tag (the per-message ack
        # was disabled in favour of this).
        # NOTE(review): the message is acknowledged before it is processed,
        # so a record that fails in step 2 is not redelivered.
        if method_frame.delivery_tag % Config.rabbitmq_ack_interval == 0:
            rabbitmq.basic_ack(multiple=True)
    except Exception as err:
        # Any failure while talking to the broker: report it, reconnect and
        # retry on the next iteration.
        print(err)
        rabbitmq = rabbitmq_connect()
        continue

    # Step 2: decode, route and enqueue the record.
    try:
        write_log(" [Golang] Received %r\n" % (body,))
        data = json.loads(body)

        if data['group'] == 'performance':
            # Performance-debugging logs are written out directly with the
            # raw body and are not batched.
            pre = data['data'].get('creator', '')
            write_performance(body, pre)
            continue

        middleware, db, measurement, json_body = _route_message(data)
        queue = _batch_queue(middleware, db, measurement)
        queue.push(json_body)
        if queue.size() >= Config.multi_insert_limit:
            data_pull(queue)
    except Exception:
        # A bad record must not stop the consumer: print the error and a
        # compact traceback, then move on to the next message.
        _, exc_value, exc_tb = sys.exc_info()
        print(exc_value)
        for filename, linenum, funcname, source in traceback.extract_tb(exc_tb):
            print("%-23s:%s '%s' in %s()" % (filename, linenum, source, funcname))
        continue

# Only reached if the loop above ever exits; as written it has no break.
connect_close()