GoMqClient.py 2.1 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date:   2016-05-24 13:37:45

from __init__ import *


q_name = 'log'     #队列名
ack_interval = Config.rabbitmq_ack_interval     #批量ack响应阀值

#连接rabbitmq
rabbitmq = rabbitmq_connect()
#连接influxdb
influxdb = influxdb_connect()
#连接mongodb
mongodb = mongodb_connect()

print "GoMqClient:Start.\n";

#开始消费消息数据
while True:
    try:
        method_frame, header_frame, body = rabbitmq.basic_get(q_name)
        if not body:
            time.sleep(0.05)
            continue
    except Exception as err:
        print err
        rabbitmq = rabbitmq_connect()
        continue

    try:
        #print method_frame.delivery_tag, method_frame, properties
        print " [Golang] Received %r" % (body,)
        write_log(" [Golang] Received %r\n\n" % (body,))

        data = json.loads(body)
        #print data
        
        #决定库表
        if data['group'] == 'error':
            db = 'error'
            measurement = Config.debug_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            mongodb_insert(db,measurement,json_body)

        elif data['group'] == 'shutdown':
            db = 'error'
            measurement = Config.shutdown_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            mongodb_insert(db,measurement,json_body)

        else:
            db = 'log'
            measurement = Config.info_channel+time.strftime('_%Y%m%d')

            #拼装入库数据
            json_body = [
                {
                    "measurement": measurement,
                    "tags": data['tags'],
                    #"time": "2009-11-10T23:00:00Z",
                    "fields": data['data']
                }
            ]

            influxdb_insert(db,json_body)


        #ack回应
        #rabbitmq.basic_ack(method_frame.delivery_tag)
        if method_frame.delivery_tag % ack_interval == 0:
            rabbitmq.basic_ack(multiple=True)

    except Exception as err:
        print err
        continue

connect_close()