MqClient.py
2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date: 2016-05-24 13:37:45
# @Last Modified by: anchen
# @Last Modified time: 2016-06-17 14:19:07
import os
import sys
# Absolute path of the script being run, and the directory that contains it.
__FILE__ = os.path.abspath(sys.argv[0])
__DIR__ = os.path.dirname(__FILE__)
import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote
# Load the configuration: Config.lua (looked up in __DIR__) is executed in an
# embedded Lua runtime and the runtime's globals are exposed as `Config`.
lua = LuaRuntime(unpack_returned_tuples=True)
# Read the file inside a context manager so the handle is closed right away
# instead of being left for the garbage collector.
with open(os.path.join(__DIR__, 'Config.lua')) as config_file:
    lua.execute(config_file.read())
Config = lua.globals()
q_name = Config.rabbitmq_queue_name  # queue name
ack_interval = Config.rabbitmq_ack_interval  # batch-ack threshold
# Connect to RabbitMQ.
credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
parameters = pika.ConnectionParameters(
    host=Config.rabbitmq_host,
    port=Config.rabbitmq_port,
    virtual_host=Config.rabbitmq_vhost,
    credentials=credentials,
)
try:
    connection = pika.BlockingConnection(parameters)
except pika.exceptions.AMQPConnectionError:
    # BlockingConnection signals failure by raising, never by returning a
    # falsy object, so the exit message has to be produced from here.
    sys.exit("Cannot connect to the broker!\n")
channel = connection.channel()
# Declare the queue as durable before consuming from it.
channel.queue_declare(queue=q_name, durable=True)
# Connect to InfluxDB.
influxdb = InfluxDBClient(
    host=Config.influxdb_host,
    port=Config.influxdb_port,
    timeout=1,
)
# Connect to MongoDB. The credentials are percent-encoded with safe='' so that
# reserved characters such as '/' (which quote() leaves untouched by default)
# cannot corrupt the connection URI.
mongodb_uri = 'mongodb://%s:%s@%s:%d/' % (
    quote(Config.mongodb_user, safe=''),
    quote(Config.mongodb_pass, safe=''),
    Config.mongodb_host,
    Config.mongodb_port,
)
mongodb = MongoClient(mongodb_uri)
print("MqClient:Start.\n")
def _resolve_target(msg_type):
    """Return the (database, measurement) pair a message type is stored under.

    'error' and 'shutdown' messages go to the 'error' database, everything
    else to 'log'. The measurement name is the configured channel name with
    the current date (``_YYYYMMDD``) appended.
    """
    if msg_type == 'error':
        db, prefix = 'error', Config.debug_channel
    elif msg_type == 'shutdown':
        db, prefix = 'error', Config.shutdown_channel
    else:
        db, prefix = 'log', Config.info_channel
    return db, prefix + time.strftime('_%Y%m%d')


# Start consuming messages: each one is logged, parsed as JSON and written to
# both InfluxDB and MongoDB.
try:
    for method_frame, properties, body in channel.consume(q_name):
        try:
            received = " [HTTP] Received %r" % (body,)
            print(received)
            # Append to the log file; `with` closes it even if the write fails.
            with open(Config.log_file, 'a') as fp:
                fp.write(received + "\n\n")
            data = json.loads(body)
            # Decide which database and measurement/collection to use.
            db, measurement = _resolve_target(data['type'])
            # Assemble the point to store.
            json_body = [
                {
                    "measurement": measurement,
                    "tags": data['tags'],
                    "fields": data['data'],
                }
            ]
            # Select the InfluxDB database and write the point.
            influxdb.switch_database(db)
            influxdb.write_points(json_body)
            # Store the payload in the matching MongoDB database/collection.
            mongodb[db][measurement].insert_one(data['data'])
            # Batch acknowledgement: every `ack_interval` deliveries, ack all
            # messages up to and including this one. The delivery tag is
            # passed explicitly so the ack covers exactly those messages.
            if method_frame.delivery_tag % ack_interval == 0:
                channel.basic_ack(method_frame.delivery_tag, multiple=True)
        except Exception as err:
            # A message that cannot be processed is acknowledged on its own
            # and the error is printed; the loop then moves on.
            channel.basic_ack(method_frame.delivery_tag)
            print(err)
finally:
    # Release the consumer and the connection however the loop ends
    # (normal exit, Ctrl-C, broker error). The is_open guards avoid raising
    # a second error when the link is already gone.
    if channel.is_open:
        channel.cancel()
    if connection.is_open:
        connection.close()