Cobar.py 12.1 KB
import collectd
import MySQLdb
import copy
import re

Cobar_config = {
    'host': '127.0.0.1',
    'port': '9066',
    'user': 'test',
    'pwd': 'test',
    'tag': '0'
}

#Cobar_vars = {
 #   'tp_active_count': 'gauge',
  #  'tp_task_queue_size': 'gauge',
   # 'tp_task_complete_size': 'gauge',
    #'con_net_in': 'gauge',
    #'con_net_out': 'gauge',
    #'con_fc_count': 'gauge',
    #'sch_max_concurrent': 'gauge',
    #'sch_rcount': 'gauge',
    #'sch_wcount': 'gauge',
    #'sch_net_in': 'gauge',
    #'sch_net_out': 'gauge',
    #'sch_cobar_10': 'gauge',
    #'sch_cobar_200': 'gauge',
    #'sch_cobar_1000': 'gauge',
    #'sch_cobar_2000': 'gauge',
    #'sch_node_10': 'gauge',
    #'sch_node_200': 'gauge',
    #'sch_node_1000': 'gauge',
    #'sch_node_2000': 'gauge',
    #'tb_max_concurrent': 'gauge',
    #'tb_rcount': 'gauge',
    #'tb_wcount': 'gauge',
    #'tb_cobar_10': 'gauge',
    #'tb_cobar_200': 'gauge',
    #'tb_cobar_1000': 'gauge',
    #'tb_cobar_2000': 'gauge',
    #'tb_node_10': 'gauge',
    #'tb_node_200': 'gauge',
    #'tb_node_1000': 'gauge',
    #'tb_node_2000': 'gauge',
#}


class CobarTpStates(object):
        tp_active_count = 0
        tp_task_queue_size = 0
        tp_task_complete_size = 0


g_tp_states = CobarTpStates()


class CobarConnStates(object):
        con_net_in = 0
        con_net_out = 0
        con_fc_count = 0


g_con_states = CobarConnStates()


class CobarSchemaStates(object):
        schema = ''
        sch_max_concurrent = 0
        sch_rcount = 0
        sch_wcount = 0
        sch_net_in = 0
        sch_net_out = 0
        sch_cobar_10 = 0
	sch_cobar_50 = 0
        sch_cobar_200 = 0
        sch_cobar_1000 = 0
        sch_cobar_2000 = 0
        sch_node_10 = 0
	sch_node_50 = 0
        sch_node_200 = 0
        sch_node_1000 = 0
        sch_node_2000 = 0


g_schema_states = dict()


class CobarTableStates(object):
        table = ''
        tb_rcount = 0
        tb_wcount = 0
        tb_cobar_10 = 0
        tb_cobar_50 = 0
        tb_cobar_200 = 0
        tb_cobar_1000 = 0
        tb_cobar_2000 = 0
        tb_node_10 = 0
        tb_node_50 = 0
        tb_node_200 = 0
        tb_node_1000 = 0
        tb_node_2000 = 0


g_table_states = dict()


def get_cobar_tp_states(con):
    res = get_cobar_query(con, 'show @@threadpool')
    cur_states = CobarTpStates()
    for row in res.fetchall():
        if str(row['NAME']).startswith('Process'):
            cur_states.tp_active_count = cur_states.tp_active_count + row['ACTIVE_COUNT']
            cur_states.tp_task_queue_size = cur_states.tp_task_queue_size + row['TASK_QUEUE_SIZE']
            cur_states.tp_task_complete_size = cur_states.tp_task_complete_size + row['COMPLETED_TASK']
    global g_tp_states
    tp_states = CobarTpStates()
    tp_states.tp_active_count = cur_states.tp_active_count
    collectd.info("global:"+str(g_tp_states.tp_task_complete_size));
    collectd.info("cur:"+str(cur_states.tp_task_complete_size));
    tp_states.tp_task_complete_size = cur_states.tp_task_complete_size - g_tp_states.tp_task_complete_size
    tp_states.tp_task_queue_size = cur_states.tp_task_queue_size - g_tp_states.tp_task_queue_size
    g_tp_states = copy.deepcopy(cur_states)
    dispatch_states(tp_states)


def get_cobar_con_states(con):
    res = get_cobar_query(con, 'show @@processor')
    cur_states = CobarConnStates()
    for row in res.fetchall():
        if str(row['NAME']).startswith('Process'):
            cur_states.con_fc_count = cur_states.con_fc_count + row['FC_COUNT']
            cur_states.con_net_in = cur_states.con_net_in + row['NET_IN']
            cur_states.con_net_out = cur_states.con_net_out + row['NET_OUT']
    global g_con_states
    collectd.info("global:"+str(g_con_states.con_net_in))
    collectd.info("curr:"+str(cur_states.con_net_in))
    con_states = CobarConnStates()
    con_states.con_fc_count = cur_states.con_fc_count
    con_states.con_net_in = cur_states.con_net_in - g_con_states.con_net_in
    con_states.con_net_out = cur_states.con_net_out - g_con_states.con_net_out
    g_con_states = copy.deepcopy(cur_states)
    dispatch_states(con_states)


