|
|
import collectd
|
|
|
import pika
|
|
|
import time
|
|
|
|
|
|
SendCon = None
|
|
|
|
|
|
RecvCon = None
|
|
|
|
|
|
RabbitConfig = {
|
|
|
'host': '127.0.0.1',
|
|
|
'port': 5672,
|
|
|
'user': 'guest',
|
|
|
'pwd': 'guest',
|
|
|
'vhost': '/',
|
|
|
'tag': 'rabbit_01'
|
|
|
}
|
|
|
|
|
|
readFlag = False
|
|
|
|
|
|
def init_SendCon():
|
|
|
global SendCon
|
|
|
if SendCon == None or SendCon.is_open == False:
|
|
|
credentials = pika.PlainCredentials(RabbitConfig['user'], RabbitConfig['pwd'])
|
|
|
SendCon = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitConfig['host'], port=RabbitConfig['port'], credentials=credentials, virtual_host=RabbitConfig['vhost']))
|
|
|
if SendCon.is_open == True:
|
|
|
collectd.info("connect success...")
|
|
|
else:
|
|
|
return
|
|
|
|
|
|
|
|
|
def init_RecvCon():
|
|
|
global RecvCon
|
|
|
if RecvCon == None or RecvCon.is_open == False:
|
|
|
credentials = pika.PlainCredentials(RabbitConfig['user'], RabbitConfig['pwd'])
|
|
|
RecvCon = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitConfig['host'], port=RabbitConfig['port'], credentials=credentials, virtual_host=RabbitConfig['vhost']))
|
|
|
else:
|
|
|
return
|
|
|
|
|
|
|
|
|
def send_check():
|
|
|
init_SendCon()
|
|
|
global SendCon
|
|
|
collectd.info("start send")
|
|
|
channel = SendCon.channel()
|
|
|
channel.queue_declare(queue='ctest_'+RabbitConfig['host'])
|
|
|
channel.basic_publish(exchange='',
|
|
|
routing_key='ctest_'+RabbitConfig['host'],
|
|
|
body='quit')
|
|
|
pass
|
|
|
|
|
|
|
|
|
def call_back(ch, method, properties, body):
|
|
|
global readFlag
|
|
|
readFlag = True
|
|
|
ch.basic_cancel(consumer_tag='consumer_'+RabbitConfig['host'])
|
|
|
ch.stop_consuming()
|
|
|
pass
|
|
|
|
|
|
|
|
|
def recv_check():
|
|
|
init_RecvCon()
|
|
|
global RecvCon
|
|
|
collectd.info("start read")
|
|
|
channel = RecvCon.channel()
|
|
|
channel.queue_declare(queue='ctest_'+RabbitConfig['host'])
|
|
|
channel.basic_consume(call_back,
|
|
|
queue='ctest_'+RabbitConfig['host'],
|
|
|
no_ack=True,
|
|
|
consumer_tag='consumer_'+RabbitConfig['host'])
|
|
|
channel.start_consuming()
|
|
|
pass
|
|
|
|
|
|
|
|
|
def config_callback(conf=None):
|
|
|
global RabbitConfig
|
|
|
for c in conf.children:
|
|
|
key = c.key.lower()
|
|
|
value = c.values[0]
|
|
|
if key == 'port':
|
|
|
RabbitConfig[key] = int(value)
|
|
|
else:
|
|
|
RabbitConfig[key] = str(value)
|
|
|
pass
|
|
|
|
|
|
|
|
|
def read_callback():
|
|
|
val = collectd.Values(plugin='rabbit-health', plugin_instance=RabbitConfig['tag'])
|
|
|
val.type = 'rabbitmq_state'
|
|
|
val.type_instance = ''
|
|
|
send_check()
|
|
|
recv_check()
|
|
|
global readFlag
|
|
|
if readFlag:
|
|
|
val.values = [1]
|
|
|
else:
|
|
|
val.values = [-1]
|
|
|
val.dispatch()
|
|
|
readFlag = False
|
|
|
pass
|
|
|
|
|
|
|
|
|
collectd.register_config(config_callback)
|
|
|
|
|
|
collectd.register_read(read_callback)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
...
|
...
|
|