GoMqClient.py 3.6 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date:   2016-05-24 13:37:45

from __init__ import *


q_name = 'log'     #队列名
ack_interval = Config.rabbitmq_ack_interval     #批量ack响应阀值

#连接rabbitmq
rabbitmq = rabbitmq_connect()
#连接influxdb
influxdb = influxdb_connect()
#连接mongodb
mongodb = mongodb_connect()

print "GoMqClient:Start.\n";

#开始消费消息数据
while True:
    try:
        method_frame, header_frame, body = rabbitmq.basic_get(q_name)
        if not body:
            time.sleep(0.05)
            continue
    except Exception as err:
        print err
        rabbitmq = rabbitmq_connect()
        continue

    try:
        #print method_frame.delivery_tag, method_frame, properties
        print " [Golang] Received %r" % (body,)
        write_log(" [Golang] Received %r\n" % (body,))

        data = json.loads(body)
        #print data
        
        #决定库表
        if data['group'] == 'error':
            '''
            错误日志
            '''
            db = 'error'
            measurement = Config.debug_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            if 'context' in json_body.keys():
                json_body['context'] = json.loads(json_body['context'])
            mongodb_insert(db,measurement,json_body)

        elif data['group'] == 'shutdown':
            '''
            脚本强退日志
            '''
            db = 'error'
            measurement = Config.shutdown_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            if 'context' in json_body.keys():
                json_body['context'] = json.loads(json_body['context'])
            mongodb_insert(db,measurement,json_body)

        elif data['group'] == 'info':
            '''
            自定义字段日志,可能会比较乱,采用mongodb
            '''
            db = 'info'
            measurement = Config.info_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            mongodb_insert(db,measurement,json_body)

        elif data['group'] == 'performance':
            '''
            性能调试日志
            '''

            pre = data['data']['creator'] if 'creator' in data['data'].keys() else ''
            write_performance(body,pre)

        elif data['group'] == 'api':
            '''
            api调用日志
            '''
            db = 'api'
            measurement = Config.log_channel+time.strftime('_%Y%m%d')

            #拼装入库数据
            json_body = [
                {
                    "measurement": measurement,
                    "tags": data['tags'],
                    #"time": "2009-11-10T23:00:00Z",
                    "fields": data['data']
                }
            ]

            influxdb_insert(db,json_body)

        else:
            '''
            信息记录日志
            '''
            db = 'log'
            measurement = Config.log_channel+time.strftime('_%Y%m%d')

            #拼装入库数据
            json_body = [
                {
                    "measurement": measurement,
                    "tags": data['tags'],
                    #"time": "2009-11-10T23:00:00Z",
                    "fields": data['data']
                }
            ]

            influxdb_insert(db,json_body)


        #ack回应
        #rabbitmq.basic_ack(method_frame.delivery_tag)
        if method_frame.delivery_tag % ack_interval == 0:
            rabbitmq.basic_ack(multiple=True)

    except Exception as err:
        print err
        continue

connect_close()