Authored by wuxiao

批量提交模式最后修改

@@ -20,7 +20,7 @@ rabbitmq_vhost = '/' @@ -20,7 +20,7 @@ rabbitmq_vhost = '/'
20 influxdb_host = '127.0.0.1' 20 influxdb_host = '127.0.0.1'
21 --influxdb_host = '54.222.219.223' 21 --influxdb_host = '54.222.219.223'
22 influxdb_port = 8086 22 influxdb_port = 8086
23 - 23 +influxdb_timeout = 2
24 24
25 --[[ 25 --[[
26 mongodb服务器 26 mongodb服务器
@@ -29,7 +29,7 @@ mongodb_host = '127.0.0.1' @@ -29,7 +29,7 @@ mongodb_host = '127.0.0.1'
29 mongodb_port = 27017 29 mongodb_port = 27017
30 mongodb_user = 'root' 30 mongodb_user = 'root'
31 mongodb_pass = 'yoho@Uniformpass' 31 mongodb_pass = 'yoho@Uniformpass'
32 -mongodb_timeout = 3 32 +mongodb_timeout = 2
33 33
34 --[[ 34 --[[
35 mysql服务器 35 mysql服务器
@@ -46,7 +46,7 @@ mysql_pass = 'yoho@Uniformpass' @@ -46,7 +46,7 @@ mysql_pass = 'yoho@Uniformpass'
46 --http_mq_port = 9500 46 --http_mq_port = 9500
47 --worker_num = 3 47 --worker_num = 3
48 --daemonize = false 48 --daemonize = false
49 -multi_insert_limit = 10 49 +multi_insert_limit = 200
50 multi_insert_timeout = 3 50 multi_insert_timeout = 3
51 51
52 52
@@ -76,9 +76,6 @@ while True: @@ -76,9 +76,6 @@ while True:
76 measurement = Config.info_channel+time.strftime('_%Y%m%d') 76 measurement = Config.info_channel+time.strftime('_%Y%m%d')
77 77
78 json_body = dict(data['data'],**data['tags']) 78 json_body = dict(data['data'],**data['tags'])
79 -  
80 - if json_body['message'] == 'sdhfsodifjoisdfsoi':  
81 - print json_body  
82 79
83 middleware = 'mongodb' 80 middleware = 'mongodb'
84 81
@@ -54,7 +54,7 @@ def rabbitmq_connect(): @@ -54,7 +54,7 @@ def rabbitmq_connect():
54 54
55 #连接influxdb 55 #连接influxdb
56 def influxdb_connect(): 56 def influxdb_connect():
57 - _influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1) 57 + _influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=Config.influxdb_timeout)
58 return _influxdb 58 return _influxdb
59 59
60 #连接mongodb 60 #连接mongodb
@@ -92,22 +92,21 @@ def connect_close(): @@ -92,22 +92,21 @@ def connect_close():
92 92
93 #数据入库定时器 93 #数据入库定时器
94 def data_pull_timer(insert_data): 94 def data_pull_timer(insert_data):
95 - print time.time()  
96 try: 95 try:
97 for middleware,dbs in insert_data.items(): 96 for middleware,dbs in insert_data.items():
98 for db,measurements in dbs.items(): 97 for db,measurements in dbs.items():
99 for measurement,queue in measurements.items(): 98 for measurement,queue in measurements.items():
100 try: 99 try:
101 - data_pull(queue,'Timer') 100 + data_pull(queue)
102 except: 101 except:
103 - data_pull(queue,'Timer') 102 + data_pull(queue)
104 except Exception as err: 103 except Exception as err:
105 print err 104 print err
106 finally: 105 finally:
107 Timer(Config.multi_insert_timeout,data_pull_timer,[insert_data]).start() 106 Timer(Config.multi_insert_timeout,data_pull_timer,[insert_data]).start()
108 107
109 #数据入库 108 #数据入库
110 -def data_pull(queue,type = 'Normal'): 109 +def data_pull(queue):
111 if queue.is_empty(): 110 if queue.is_empty():
112 return 111 return
113 json_bodys = queue.get_all() 112 json_bodys = queue.get_all()