__init__.py
3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
__author__ = 'wuxiao'
import os
import sys
# Absolute path of the entry script, derived from sys.argv[0] (i.e. the script
# that was launched, not this module's own location).
__FILE__ = os.path.abspath(sys.argv[0])
# Directory that contains the entry script.
__DIR__ = os.path.dirname(os.path.abspath(sys.argv[0]))
# Python 2 idiom: reload(sys) makes sys.setdefaultencoding available again so
# the process-wide default string encoding can be switched to UTF-8.
# NOTE(review): neither call exists on Python 3 - this module is Python 2 only.
reload(sys)
sys.setdefaultencoding('utf8')
import json
import time
import pika
from influxdb import InfluxDBClient
from pymongo import MongoClient
from lupa import LuaRuntime
from urllib import quote
from threading import Timer
# Read configuration: execute Config.lua (expected in the entry script's
# directory, __DIR__) inside an embedded Lua runtime and expose the resulting
# Lua globals as `Config`.
lua = LuaRuntime(unpack_returned_tuples=True)
# A context manager closes the file handle deterministically, even if reading
# or executing the Lua source raises.
with open(__DIR__ + '/Config.lua') as _config_file:
    lua.execute(_config_file.read())
Config = lua.globals()
def write_log(msg):
    """Append ``msg`` followed by a newline to the file at ``Config.log_file``.

    The file is opened in append mode on every call. The ``with`` block
    guarantees the handle is closed even when the write itself raises.

    :param msg: text of the log line (without trailing newline).
    """
    with open(Config.log_file, 'a') as log_fp:
        log_fp.write(msg + "\n")
def write_performance(msg, pre=None):
    """Append ``msg`` to today's performance/debug log file.

    The log lives in ``Config.performance_dir`` (created on demand) and is
    named ``YYYYMMDD.log`` using local time, or ``<pre>_YYYYMMDD.log`` when a
    truthy ``pre`` is given.

    :param msg: text of the log line (without trailing newline).
    :param pre: optional file-name prefix; ignored when falsy.
    """
    if not os.path.isdir(Config.performance_dir):
        # 0o777 is the same mode as the legacy literal 0777, but is accepted
        # by both Python 2.6+ and Python 3.
        os.makedirs(Config.performance_dir, 0o777)
    log_name = time.strftime('%Y%m%d', time.localtime(time.time())) + '.log'
    if pre:
        log_name = pre + '_' + log_name
    # Context manager: the handle is released even if write() fails.
    with open(Config.performance_dir + '/' + log_name, 'a') as perf_fp:
        perf_fp.write(msg + "\n")
def rabbitmq_connect():
    """Connect to RabbitMQ and return a channel on the new connection.

    Host, port, virtual host and credentials are taken from ``Config``
    (``rabbitmq_host``, ``rabbitmq_port``, ``rabbitmq_vhost``,
    ``rabbitmq_user``, ``rabbitmq_pass``). If the connection object is falsy
    the process exits with an error message.

    :return: the channel obtained from ``connection.channel()``.
    """
    auth = pika.PlainCredentials(Config.rabbitmq_user, Config.rabbitmq_pass)
    params = pika.ConnectionParameters(
        Config.rabbitmq_host,
        Config.rabbitmq_port,
        Config.rabbitmq_vhost,
        auth,
    )
    broker = pika.BlockingConnection(params)
    if not broker:
        exit("Cannot connect to the broker!\n")
    return broker.channel()
def influxdb_connect():
    """Build an ``InfluxDBClient`` for the configured host and port.

    Uses ``Config.influxdb_host`` / ``Config.influxdb_port`` and passes
    ``timeout=1`` to the client.

    :return: the new ``InfluxDBClient`` instance.
    """
    return InfluxDBClient(
        Config.influxdb_host,
        Config.influxdb_port,
        timeout=1,
    )
def mongodb_connect():
    """Build a ``MongoClient`` from the MongoDB settings in ``Config``.

    User name and password are URL-quoted before being embedded in the
    connection URI. ``Config.mongodb_timeout`` is applied to the socket,
    connect and server-selection timeouts alike.

    :return: the new ``MongoClient`` instance.
    """
    uri = 'mongodb://%s:%s@%s:%d/' % (
        quote(Config.mongodb_user),
        quote(Config.mongodb_pass),
        Config.mongodb_host,
        Config.mongodb_port,
    )
    # One configured value drives all three driver timeouts.
    timeouts = dict.fromkeys(
        ('socketTimeoutMS', 'connectTimeoutMS', 'serverSelectionTimeoutMS'),
        Config.mongodb_timeout,
    )
    return MongoClient(uri, **timeouts)
