# GoMqClient.py (4.28 KB)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: wuxiao
# @Date:   2016-05-24 13:37:45

from __init__ import *
import traceback

q_name = 'log'     #队列名

#连接rabbitmq
rabbitmq = rabbitmq_connect()

print "GoMqClient:Start.\n";

#开始消费消息数据
insert_data = {}
Timer(Config.multi_insert_timeout,data_pull_timer,[insert_data]).start()
while True:
    try:
        method_frame, header_frame, body = rabbitmq.basic_get(q_name)
        if not body:
            time.sleep(0.05)
            continue

        #ack回应
        #rabbitmq.basic_ack(method_frame.delivery_tag)
        if method_frame.delivery_tag % Config.rabbitmq_ack_interval == 0:
            rabbitmq.basic_ack(multiple=True)

    except Exception as err:
        print err
        rabbitmq = rabbitmq_connect()
        continue

    try:
        #print method_frame.delivery_tag, method_frame, properties
        print " [Golang] Received %r" % (body,)
        write_log(" [Golang] Received %r\n" % (body,))

        data = json.loads(body)
        #print data
        
        #决定库表
        if data['group'] == 'error':
            '''
            错误日志
            '''
            db = 'error'
            measurement = Config.debug_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            if 'context' in json_body.keys():
                json_body['context'] = json.loads(json_body['context'])

            middleware = 'mongodb'

        elif data['group'] == 'shutdown':
            '''
            脚本强退日志
            '''
            db = 'error'
            measurement = Config.shutdown_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            if 'context' in json_body.keys():
                json_body['context'] = json.loads(json_body['context'])
            
            middleware = 'mongodb'

        elif data['group'] == 'info':
            '''
            自定义字段日志,可能会比较乱,采用mongodb
            '''
            db = 'info'
            measurement = Config.info_channel+time.strftime('_%Y%m%d')

            json_body = dict(data['data'],**data['tags'])
            
            middleware = 'mongodb'

        elif data['group'] == 'performance':
            '''
            性能调试日志
            '''

            pre = data['data']['creator'] if 'creator' in data['data'].keys() else ''
            write_performance(body,pre)

            continue

        elif data['group'] == 'api':
            '''
            api调用日志
            '''
            db = 'api'
            measurement = Config.log_channel+time.strftime('_%Y%m%d')

            #拼装入库数据
            json_body = {
                "measurement": measurement,
                "tags": data['tags'],
                #"time": "2009-11-10T23:00:00Z",
                "fields": data['data']
            }

            middleware = 'influxdb'

        else:
            '''
            信息记录日志
            '''
            db = 'log'
            measurement = Config.log_channel+time.strftime('_%Y%m%d')

            #拼装入库数据
            json_body = {
                "measurement": measurement,
                "tags": data['tags'],
                #"time": "2009-11-10T23:00:00Z",
                "fields": data['data']
            }

            middleware = 'influxdb'

        if middleware not in insert_data.keys():
            insert_data[middleware] = {}
        if db not in insert_data[middleware].keys():
            insert_data[middleware][db] = {}
        if measurement not in insert_data[middleware][db].keys():
            q = Queue()
            q.middleware = middleware
            q.db = db
            q.measurement = measurement
            insert_data[middleware][db][measurement] = q

        queue = insert_data[middleware][db][measurement]
        queue.push(json_body)
        if queue.size() >= Config.multi_insert_limit:
            data_pull(queue)
        
        

    except Exception as err:
        #print err
        _, exc_value, exc_tb = sys.exc_info()
        print exc_value
        for filename, linenum, funcname, source in traceback.extract_tb(exc_tb):
            print "%-23s:%s '%s' in %s()" % (filename, linenum, source, funcname)
        continue

connect_close()