def get_cobar_tb_states(con):
    res = get_cobar_query(con, 'show @@sql.stat.table')
    global g_table_states
    for row in res.fetchall():
        tmp_tb_states = CobarTableStates()
        tmp_tb_states.table = row['TABLE'].lower()
        tmp_tb_states.tb_rcount = int(row['R'])
        tmp_tb_states.tb_wcount = int(row['W'])
        tmp_ttl_tb = row['ALL_TTL_COUNT']
        tmp_ttl_tb = tmp_ttl_tb.lstrip('[')
        tmp_ttl_tb = tmp_ttl_tb.rstrip(']')
        ttl_tb = re.split(r'[;,\s]\s*', tmp_ttl_tb)
        tmp_tb_states.tb_cobar_10 = int(ttl_tb[0])
        tmp_tb_states.tb_cobar_50 = int(ttl_tb[1])
        tmp_tb_states.tb_cobar_200 = int(ttl_tb[2])
        tmp_tb_states.tb_cobar_1000 = int(ttl_tb[3])
        tmp_tb_states.tb_cobar_2000 = int(ttl_tb[4])
        tmp_ttl_tb = row['NODE_TTL_COUNT']
        tmp_ttl_tb = tmp_ttl_tb.lstrip('[')
        tmp_ttl_tb = tmp_ttl_tb.rstrip(']')
        ttl_tb = re.split(r'[;,\s]\s*', tmp_ttl_tb)
        tmp_tb_states.tb_node_10 = int(ttl_tb[0])
        tmp_tb_states.tb_node_50 = int(ttl_tb[1])
        tmp_tb_states.tb_node_200 = int(ttl_tb[2])
        tmp_tb_states.tb_node_1000 = int(ttl_tb[3])
        tmp_tb_states.tb_node_2000 = int(ttl_tb[4])
        old_states = g_table_states.get(tmp_tb_states.table)
        if old_states is None:
            g_table_states[tmp_tb_states.table] = tmp_tb_states
            # dispatch_states(tmp_tb_states.table, tmp_tb_states)
        else:
            tb_states = CobarTableStates()
            tb_states.table = tmp_tb_states.table
            tb_states.tb_rcount = tmp_tb_states.tb_rcount - old_states.tb_rcount
            tb_states.tb_wcount = tmp_tb_states.tb_wcount - old_states.tb_wcount
            tb_states.tb_cobar_10 = tmp_tb_states.tb_cobar_10 - old_states.tb_cobar_10
            tb_states.tb_cobar_50 = tmp_tb_states.tb_cobar_50 - old_states.tb_cobar_50
            tb_states.tb_cobar_200 = tmp_tb_states.tb_cobar_200 - old_states.tb_cobar_200
            tb_states.tb_cobar_1000 = tmp_tb_states.tb_cobar_1000 - old_states.tb_cobar_1000
            tb_states.tb_cobar_2000 = tmp_tb_states.tb_cobar_2000 - old_states.tb_cobar_2000
            tb_states.tb_node_10 = tmp_tb_states.tb_node_10 - old_states.tb_node_10
            tb_states.tb_node_50 = tmp_tb_states.tb_node_50 - old_states.tb_node_50
            tb_states.tb_node_200 = tmp_tb_states.tb_node_200 - old_states.tb_node_200
            tb_states.tb_node_1000 = tmp_tb_states.tb_node_1000 - old_states.tb_node_1000
            tb_states.tb_node_2000 = tmp_tb_states.tb_node_2000 - old_states.tb_node_2000
            g_table_states[tmp_tb_states.table] = copy.deepcopy(tmp_tb_states)
            dispatch_nstates('table', tb_states)


