MqClient.py 2.92 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date:   2016-05-24 13:37:45
# @Last Modified by:   anchen
# @Last Modified time: 2016-06-17 14:19:07

import os
import sys
# Absolute path of the script being run (taken from argv[0]) and the
# directory that contains it.
__FILE__ = os.path.abspath(sys.argv[0])
__DIR__ = os.path.dirname(__FILE__)

import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote

# Load the configuration: execute Config.lua (expected next to this script)
# inside an embedded Lua runtime and expose its global table as `Config`.
lua = LuaRuntime(unpack_returned_tuples=True)
with open(__DIR__ + '/Config.lua') as config_file:
    # `with` closes the file handle as soon as the source has been read.
    lua.execute(config_file.read())
Config = lua.globals()

q_name = Config.rabbitmq_queue_name             # name of the queue to consume
ack_interval = Config.rabbitmq_ack_interval     # threshold for batched acks

# Connect to RabbitMQ and declare the (durable) queue.
credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    Config.rabbitmq_host, Config.rabbitmq_port, Config.rabbitmq_vhost, credentials))
if not connection:
    exit("Cannot connect to the broker!\n")
channel = connection.channel()
channel.queue_declare(queue=q_name, durable=True)

# Connect to InfluxDB.
influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)

# Connect to MongoDB. The credentials are embedded in the URI, so they are
# percent-escaped with safe='' -- quote() leaves '/' untouched by default,
# which would corrupt the URI for a user name or password containing '/'.
mongodb = MongoClient('mongodb://%s:%s@%s:%d/' % (
    quote(Config.mongodb_user, safe=''),
    quote(Config.mongodb_pass, safe=''),
    Config.mongodb_host,
    Config.mongodb_port,
))

print("MqClient:Start.\n")

# Consume messages from the queue. Each message body is a JSON document with
# the keys 'type', 'tags' and 'data'; it is appended to the log file, written
# to InfluxDB as one point and inserted into MongoDB as one document.
try:
    for method_frame, properties, body in channel.consume(q_name):
        try:
            received = " [HTTP] Received %r" % (body,)
            print(received)
            # `with` guarantees the log file is closed even if the write fails.
            with open(Config.log_file, 'a') as fp:
                fp.write(received + "\n\n")

            data = json.loads(body)

            # Choose the database and the channel prefix from the message
            # type; the measurement/collection name gets a per-day suffix.
            msg_type = data['type']
            if msg_type == 'error':
                db, prefix = 'error', Config.debug_channel
            elif msg_type == 'shutdown':
                db, prefix = 'error', Config.shutdown_channel
            else:
                db, prefix = 'log', Config.info_channel
            measurement = prefix + time.strftime('_%Y%m%d')

            # Assemble the point to store; no explicit "time" is supplied.
            json_body = [
                {
                    "measurement": measurement,
                    "tags": data['tags'],
                    "fields": data['data'],
                }
            ]

            # InfluxDB: select the database, then write.
            influxdb.switch_database(db)
            influxdb.write_points(json_body)

            # MongoDB: database and collection mirror the InfluxDB names.
            mongodb[db][measurement].insert_one(data['data'])

            # Batched ack: every `ack_interval` deliveries acknowledge
            # everything up to and including this message. The delivery tag
            # is passed explicitly because tag 0 with multiple=True would
            # acknowledge ALL outstanding deliveries, including ones that
            # have been delivered to this client but not processed yet.
            if method_frame.delivery_tag % ack_interval == 0:
                channel.basic_ack(delivery_tag=method_frame.delivery_tag,
                                  multiple=True)

        except Exception as err:
            # Best effort: a message that cannot be processed is acknowledged
            # on its own (i.e. dropped) so it is not redelivered, and the
            # error is reported on stdout.
            channel.basic_ack(method_frame.delivery_tag)
            print(err)
finally:
    # Runs on every exit path (including Ctrl-C or an unexpected error), so
    # the consumer is cancelled and the connection closed whenever the
    # connection is still usable.
    if connection.is_open:
        channel.cancel()
        connection.close()