__init__.py
3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# -*- coding: utf-8 -*-
__author__ = 'wuxiao'
import os
import sys
# Absolute path of the script named in sys.argv[0], and the directory that
# contains it. Both are computed once, at import time.
__FILE__ = os.path.abspath(sys.argv[0])
__DIR__ = os.path.dirname(__FILE__)
# Python 2 idiom: sys is reloaded first because setdefaultencoding() is not
# reachable on the sys module after interpreter start-up; the process-wide
# default string encoding is then switched to UTF-8.
reload(sys)
sys.setdefaultencoding('utf8')
import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote
'''
读取配置信息
'''
# Execute Config.lua (located in __DIR__, i.e. next to the launching script)
# inside an embedded Lua runtime and expose the Lua global table as `Config`.
lua = LuaRuntime(unpack_returned_tuples=True)
# Read the file inside a `with` block so the handle is closed right away
# instead of being left for the garbage collector.
with open(os.path.join(__DIR__, 'Config.lua')) as _config_file:
    lua.execute(_config_file.read())
Config = lua.globals()
def write_log(msg):
    """Append ``msg`` followed by a newline to ``Config.log_file``.

    The file is opened in append mode on every call.
    """
    # `with` guarantees the handle is closed even if the write raises.
    with open(Config.log_file, 'a') as log_fp:
        log_fp.write(msg + "\n")
def write_performance(msg, pre=None):
    """Append ``msg`` plus a newline to today's performance log.

    The log lives in ``Config.performance_dir`` and is named
    ``YYYYMMDD.log`` (local date). When ``pre`` is truthy the name becomes
    ``<pre>_YYYYMMDD.log``. The directory is created if it does not exist.

    Raises:
        OSError: if the directory cannot be created.
    """
    log_dir = Config.performance_dir
    if not os.path.isdir(log_dir):
        try:
            # 0o777 is the octal spelling accepted by Python 2.6+ and 3;
            # the legacy `0777` form is a syntax error on Python 3.
            os.makedirs(log_dir, 0o777)
        except OSError:
            # Another process may have created the directory between the
            # check and the call; only re-raise if it is still missing.
            if not os.path.isdir(log_dir):
                raise
    # strftime() without a time tuple formats the current local time.
    log_name = time.strftime('%Y%m%d') + '.log'
    if pre:
        log_name = pre + '_' + log_name
    # `with` guarantees the handle is closed even if the write raises.
    with open(os.path.join(log_dir, log_name), 'a') as log_fp:
        log_fp.write(msg + "\n")
def rabbitmq_connect():
    """Open a blocking RabbitMQ connection and return a channel on it.

    Host, port, virtual host and credentials are read from
    ``Config.rabbitmq_host``, ``Config.rabbitmq_port``,
    ``Config.rabbitmq_vhost``, ``Config.rabbitmq_user`` and
    ``Config.rabbitmq_pass``. Only the channel is returned; the connection
    object is not stored by this function.
    """
    credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
    parameters = pika.ConnectionParameters(
        Config.rabbitmq_host,
        Config.rabbitmq_port,
        Config.rabbitmq_vhost,
        credentials,
    )
    connection = pika.BlockingConnection(parameters)
    # NOTE(review): pika presumably raises on a failed connect rather than
    # returning a falsy object, which would make this branch unreachable --
    # confirm before relying on the message below.
    if not connection:
        # sys.exit() instead of the bare exit() helper: exit() is injected by
        # the `site` module for interactive sessions and is not guaranteed to
        # exist (e.g. under `python -S`).
        sys.exit("Cannot connect to the broker!\n")
    return connection.channel()
def influxdb_connect():
    """Return a new InfluxDBClient for the configured host and port.

    The address comes from ``Config.influxdb_host`` and
    ``Config.influxdb_port``; the client is created with ``timeout=1``.
    """
    host, port = Config.influxdb_host, Config.influxdb_port
    return InfluxDBClient(host, port, timeout=1)