def get_cobar_schema_states(con):
    res = get_cobar_query(con, 'show @@sql.stat.schema')
    global g_schema_states
    for row in res.fetchall():
        tmp_schema_states = CobarSchemaStates()
        tmp_schema_states.schema = row['SCHEMA'].lower()
        tmp_schema_states.sch_max_concurrent = int(row['MAX'])
        tmp_schema_states.sch_net_in = int(row['NET_IN'])
        tmp_schema_states.sch_net_out = int(row['NET_OUT'])
        tmp_schema_states.sch_rcount = int(row['R'])
        tmp_schema_states.sch_wcount = int(row['W'])
        tmp_ttl_sch = row['ALL_TTL_COUNT']
        tmp_ttl_sch = tmp_ttl_sch.lstrip('[')
        tmp_ttl_sch = tmp_ttl_sch.rstrip(']')
        ttl_sch = re.split(r'[;,\s]\s*', tmp_ttl_sch)
        tmp_schema_states.sch_cobar_10 = int(ttl_sch[0])
        tmp_schema_states.sch_cobar_50 = int(ttl_sch[1])
        tmp_schema_states.sch_cobar_200 = int(ttl_sch[2])
        tmp_schema_states.sch_cobar_1000 = int(ttl_sch[3])
        tmp_schema_states.sch_cobar_2000 = int(ttl_sch[4])
        tmp_ttl_sch = row['NODE_TTL_COUNT']
        tmp_ttl_sch = tmp_ttl_sch.lstrip('[')
        tmp_ttl_sch = tmp_ttl_sch.rstrip(']')
        ttl_sch = re.split(r'[;,\s]\s*', tmp_ttl_sch)
        tmp_schema_states.sch_node_10 = int(ttl_sch[0])
        tmp_schema_states.sch_node_50 = int(ttl_sch[1])
	tmp_schema_states.sch_node_200 = int(ttl_sch[2])
	tmp_schema_states.sch_node_1000 = int(ttl_sch[3])
	tmp_schema_states.sch_node_2000 = int(ttl_sch[4])
        old_states = g_schema_states.get(tmp_schema_states.schema)
        if old_states is None:
            g_schema_states[tmp_schema_states.schema] = tmp_schema_states
            # dispatch_states(tmp_schema_states.schema, tmp_schema_states)
        else:
            sch_states = CobarSchemaStates()
            sch_states.schema = tmp_schema_states.schema
            sch_states.sch_max_concurrent = tmp_schema_states.sch_max_concurrent
            sch_states.sch_net_in = tmp_schema_states.sch_net_in - old_states.sch_net_in
            sch_states.sch_net_out = tmp_schema_states.sch_net_out - old_states.sch_net_out
            sch_states.sch_wcount = tmp_schema_states.sch_wcount - old_states.sch_wcount
            sch_states.sch_rcount = tmp_schema_states.sch_rcount - old_states.sch_rcount
            sch_states.sch_cobar_10 = tmp_schema_states.sch_cobar_10 - old_states.sch_cobar_10
            sch_states.sch_cobar_50 = tmp_schema_states.sch_cobar_50 - old_states.sch_cobar_50
            sch_states.sch_cobar_200 = tmp_schema_states.sch_cobar_200 - old_states.sch_cobar_200
            sch_states.sch_cobar_1000 = tmp_schema_states.sch_cobar_1000 - old_states.sch_cobar_1000
            sch_states.sch_cobar_2000 = tmp_schema_states.sch_cobar_2000 - old_states.sch_cobar_2000
            sch_states.sch_node_10 = tmp_schema_states.sch_node_10 - old_states.sch_node_10
            sch_states.sch_node_50 = tmp_schema_states.sch_node_50 - old_states.sch_node_50
            sch_states.sch_node_200 = tmp_schema_states.sch_node_200 - old_states.sch_node_200
            sch_states.sch_node_1000 = tmp_schema_states.sch_node_1000 - old_states.sch_node_1000
            sch_states.sch_node_2000 = tmp_schema_states.sch_node_2000 - old_states.sch_node_2000
            g_schema_states[tmp_schema_states.schema] = copy.deepcopy(tmp_schema_states)
            dispatch_nstates('schema', sch_states)


def configure_callback(conf=None):
    global Cobar_config
    for c in conf.children:
        key = c.key.lower()
        value = c.values[0]
        Cobar_config[key] = value
    Cobar_config['port'] = int(Cobar_config['port'])


def get_cobar_con():
    return MySQLdb.connect(host=Cobar_config['host'],
                           port=Cobar_config['port'],
                           user=Cobar_config['user'],
                           passwd=Cobar_config['pwd'])


def get_cobar_query(con, query):
    cur = con.cursor(MySQLdb.cursors.DictCursor)
    cur.execute(query)
    return cur


def dispatch_states(states):
    if not states:
	return
    data_dic = vars(states).iteritems()
    for key, value in data_dic:
        dispatch_value(key, value, key, None)


# type-table_test01   type_instance-maxConcurrent
def dispatch_nstates(type, states):
    pass
    if not states:
	return
    prefix = ''
    if type == 'schema':
       prefix = states.schema
    else:
       prefix = states.table
    data_dic = vars(states).iteritems()
    for key, value in data_dic:
        if str(key) == 'schema' or str(key) == 'table':
            continue
        dispatch_value(key, value, type, prefix)


def dispatch_value(key, value, type, type_instance=None):
    if not type_instance:
        type_instance = key
    else:
        type_instance = type_instance+'_'+key
    if value is None:
        return
    try:
        value = int(value)
    except ValueError:
        value = float(value)

    val = collectd.Values(plugin='cobar', plugin_instance=Cobar_config['tag'])
    val.type = type
    val.type_instance = type_instance
    val.values = [value]
    val.dispatch()


def read_callback():
    pass
    collectd.info("user read info....")
    connection = get_cobar_con()
    get_cobar_tp_states(connection)
    get_cobar_con_states(connection)
    get_cobar_schema_states(connection)
    get_cobar_tb_states(connection)

def write_callback(vl):
    collectd.info('cb: %s' % vl)


collectd.register_config(configure_callback)

collectd.register_read(read_callback)

#collectd.register_write(write_callback)