__init__.py
3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
__author__ = 'wuxiao'
import os
import sys
# Absolute path of the script named in sys.argv[0] (the entry script that was
# launched, which is not necessarily this module).
__FILE__ = os.path.abspath(sys.argv[0])
# Directory containing that entry script; Config.lua is read from here below.
__DIR__ = os.path.dirname(os.path.abspath(sys.argv[0]))
# Python 2 idiom: reloading sys makes sys.setdefaultencoding available again so
# the process-wide default string encoding can be switched to UTF-8.
# NOTE(review): reload() as a builtin and setdefaultencoding() exist only on
# Python 2 -- this module cannot be imported under Python 3 as written.
reload(sys)
sys.setdefaultencoding('utf8')
import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote
from threading import Timer
# Load the configuration: execute Config.lua (expected next to the entry
# script, see __DIR__) inside an embedded Lua runtime and expose the resulting
# Lua globals table as ``Config``. Settings are then read as attributes,
# e.g. ``Config.log_file``.
lua = LuaRuntime(unpack_returned_tuples=True)
# Use a context manager so the config file handle is closed deterministically
# instead of being left to the garbage collector.
with open(__DIR__ + '/Config.lua') as config_file:
    lua.execute(config_file.read())
Config = lua.globals()
# Write a log line
def write_log(msg):
    """Append *msg* plus a trailing newline to the file at ``Config.log_file``.

    The file is opened in append mode on every call. The ``with`` block
    guarantees the handle is closed even if the write itself raises.
    """
    with open(Config.log_file, 'a') as log_fp:
        log_fp.write(msg + "\n")
# Write a performance/debug log line
def write_performance(msg, pre = None):
    """Append *msg* to today's performance log under ``Config.performance_dir``.

    The target file is named ``YYYYMMDD.log`` (local date). When *pre* is
    truthy the name becomes ``<pre>_YYYYMMDD.log``. The directory is created
    (mode 0o777, subject to the process umask) if it does not exist yet.

    :param msg: text to append; a newline is added after it.
    :param pre: optional file-name prefix.
    """
    if not os.path.isdir(Config.performance_dir):
        # 0o777 is the octal spelling accepted by both Python 2.6+ and 3;
        # the bare 0777 form is a syntax error on Python 3.
        os.makedirs(Config.performance_dir, 0o777)
    # strftime() without a time tuple formats the current local time.
    log_name = time.strftime('%Y%m%d') + '.log'
    if pre:
        log_name = pre + '_' + log_name
    # Context manager closes the handle even when the write fails.
    with open(Config.performance_dir + '/' + log_name, 'a') as log_fp:
        log_fp.write(msg + "\n")
# Connect to RabbitMQ
def rabbitmq_connect():
    """Open a blocking RabbitMQ connection and return a new channel on it.

    Host, port, virtual host and credentials come from ``Config.rabbitmq_*``.
    Only the channel is returned; the connection object itself stays local
    to this function.

    :returns: the channel created by ``connection.channel()``.
    :raises SystemExit: if the connection object evaluates as false.
    """
    credentials = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
    parameters = pika.ConnectionParameters(
        host=Config.rabbitmq_host,
        port=Config.rabbitmq_port,
        virtual_host=Config.rabbitmq_vhost,
        credentials=credentials
    )
    connection = pika.BlockingConnection(parameters)
    if not connection:
        # sys.exit() instead of the bare exit(): the latter is injected by the
        # site module for interactive use and is not guaranteed to exist.
        sys.exit("Cannot connect to the broker!\n")
    return connection.channel()
# Connect to InfluxDB
def influxdb_connect():
    """Create and return an InfluxDB client for the configured endpoint.

    Host and port are read from ``Config.influxdb_host`` and
    ``Config.influxdb_port``; the client is created with ``timeout=1``.
    """
    return InfluxDBClient(
        host=Config.influxdb_host,
        port=Config.influxdb_port,
        timeout=1
    )
# Connect to MongoDB
def mongodb_connect():
    """Create and return a ``MongoClient`` for the configured server.

    The connection URI is built from ``Config.mongodb_user``,
    ``Config.mongodb_pass``, ``Config.mongodb_host`` and
    ``Config.mongodb_port``. ``Config.mongodb_timeout`` is applied to the
    socket, connect and server-selection timeouts alike.
    """
    # safe='' forces '/' to be percent-escaped too. With quote()'s default
    # safe='/', a user name or password containing '/' would be emitted
    # verbatim and corrupt the URI.
    user = quote(Config.mongodb_user, safe='')
    password = quote(Config.mongodb_pass, safe='')
    uri = 'mongodb://%s:%s@%s:%d/' % (
        user, password, Config.mongodb_host, Config.mongodb_port)
    timeout_ms = Config.mongodb_timeout
    return MongoClient(
        uri,
        socketTimeoutMS=timeout_ms,
        connectTimeoutMS=timeout_ms,
        serverSelectionTimeoutMS=timeout_ms
    )
