|
@@ -3,7 +3,7 @@ |
|
@@ -3,7 +3,7 @@ |
3
|
# @Author: wuxiao
|
3
|
# @Author: wuxiao
|
4
|
# @Date: 2016-05-24 13:37:45
|
4
|
# @Date: 2016-05-24 13:37:45
|
5
|
# @Last Modified by: anchen
|
5
|
# @Last Modified by: anchen
|
6
|
-# @Last Modified time: 2016-05-24 18:40:09
|
6
|
+# @Last Modified time: 2016-06-16 18:36:19
|
7
|
|
7
|
|
8
|
import os
|
8
|
import os
|
9
|
import sys
|
9
|
import sys
|
|
@@ -14,7 +14,9 @@ import json |
|
@@ -14,7 +14,9 @@ import json |
14
|
import time
|
14
|
import time
|
15
|
import pika
|
15
|
import pika
|
16
|
from influxdb import InfluxDBClient
|
16
|
from influxdb import InfluxDBClient
|
|
|
17
|
+from pymongo import MongoClient
|
17
|
from lupa import LuaRuntime
|
18
|
from lupa import LuaRuntime
|
|
|
19
|
+from urllib import quote
|
18
|
|
20
|
|
19
|
'''
|
21
|
'''
|
20
|
读取配置信息
|
22
|
读取配置信息
|
|
@@ -37,6 +39,9 @@ channel = connection.channel() |
|
@@ -37,6 +39,9 @@ channel = connection.channel() |
37
|
#连接influxdb
|
39
|
#连接influxdb
|
38
|
influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)
|
40
|
influxdb = InfluxDBClient(Config.influxdb_host, Config.influxdb_port, timeout=1)
|
39
|
|
41
|
|
|
|
42
|
+#连接mongodb
|
|
|
43
|
+mongodb = MongoClient( 'mongodb://%s:%s@%s:%d/' % ( quote(Config.mongodb_user),quote(Config.mongodb_pass),Config.mongodb_host,Config.mongodb_port ) )
|
|
|
44
|
+
|
40
|
print "MqClient:Start.\n";
|
45
|
print "MqClient:Start.\n";
|
41
|
|
46
|
|
42
|
#开始消费消息数据
|
47
|
#开始消费消息数据
|
|
@@ -73,10 +78,13 @@ for method_frame, properties, body in channel.consume(q_name): |
|
@@ -73,10 +78,13 @@ for method_frame, properties, body in channel.consume(q_name): |
73
|
}
|
78
|
}
|
74
|
]
|
79
|
]
|
75
|
|
80
|
|
76
|
- #选择库,写入
|
81
|
+ #influxdb选择库,写入
|
77
|
influxdb.switch_database(db)
|
82
|
influxdb.switch_database(db)
|
78
|
influxdb.write_points(json_body)
|
83
|
influxdb.write_points(json_body)
|
79
|
|
84
|
|
|
|
85
|
+ #mongodb选择库和集合,写入
|
|
|
86
|
+ mongodb[db][measurement].insert_one(data['data'])
|
|
|
87
|
+
|
80
|
#ack回应
|
88
|
#ack回应
|
81
|
#channel.basic_ack(method_frame.delivery_tag)
|
89
|
#channel.basic_ack(method_frame.delivery_tag)
|
82
|
if method_frame.delivery_tag % ack_interval == 0:
|
90
|
if method_frame.delivery_tag % ack_interval == 0:
|