def influxdb_insert(db, measurement, json_bodys):
    """Select InfluxDB database ``db`` and write ``json_bodys`` to it.

    The client is cached in the module-level name ``influxdb``. If using the
    cached client fails for any reason (including the very first call, when no
    client exists yet), a fresh client is created via ``influxdb_connect()``
    and the write is attempted once more; a failure of that second attempt
    propagates to the caller.

    :param db: name of the database to switch to.
    :param measurement: unused here; kept for signature parity with
        ``mongodb_insert``.
    :param json_bodys: points passed unchanged to ``write_points``.
    :return: whatever ``write_points`` returns.
    """
    # Without this declaration the assignment in the except branch made
    # `influxdb` a function-local name, so the first attempt always raised
    # UnboundLocalError and a brand-new client was created on every call.
    global influxdb
    try:
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
    except Exception:
        # Also reached on the first call (NameError: no cached client yet).
        influxdb = influxdb_connect()
        influxdb.switch_database(db)
        return influxdb.write_points(json_bodys)
def mongodb_insert(db, measurement, json_bodys):
    """Insert ``json_bodys`` into collection ``measurement`` of database ``db``.

    The client is cached in the module-level name ``mongodb``. If using the
    cached client fails for any reason (including the very first call, when no
    client exists yet), a fresh client is created via ``mongodb_connect()``
    and the insert is attempted once more; a failure of that second attempt
    propagates to the caller.

    :param db: database name.
    :param measurement: collection name.
    :param json_bodys: documents passed unchanged to ``insert_many``.
    :return: whatever ``insert_many`` returns.
    """
    # Without this declaration the assignment in the except branch made
    # `mongodb` a function-local name, so the first attempt always raised
    # UnboundLocalError and a brand-new client was created on every call.
    global mongodb
    try:
        return mongodb[db][measurement].insert_many(json_bodys)
    except Exception:
        # Also reached on the first call (NameError: no cached client yet).
        mongodb = mongodb_connect()
        return mongodb[db][measurement].insert_many(json_bodys)
def connect_close():
    """Close all connections: RabbitMQ channel and connection, then MongoDB.

    NOTE(review): ``_channel``, ``connection`` and ``_mongodb`` are not
    assigned at module level anywhere in the visible source (they only appear
    as local variables of the connect helpers) - confirm they exist as globals
    before relying on this function.
    """
    # RabbitMQ: cancel the channel first, then close the connection.
    _channel.cancel()
    connection.close()
    # InfluxDB: nothing is closed for the InfluxDB client.
    # MongoDB
    _mongodb.close()
def data_pull_timer(insert_data):
    """Periodic flush: push every buffered queue to its store, then re-arm.

    Prints the current timestamp, calls ``data_pull(queue, 'Timer')`` for each
    queue in ``insert_data`` and schedules itself again after
    ``Config.multi_insert_timeout`` via ``threading.Timer``.

    :param insert_data: three-level mapping whose innermost values are queues
        (presumably keyed by middleware, then database, then measurement -
        verify against the caller).
    """
    # The parenthesised form prints the same text under the Python 2 print
    # statement and is also valid on Python 3.
    print(time.time())
    try:
        # Only the innermost values (the queues) are needed, not the keys.
        for dbs in insert_data.values():
            for measurements in dbs.values():
                for queue in measurements.values():
                    data_pull(queue, 'Timer')
    finally:
        # Re-arm even when a flush raised; otherwise a single failed insert
        # would silently stop all future timed flushes.
        Timer(Config.multi_insert_timeout, data_pull_timer, [insert_data]).start()
def data_pull(queue, type='Normal'):
    """Write everything buffered in ``queue`` to its backing store.

    Does nothing for an empty queue. Otherwise the buffered records are sent
    to MongoDB or InfluxDB depending on ``queue.middleware``, and the queue is
    reset only when the insert call returns a truthy result.

    :param queue: object providing ``is_empty()``, ``get_all()``, ``reset()``
        and the attributes ``middleware``, ``db`` and ``measurement``.
    :param type: label of the trigger (e.g. ``'Normal'`` or ``'Timer'``);
        currently unused, kept for interface compatibility.
    :raises ValueError: if ``queue.middleware`` is not ``'mongodb'`` or
        ``'influxdb'`` (previously this surfaced as an accidental
        UnboundLocalError).
    """
    if queue.is_empty():
        return
    json_bodys = queue.get_all()
    if queue.middleware == 'mongodb':
        result = mongodb_insert(queue.db, queue.measurement, json_bodys)
    elif queue.middleware == 'influxdb':
        result = influxdb_insert(queue.db, queue.measurement, json_bodys)
    else:
        # Fail loudly and leave the buffered data untouched.
        raise ValueError("Unsupported middleware: %r" % (queue.middleware,))
    if result:
        queue.reset()
class Queue:
    """In-memory record buffer backed by a list plus an explicit counter."""

    def __init__(self):
        # `list` holds the buffered records, `num` how many were pushed.
        self.list, self.num = [], 0

    def push(self, data):
        """Add one record to the end of the buffer."""
        self.list.append(data)
        self.num = self.num + 1

    def size(self):
        """Return the number of buffered records."""
        return self.num

    def get_all(self):
        """Return the underlying list of buffered records (not a copy)."""
        return self.list

    def reset(self):
        """Drop all buffered records by starting over with a new empty list."""
        self.list, self.num = [], 0

    def is_empty(self):
        """Return True when no records are buffered."""
        return 0 == self.num