def mongodb_connect():
    """Return a new MongoClient built from the ``Config.mongodb_*`` settings.

    The user name and password are percent-escaped before being placed in
    the ``mongodb://user:pass@host:port/`` URI. ``Config.mongodb_timeout`` is
    passed as the socket, connect and server-selection timeout.
    """
    # safe='' also escapes '/', which quote() leaves untouched by default; an
    # unescaped '/' in the credentials would end the authority part of the
    # URI early.
    user = quote(Config.mongodb_user, safe='')
    password = quote(Config.mongodb_pass, safe='')
    uri = 'mongodb://%s:%s@%s:%d/' % (
        user,
        password,
        Config.mongodb_host,
        Config.mongodb_port,
    )
    timeout_ms = Config.mongodb_timeout
    return MongoClient(
        uri,
        socketTimeoutMS=timeout_ms,
        connectTimeoutMS=timeout_ms,
        serverSelectionTimeoutMS=timeout_ms,
    )
def influxdb_insert(db, measurement, json_bodys):
    """Select database ``db`` on the shared InfluxDB client and write points.

    Args:
        db: name of the database to switch to.
        measurement: unused here; kept so the signature matches
            mongodb_insert().
        json_bodys: the points passed to ``write_points``.

    Returns:
        Whatever ``write_points`` returns.

    If the first attempt fails for any reason (including the module-level
    client not existing yet), a new client is created with
    influxdb_connect(), stored in the module global ``influxdb`` and the
    write is retried once; an error from the retry propagates.
    """
    # Without this declaration the assignment below makes `influxdb` a local
    # name, so the first attempt would always fail with UnboundLocalError and
    # every call would open a fresh connection.
    global influxdb
    try:
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
    except Exception:
        # Covers both "no client yet" (NameError) and a broken connection.
        influxdb = influxdb_connect()
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
def mongodb_insert(db, measurement, json_bodys):
    """Insert documents into collection ``measurement`` of database ``db``.

    Args:
        db: database name, used as ``mongodb[db]``.
        measurement: collection name, used as ``mongodb[db][measurement]``.
        json_bodys: the documents passed to ``insert_many``.

    Returns:
        Whatever ``insert_many`` returns.

    If the first attempt fails for any reason (including the module-level
    client not existing yet), a new client is created with
    mongodb_connect(), stored in the module global ``mongodb`` and the
    insert is retried once; an error from the retry propagates.
    """
    # Without this declaration the assignment below makes `mongodb` a local
    # name, so the first attempt would always fail with UnboundLocalError and
    # every call would open a fresh connection.
    global mongodb
    try:
        return mongodb[db][measurement].insert_many(json_bodys)
    except Exception:
        # Covers both "no client yet" (NameError) and a broken connection.
        mongodb = mongodb_connect()
        return mongodb[db][measurement].insert_many(json_bodys)
# Close all connections
def connect_close():
    """Cancel the RabbitMQ channel and close the RabbitMQ and MongoDB handles.

    Nothing is done for InfluxDB.

    NOTE(review): `_channel`, `connection` and `_mongodb` are only assigned
    as local variables inside rabbitmq_connect() / mongodb_connect() in the
    visible source. Unless they are defined as module globals elsewhere,
    calling this function raises NameError -- confirm.
    """
    # RabbitMQ
    _channel.cancel()
    connection.close()
    # InfluxDB: no explicit close is performed
    pass
    # MongoDB
    _mongodb.close()
class Queue(object):
    """In-memory buffer of records that is flushed by size or by age.

    Attributes:
        time: ``time.time()`` value of the first append since the last
            reset, or 0 while no record has been appended.
        list: the buffered records, in insertion order.
    """

    def __init__(self):
        self.time = 0
        self.list = []

    def append(self, data):
        """Add one record; remember the time of the first one."""
        self.list.append(data)
        if self.time == 0:
            self.time = time.time()

    def is_was_time(self):
        """Return True when the buffered records should be exported.

        That is the case once at least ``Config.multi_insert_limit`` records
        are buffered, or ``Config.multi_insert_timeout`` seconds have passed
        since the first append. An empty or never-started queue always
        returns False.
        """
        if self.time == 0 or not self.list:
            return False
        if len(self.list) >= Config.multi_insert_limit:
            return True
        return time.time() - self.time >= Config.multi_insert_timeout

    def get_all(self):
        """Return the internal list of buffered records (not a copy)."""
        return self.list

    def reset(self):
        """Start over with a fresh empty list and a cleared timestamp."""
        self.list = []
        self.time = 0