__init__.py
2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
__author__ = 'wuxiao'
import os
import sys
__FILE__ = os.path.abspath(sys.argv[0])
__DIR__ = os.path.dirname(os.path.abspath(sys.argv[0]))
import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote
'''
读取配置信息
'''
lua = LuaRuntime(unpack_returned_tuples=True)
lua.execute(open(__DIR__+'/Config.lua').read(-1))
Config = lua.globals()
def write_log(msg):
fp = open(Config.log_file,'a')
fp.write(msg)
fp.close()
#连接rabbitmq
def rabbitmq_connect:
credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(
Config.rabbitmq_host,Config.rabbitmq_port,Config.rabbitmq_vhost,credentials))
if not connection:
exit("Cannot connect to the broker!\n")
_channel = connection.channel()
return _channel
#连接influxdb
def influxdb_connect:
_influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)
return _influxdb
#连接mongodb
def mongodb_connect:
_mongodb = MongoClient( 'mongodb://%s:%s@%s:%d/' % ( quote(Config.mongodb_user),quote(Config.mongodb_pass),Config.mongodb_host,Config.mongodb_port ), socketTimeoutMS=Config.mongodb_timeout,connectTimeoutMS=Config.mongodb_timeout,serverSelectionTimeoutMS=Config.mongodb_timeout)
return
#influxdb选择库,写入
def influxdb_insert(db,json_body):
try:
influxdb.switch_database(db)
influxdb.write_points(json_body)
except Exception as err:
influxdb = influxdb_connect()
influxdb.switch_database(db)
influxdb.write_points(json_body)
#mongodb选择库和集合,写入
def mongodb_insert(db,measurement,json_body):
try:
mongodb[db][measurement].insert_one(json_body)
except Exception as err:
mongodb = mongodb_connect()
mongodb[db][measurement].insert_one(json_body)
#关闭所有连接
def connect_close:
#rabbitmq
_channel.cancel()
connection.close()
#influxdb
pass
#mongodb
_mongodb.close()