GoMqClient.py
2.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date: 2016-05-24 13:37:45
from __init__ import *
q_name = 'log' #队列名
ack_interval = Config.rabbitmq_ack_interval #批量ack响应阀值
#连接rabbitmq
rabbitmq = rabbitmq_connect()
#连接influxdb
influxdb = influxdb_connect()
#连接mongodb
mongodb = mongodb_connect()
print "GoMqClient:Start.\n";
#开始消费消息数据
while True:
try:
method_frame, header_frame, body = rabbitmq.basic_get(q_name)
if not body:
time.sleep(0.05)
continue
except Exception as err:
print err
rabbitmq = rabbitmq_connect()
continue
try:
#print method_frame.delivery_tag, method_frame, properties
print " [Golang] Received %r" % (body,)
write_log(" [Golang] Received %r\n\n" % (body,))
data = json.loads(body)
#print data
#决定库表
if data['group'] == 'error':
db = 'error'
measurement = Config.debug_channel+time.strftime('_%Y%m%d')
json_body = dict(data['data'],**data['tags'])
mongodb_insert(db,measurement,json_body)
elif data['group'] == 'shutdown':
db = 'error'
measurement = Config.shutdown_channel+time.strftime('_%Y%m%d')
json_body = dict(data['data'],**data['tags'])
mongodb_insert(db,measurement,json_body)
else:
db = 'log'
measurement = Config.info_channel+time.strftime('_%Y%m%d')
#拼装入库数据
json_body = [
{
"measurement": measurement,
"tags": data['tags'],
#"time": "2009-11-10T23:00:00Z",
"fields": data['data']
}
]
influxdb_insert(db,json_body)
#ack回应
#rabbitmq.basic_ack(method_frame.delivery_tag)
if method_frame.delivery_tag % ack_interval == 0:
rabbitmq.basic_ack(multiple=True)
except Exception as err:
print err
continue
connect_close()