__init__.py 3.79 KB
# -*- coding: utf-8 -*-

__author__ = 'wuxiao'

import os
import sys
__FILE__ = os.path.abspath(sys.argv[0])
__DIR__ = os.path.dirname(os.path.abspath(sys.argv[0]))
reload(sys)
sys.setdefaultencoding('utf8')

import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote
from threading import Timer

'''
读取配置信息
'''
lua = LuaRuntime(unpack_returned_tuples=True)
lua.execute(open(__DIR__+'/Config.lua').read(-1))
Config = lua.globals()

#写日志
def write_log(msg):
    fp = open(Config.log_file,'a')
    fp.write(msg+"\n")
    fp.close()

#写性能调试日志
def write_performance(msg, pre = None):
    if not os.path.isdir(Config.performance_dir):
        os.makedirs(Config.performance_dir,0777)
    file = time.strftime('%Y%m%d',time.localtime(time.time()))+'.log'
    if pre:
        file = pre+'_'+file
    fp = open(Config.performance_dir+'/'+file,'a')
    fp.write(msg+"\n")
    fp.close()

#连接rabbitmq
def rabbitmq_connect():
    credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        Config.rabbitmq_host,Config.rabbitmq_port,Config.rabbitmq_vhost,credentials))
    if not connection:
        exit("Cannot connect to the broker!\n")
    _channel = connection.channel()
    return _channel

#连接influxdb
def influxdb_connect():
    _influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)
    return _influxdb

#连接mongodb
def mongodb_connect():
    _mongodb = MongoClient( 'mongodb://%s:%s@%s:%d/' % ( quote(Config.mongodb_user),quote(Config.mongodb_pass),Config.mongodb_host,Config.mongodb_port ), socketTimeoutMS=Config.mongodb_timeout,connectTimeoutMS=Config.mongodb_timeout,serverSelectionTimeoutMS=Config.mongodb_timeout)
    return _mongodb

#influxdb选择库,写入
def influxdb_insert(db, measurement, json_bodys):
    try:
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
    except Exception as err:
        influxdb = influxdb_connect()
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)

#mongodb选择库和集合,写入
def mongodb_insert(db, measurement, json_bodys):
    try:
        return mongodb[db][measurement].insert_many(json_bodys)
    except Exception as err:
        mongodb = mongodb_connect()
        return mongodb[db][measurement].insert_many(json_bodys)

#关闭所有连接
def connect_close():
    #rabbitmq
    _channel.cancel()
    connection.close()
    #influxdb
    pass
    #mongodb
    _mongodb.close()

#数据入库定时器
def data_pull_timer(insert_data):
    print time.time()
    for middleware,dbs in insert_data.items():
        for db,measurements in dbs.items():
            for measurement,queue in measurements.items():
                data_pull(queue,'Timer')
    Timer(Config.multi_insert_timeout,data_pull_timer,[insert_data]).start()

#数据入库
def data_pull(queue,type = 'Normal'):
    if queue.is_empty():
        return
    json_bodys = queue.get_all()
    if queue.middleware == 'mongodb':
        r = mongodb_insert(queue.db, queue.measurement, json_bodys)
    elif queue.middleware == 'influxdb':
        r = influxdb_insert(queue.db, queue.measurement, json_bodys)
    if r:
        queue.reset()

#队列类
class Queue:
    def __init__(self):
        self.list = []
        self.num = 0

    #添加一条记录
    def push(self,data):
        self.list.append(data)
        self.num += 1

    #获取数量
    def size(self):
        return self.num

    #获取所有
    def get_all(self):
        return self.list

    #重置
    def reset(self):
        self.list = []
        self.num = 0

    #判断列表是否为空
    def is_empty(self):
        return self.num == 0