GoMqClient.py
3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date: 2016-05-24 13:37:45
from __init__ import *
import traceback
# Name of the queue this consumer reads from.
q_name = 'log'
# Threshold for batched acknowledgements (an ack is sent every N deliveries).
ack_interval = Config.rabbitmq_ack_interval

# Connect to RabbitMQ.
rabbitmq = rabbitmq_connect()
# Connect to InfluxDB.
influxdb = influxdb_connect()
# Connect to MongoDB.
mongodb = mongodb_connect()

# Single-argument parenthesised form prints the same text as the statement form.
print("GoMqClient:Start.\n")
# Start consuming message data from the queue.
#
# The loop polls the queue, routes every message to MongoDB, InfluxDB or the
# performance log depending on its 'group' field, and acknowledges in batches.
# The helpers below only factor out the per-group storage code.


def _daily_measurement(channel):
    """Return *channel* suffixed with the current local date as ``_YYYYMMDD``."""
    return channel + time.strftime('_%Y%m%d')


def _insert_document(db, channel, data, decode_context=False):
    """Store one record through ``mongodb_insert``.

    ``data['data']`` and ``data['tags']`` are merged into one flat document
    (a tag overrides a data field with the same key).  When *decode_context*
    is true and the document has a ``context`` key, that value is decoded
    from its JSON string form before the insert.

    Raises whatever the key lookups, ``json.loads`` or ``mongodb_insert``
    raise; the caller's loop reports it and moves on.
    """
    document = dict(data['data'], **data['tags'])
    if decode_context and 'context' in document:
        document['context'] = json.loads(document['context'])
    mongodb_insert(db, _daily_measurement(channel), document)


def _insert_point(db, channel, data):
    """Store one record through ``influxdb_insert`` as a measurement point."""
    measurement = _daily_measurement(channel)
    # Assemble the payload handed to influxdb_insert.
    point = {
        "measurement": measurement,
        "tags": data['tags'],
        # "time": "2009-11-10T23:00:00Z",
        "fields": data['data'],
    }
    influxdb_insert(db, measurement, point)


def _handle_message(body):
    """Decode the JSON *body* and route it according to its ``group`` field."""
    data = json.loads(body)
    group = data['group']
    # Pick the target database / collection.
    if group == 'error':
        # Error logs.
        _insert_document('error', Config.debug_channel, data,
                         decode_context=True)
    elif group == 'shutdown':
        # Forced script-termination logs (stored in the 'error' database too).
        _insert_document('error', Config.shutdown_channel, data,
                         decode_context=True)
    elif group == 'info':
        # Custom-field logs; their shape may be irregular, so MongoDB is used.
        _insert_document('info', Config.info_channel, data)
    elif group == 'performance':
        # Performance debugging logs: the raw body goes to write_performance,
        # prefixed by the record's 'creator' when it has one.
        write_performance(body, data['data'].get('creator', ''))
    elif group == 'api':
        # API call logs.
        _insert_point('api', Config.log_channel, data)
    else:
        # Plain informational logs.
        _insert_point('log', Config.log_channel, data)


try:
    while True:
        try:
            method_frame, header_frame, body = rabbitmq.basic_get(q_name)
        except Exception as err:
            # Fetching failed: report it and open a fresh connection.
            print(err)
            rabbitmq = rabbitmq_connect()
            continue

        if not body:
            # Nothing to consume right now; back off briefly before polling.
            time.sleep(0.05)
            continue

        try:
            print(" [Golang] Received %r" % (body,))
            write_log(" [Golang] Received %r\n" % (body,))
            _handle_message(body)
            # Batched ack: once every `ack_interval` deliveries, acknowledge
            # with multiple=True and no explicit delivery tag.
            # NOTE(review): presumably this acks everything outstanding on
            # the channel, including messages whose handling failed - confirm
            # against the client library in use.
            if method_frame.delivery_tag % ack_interval == 0:
                rabbitmq.basic_ack(multiple=True)
        except Exception:
            # A bad message must not stop the consumer: print the error and a
            # compact traceback, then carry on with the next message.
            _, exc_value, exc_tb = sys.exc_info()
            print(exc_value)
            for filename, linenum, funcname, source in traceback.extract_tb(exc_tb):
                print("%-23s:%s '%s' in %s()" % (filename, linenum, source, funcname))
finally:
    # The loop has no normal exit, so cleanup placed after it would never run.
    # The finally clause makes sure connect_close() is called when the process
    # is interrupted (e.g. Ctrl-C) or a reconnect attempt fails.
    connect_close()