# __init__.py 2.08 KB
__author__ = 'wuxiao'

import os
import sys
# Absolute path of the launching script (sys.argv[0]) and the directory that
# contains it. NOTE(review): these are resolved from the entry-point script,
# not from this module's own location -- confirm that is intended.
__FILE__ = os.path.abspath(sys.argv[0])
__DIR__ = os.path.dirname(__FILE__)

import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote

# Load configuration: execute Config.lua (looked up in __DIR__) inside an
# embedded Lua runtime and expose the resulting Lua globals as ``Config``.
lua = LuaRuntime(unpack_returned_tuples=True)
# Read the script through a context manager so the file handle is closed
# deterministically instead of being left for the garbage collector.
with open(os.path.join(__DIR__, 'Config.lua')) as _config_file:
    lua.execute(_config_file.read())
Config = lua.globals()

def write_log(msg):
    """Append ``msg`` to the log file named by ``Config.log_file``.

    The message is written verbatim; no newline or timestamp is added.

    Args:
        msg: Text to append to the log file.
    """
    # ``with`` guarantees the handle is closed even when write() raises.
    with open(Config.log_file, 'a') as fp:
        fp.write(msg)

# Connect to RabbitMQ
def rabbitmq_connect():
    """Open a blocking RabbitMQ connection and return a channel on it.

    Host, port, virtual host and credentials are read from ``Config``.
    The connection and the channel are also stored in the module globals
    ``connection`` and ``_channel``, which are the names ``connect_close``
    reads when shutting down.

    Returns:
        The channel created on the new connection.
    """
    global connection, _channel
    credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
    parameters = pika.ConnectionParameters(
        host=Config.rabbitmq_host,
        port=Config.rabbitmq_port,
        virtual_host=Config.rabbitmq_vhost,
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters)
    if not connection:
        # Kept from the original as a safety net.
        sys.exit("Cannot connect to the broker!\n")
    _channel = connection.channel()
    return _channel

# Connect to InfluxDB
def influxdb_connect():
    """Create an InfluxDB client for the server configured in ``Config``.

    Uses ``Config.influxdb_host`` and ``Config.influxdb_port`` with
    ``timeout=1``. No database is selected here; callers pick one with
    ``switch_database``.

    Returns:
        The new ``InfluxDBClient`` instance.
    """
    _influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)
    return _influxdb

# Connect to MongoDB
def mongodb_connect():
    """Create a MongoDB client for the server configured in ``Config``.

    The connection URI is built from ``Config.mongodb_user``,
    ``Config.mongodb_pass``, ``Config.mongodb_host`` and
    ``Config.mongodb_port``; ``Config.mongodb_timeout`` is applied to the
    socket, connect and server-selection timeouts. The client is also stored
    in the module global ``_mongodb``, which is the name ``connect_close``
    reads when shutting down.

    Returns:
        The new ``MongoClient`` instance.
    """
    global _mongodb
    # safe='' percent-escapes every reserved character, including '/', so
    # credentials containing URI delimiters cannot corrupt the URI.
    uri = 'mongodb://%s:%s@%s:%d/' % (
        quote(Config.mongodb_user, safe=''),
        quote(Config.mongodb_pass, safe=''),
        Config.mongodb_host,
        Config.mongodb_port,
    )
    _mongodb = MongoClient(
        uri,
        socketTimeoutMS=Config.mongodb_timeout,
        connectTimeoutMS=Config.mongodb_timeout,
        serverSelectionTimeoutMS=Config.mongodb_timeout,
    )
    # The original ended in a bare ``return`` and handed callers None.
    return _mongodb

# Select an InfluxDB database and write points to it
def influxdb_insert(db, json_body):
    """Write ``json_body`` to the InfluxDB database ``db``.

    The client is cached in the module global ``influxdb``. If no client
    exists yet, or the first attempt fails for any reason, a fresh client is
    created with ``influxdb_connect()`` and the write is retried once; an
    error from the retry propagates to the caller.

    Args:
        db: Name of the database to switch to before writing.
        json_body: Points passed unchanged to ``write_points``.
    """
    # Without this declaration the assignment in the except branch makes
    # ``influxdb`` function-local, so the try block always failed with
    # UnboundLocalError and every call opened a new connection.
    global influxdb
    try:
        influxdb.switch_database(db)
        influxdb.write_points(json_body)
    except Exception:
        # Covers both "no client created yet" (NameError on the first call)
        # and a failed write on a stale client: reconnect and retry once.
        influxdb = influxdb_connect()
        influxdb.switch_database(db)
        influxdb.write_points(json_body)

# Select a MongoDB database and collection and insert one document
def mongodb_insert(db, measurement, json_body):
    """Insert ``json_body`` into collection ``measurement`` of database ``db``.

    The client is cached in the module global ``mongodb``. If no client
    exists yet, or the first attempt fails for any reason, a fresh client is
    created with ``mongodb_connect()`` and the insert is retried once; an
    error from the retry propagates to the caller.

    Args:
        db: Database name.
        measurement: Collection name inside ``db``.
        json_body: Document passed unchanged to ``insert_one``.
    """
    # Without this declaration the assignment in the except branch makes
    # ``mongodb`` function-local, so the try block always failed with
    # UnboundLocalError and every call opened a new connection.
    global mongodb
    try:
        mongodb[db][measurement].insert_one(json_body)
    except Exception:
        # Covers both "no client created yet" (NameError on the first call)
        # and a failed insert on a stale client: reconnect and retry once.
        mongodb = mongodb_connect()
        mongodb[db][measurement].insert_one(json_body)

# Close all connections
def connect_close():
    """Close whichever RabbitMQ and MongoDB handles exist as module globals.

    Looks up the module globals ``_channel``, ``connection`` and ``_mongodb``.
    A handle that was never created is skipped instead of raising NameError,
    and every remaining handle is still closed when an earlier one fails
    (the error then propagates after cleanup). InfluxDB needs no action here.
    """
    handles = globals()
    channel = handles.get('_channel')
    broker = handles.get('connection')
    mongo_client = handles.get('_mongodb')
    try:
        # rabbitmq
        if channel is not None:
            channel.cancel()
    finally:
        try:
            if broker is not None:
                broker.close()
        finally:
            # influxdb: nothing to close
            # mongodb
            if mongo_client is not None:
                mongo_client.close()