MqClient.py
2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date: 2016-05-24 13:37:45
# @Last Modified by: anchen
# @Last Modified time: 2016-05-24 18:40:09
import json
import time
import pika
from influxdb import InfluxDBClient
# Name of the RabbitMQ queue this client consumes from.
q_name = "phplog"
# Batch-acknowledgement threshold: the consumer sends a cumulative ack
# whenever a delivery tag is a multiple of this value.
ack_interval = 10
# Configuration
class Config:
    """Static settings for the RabbitMQ consumer and the InfluxDB writer.

    The attributes are plain class-level constants; the class is used as a
    namespace and is never instantiated by this script.
    """

    # RabbitMQ broker connection settings.
    # NOTE(review): host and credentials are hard-coded here -- consider
    # moving them out of the source if this file is shared.
    rabbitmq_host = '54.222.219.223'
    rabbitmq_port = 5672
    rabbitmq_user = 'guest'
    rabbitmq_pass = 'guest'
    rabbitmq_vhost = '/'

    # InfluxDB server address.
    influxdb_host = '54.222.219.223'
    influxdb_port = 8086

    # Measurement-name prefixes; the consumer appends a "_YYYYMMDD" date
    # suffix to the chosen prefix for every point it writes.
    info_channel = 'YHInfo'
    debug_channel = 'YHDebug'
    shutdown_channel = 'YHShutdown'
# Connect to RabbitMQ.
credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
try:
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=Config.rabbitmq_host,
        port=Config.rabbitmq_port,
        virtual_host=Config.rabbitmq_vhost,
        credentials=credentials))
except pika.exceptions.AMQPConnectionError:
    # BlockingConnection raises on failure rather than returning a falsy
    # object, so the failure has to be caught here for the exit message
    # to be shown at all.
    raise SystemExit("Cannot connect to the broker!\n")

# Open a channel and make sure the durable queue exists before consuming.
channel = connection.channel()
channel.queue_declare(queue=q_name, durable=True)

# Connect to InfluxDB (1-second request timeout).
influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)

# Single-argument print(...) produces the same output on Python 2 and 3.
print("MqClient:Start.\n")
# Start consuming messages: each message body is a JSON document that is
# turned into one InfluxDB point. The finally clause guarantees that the
# consumer is cancelled and the connection closed however the loop ends
# (for example on Ctrl-C), instead of only when the generator is exhausted.
try:
    for method_frame, properties, body in channel.consume(q_name):
        delivery_tag = method_frame.delivery_tag
        try:
            print(" [x] Received %r" % (body,))
            data = json.loads(body)

            # Choose the database and the per-day measurement name from the
            # message type; anything that is not 'error' or 'shutdown' is
            # stored as an ordinary log entry.
            if data['type'] == 'error':
                db = 'error'
                measurement = Config.debug_channel + time.strftime('_%Y%m%d')
            elif data['type'] == 'shutdown':
                db = 'error'
                measurement = Config.shutdown_channel + time.strftime('_%Y%m%d')
            else:
                db = 'log'
                measurement = Config.info_channel + time.strftime('_%Y%m%d')

            # Assemble the point to store. No explicit "time" is sent with it.
            json_body = [
                {
                    "measurement": measurement,
                    "tags": data['tags'],
                    "fields": data['data'],
                }
            ]

            # Select the database and write the point.
            influxdb.switch_database(db)
            influxdb.write_points(json_body)
        except Exception as err:
            # Best effort: a message that cannot be parsed or stored is
            # acknowledged on its own (so it is not redelivered), the error
            # is reported, and consumption carries on.
            channel.basic_ack(delivery_tag)
            print(err)
            continue

        # Batch acknowledgement. The cumulative ack names the current
        # delivery tag explicitly: with the default tag of 0, multiple=True
        # acknowledges every outstanding delivery on the channel, which may
        # include messages already pushed to this client but not yet
        # processed by the loop.
        if delivery_tag % ack_interval == 0:
            channel.basic_ack(delivery_tag=delivery_tag, multiple=True)
finally:
    # Skip the cleanup calls when the connection is already gone, so they
    # cannot mask the error that ended the loop.
    if connection.is_open:
        channel.cancel()
        connection.close()