# Select an InfluxDB database and write points to it
def influxdb_insert(db, measurement, json_bodys):
    """Write *json_bodys* to InfluxDB database *db*, reconnecting once on failure.

    The module-level ``influxdb`` client is reused across calls. If it does
    not exist yet, or any call on it raises, a fresh client is created with
    ``influxdb_connect()``, stored back in the global, and the write is
    retried once. An exception from the retry propagates to the caller.

    :param db: database name passed to ``switch_database``.
    :param measurement: unused here; kept for signature parity with
        ``mongodb_insert``.
    :param json_bodys: points passed to ``write_points``.
    :returns: whatever ``write_points`` returns.
    """
    # Without this declaration the assignment below makes ``influxdb`` a local
    # name, so the first access raised UnboundLocalError on every call and the
    # function reconnected each time instead of reusing the client.
    global influxdb
    try:
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
    except Exception:
        # First use (global not set yet -> NameError) or a broken client:
        # build a new client and retry exactly once.
        influxdb = influxdb_connect()
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
# Select a MongoDB database and collection, then insert documents
def mongodb_insert(db, measurement, json_bodys):
    """Insert *json_bodys* into ``mongodb[db][measurement]``, reconnecting once on failure.

    The module-level ``mongodb`` client is reused across calls. If it does
    not exist yet, or the insert raises, a fresh client is created with
    ``mongodb_connect()``, stored back in the global, and the insert is
    retried once. An exception from the retry propagates to the caller.

    :param db: database name.
    :param measurement: collection name.
    :param json_bodys: documents passed to ``insert_many``.
    :returns: whatever ``insert_many`` returns.
    """
    # Without this declaration the assignment below makes ``mongodb`` a local
    # name, so the lookup raised UnboundLocalError on every call and a new
    # client was created for each insert.
    global mongodb
    try:
        return mongodb[db][measurement].insert_many(json_bodys)
    except Exception:
        # First use (global not set yet -> NameError) or a broken client:
        # build a new client and retry exactly once.
        mongodb = mongodb_connect()
        return mongodb[db][measurement].insert_many(json_bodys)
# Close all connections
def connect_close():
    """Cancel the RabbitMQ channel and close the RabbitMQ and MongoDB connections.

    Each step runs even if an earlier one raises, so one failing close no
    longer leaves the remaining connections open. The exception still
    propagates to the caller after all steps were attempted.

    NOTE(review): ``_channel``, ``connection`` and ``_mongodb`` are read as
    module-level globals, but the visible part of this file only assigns
    those names as locals inside the connect helpers. Confirm they are
    published at module level before this is called, otherwise it raises
    NameError.
    """
    try:
        # rabbitmq
        try:
            _channel.cancel()
        finally:
            connection.close()
    finally:
        # influxdb: nothing is closed for the InfluxDB client here.
        # mongodb
        _mongodb.close()
# Flush buffered data into its backend store
def data_pull(queue):
    """Write everything buffered in *queue* to its backend and reset it on success.

    *queue* must provide ``get_all()``, ``reset()`` and the attributes
    ``middleware`` (``'mongodb'`` or ``'influxdb'``), ``db`` and
    ``measurement``. The queue is reset only when the insert helper returns a
    truthy result, so a failed write keeps the records for a later attempt.

    :raises ValueError: if ``queue.middleware`` names an unknown backend.
    """
    json_bodys = queue.get_all()
    if not json_bodys:
        # Nothing buffered: skip the round trip to the backend.
        return
    if queue.middleware == 'mongodb':
        result = mongodb_insert(queue.db, queue.measurement, json_bodys)
    elif queue.middleware == 'influxdb':
        result = influxdb_insert(queue.db, queue.measurement, json_bodys)
    else:
        # Previously the result variable stayed unbound here and the function
        # died with an opaque UnboundLocalError; fail with a clear message.
        raise ValueError("Unsupported middleware: %r" % (queue.middleware,))
    if result:
        queue.reset()
class Queue:
    """In-memory buffer of records awaiting a batched insert.

    ``list`` holds the buffered records. ``time`` is the timestamp taken when
    the first record was appended after the last reset, or 0 while no batch
    is in progress.
    """

    def __init__(self):
        self.list = []
        self.time = 0

    # Add one record
    def append(self, data):
        """Buffer *data*, starting the batch clock if it is not running yet."""
        if self.time == 0:
            self.time = time.time()
        self.list.append(data)

    # Whether the flush condition has been met
    def is_was_time(self):
        """Return True once the batch is due to be flushed.

        A batch is due when it is non-empty, its clock is running, and either
        it holds at least ``Config.multi_insert_limit`` records or at least
        ``Config.multi_insert_timeout`` has elapsed since its first record.
        """
        if self.time == 0 or not self.list:
            return False
        if len(self.list) >= Config.multi_insert_limit:
            return True
        elapsed = time.time() - self.time
        return elapsed >= Config.multi_insert_timeout

    # Get all buffered records
    def get_all(self):
        """Return the internal list of buffered records (not a copy)."""
        return self.list

    # Reset
    def reset(self):
        """Drop all buffered records and stop the batch clock."""
        self.time = 0
        self.list = []

    # Whether the buffer is empty
    def is_empty(self):
        """Return True when no records are buffered."""
        return not self.list