|
|
#!/usr/bin/env python
|
|
|
# -*- coding: utf-8 -*-
|
|
|
# @Author: wuxiao
|
|
|
# @Date: 2016-05-24 13:37:45
|
|
|
# @Last Modified by: anchen
|
|
|
# @Last Modified time: 2016-05-24 18:40:09
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
__FILE__ = os.path.abspath(sys.argv[0])
|
|
|
__DIR__ = os.path.dirname(os.path.abspath(sys.argv[0]))
|
|
|
|
|
|
import json
|
|
|
import time
|
|
|
import pika
|
|
|
from influxdb import InfluxDBClient
|
|
|
from lupa import LuaRuntime
|
|
|
|
|
|
'''
|
|
|
读取配置信息
|
|
|
'''
|
|
|
lua = LuaRuntime(unpack_returned_tuples=True)
|
|
|
lua.execute(open(__DIR__+'/Config.lua').read(-1))
|
|
|
Config = lua.globals()
|
|
|
|
|
|
q_name = 'log' #队列名
|
|
|
ack_interval = Config.rabbitmq_ack_interval #批量ack响应阀值
|
|
|
|
|
|
#连接rabbitmq
|
|
|
credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters(
|
|
|
Config.rabbitmq_host,Config.rabbitmq_port,Config.rabbitmq_vhost,credentials))
|
|
|
if not connection:
|
|
|
exit("Cannot connect to the broker!\n")
|
|
|
channel = connection.channel()
|
|
|
|
|
|
#连接influxdb
|
|
|
influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)
|
|
|
|
|
|
print "MqClient:Start.\n";
|
|
|
|
|
|
#开始消费消息数据
|
|
|
for method_frame, properties, body in channel.consume(q_name):
|
|
|
try:
|
|
|
|
|
|
#print method_frame.delivery_tag, method_frame, properties
|
|
|
print " [x] Received %r" % (body,)
|
|
|
data = json.loads(body)
|
|
|
#print data
|
|
|
|
|
|
#决定库表
|
|
|
if data['group'] == 'error':
|
|
|
db = 'error'
|
|
|
measurement = Config.debug_channel+time.strftime('_%Y%m%d')
|
|
|
elif data['group'] == 'shutdown':
|
|
|
db = 'error'
|
|
|
measurement = Config.shutdown_channel+time.strftime('_%Y%m%d')
|
|
|
else:
|
|
|
db = 'log'
|
|
|
measurement = Config.info_channel+time.strftime('_%Y%m%d')
|
|
|
|
|
|
#拼装入库数据
|
|
|
json_body = [
|
|
|
{
|
|
|
"measurement": measurement,
|
|
|
"tags": data['tags'],
|
|
|
#"time": "2009-11-10T23:00:00Z",
|
|
|
"fields": data['data']
|
|
|
}
|
|
|
]
|
|
|
|
|
|
#选择库,写入
|
|
|
influxdb.switch_database(db)
|
|
|
influxdb.write_points(json_body)
|
|
|
|
|
|
#ack回应
|
|
|
#channel.basic_ack(method_frame.delivery_tag)
|
|
|
if method_frame.delivery_tag % ack_interval == 0:
|
|
|
channel.basic_ack(multiple=True)
|
|
|
|
|
|
except Exception as err:
|
|
|
channel.basic_ack(method_frame.delivery_tag)
|
|
|
print err
|
|
|
continue
|
|
|
|
|
|
|
|
|
channel.cancel()
|
|
|
connection.close() |
|
|
\ No newline at end of file |
...
|
...
|
|