emr_query.py 6.73 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author tiexin.yang@yoho.cn

from qcloud_api import QcloudApi
from mailer import mailman
import datetime
import argparse
import os,shutil
import time
import re


class EMRClusterScanner(object):
    """Keep the bigdata Ansible inventory in sync with the live EMR nodes.

    For every alias in ``emrNameMap`` the scanner compares the IPs listed
    under the ``[alias]`` section of the inventory file with the node IPs
    returned by the EMR API. When they differ it backs the file up, rewrites
    the section, pushes the change with git and mails ``receivers``.
    """

    MIN_INTERVAL = 30  # seconds; lower bound applied to the scan interval
    API_ENDPOINT = "emr.api.qcloud.com/v2/index.php"
    PAGE_SIZE = 20  # entries requested per API page
    CREDENTIALS_FILE = '/home/txyang/.qcloud_config'

    def __init__(self, interval, secretId='', secretKey=''):
        """Store the scan interval, credentials and notification settings.

        Args:
            interval: Seconds between scans. Anything below 30 is raised to
                30 to avoid wasting CPU and calling the API too often.
                ``None`` is treated as "use the minimum".
            secretId: API secret id.
            secretKey: API secret key. If either credential is empty, both
                are read from ``CREDENTIALS_FILE`` (first line id, second
                line key).

        Raises:
            ValueError: If ``interval`` is not integer-like, or the
                credentials file does not hold exactly two lines.
        """
        # None used to crash in int(); fall back to the minimum instead.
        requested = self.MIN_INTERVAL if interval is None else int(interval)
        self.interval = max(requested, self.MIN_INTERVAL)

        if not secretId or not secretKey:
            # Context manager so the credentials file handle is not leaked.
            with open(self.CREDENTIALS_FILE) as credFile:
                self.secretId, self.secretKey = credFile.read().strip('\n').split('\n')
        else:
            self.secretId, self.secretKey = secretId, secretKey

        self.inventoryFile = '/opt/projects/yoho-ansible-roles/inventories/bigdata/hosts'
        self.mailman = mailman()
        self.receivers = ['tiexin.yang@yoho.cn', 'chunhua.zhang@yoho.cn']
        # Inventory section name -> EMR cluster id.
        # NOTE(review): "emr-rt" and "emr-dw" point at the same cluster id;
        # confirm this is intended and not a copy-paste slip.
        self.emrNameMap = {
            "emr-rt": "emr-r6bhtb5v",
            "emr-ops": "emr-iaeloyc2",
            "emr-dw": "emr-r6bhtb5v",
            "emr-recom": "emr-ilj72ynu"
        }

    def EmrDescribeCluster(self, PageNo=1):
        """Fetch one page (up to ``PAGE_SIZE`` entries) of the EMR cluster list.

        Args:
            PageNo: Page number to request; defaults to the first page.

        Returns:
            Whatever ``QcloudApi.do_query`` returns for the request.
        """
        client = QcloudApi(self.secretId, self.secretKey)
        params = {
            "Action": "EmrDescribeCluster",
            "PageNo": PageNo,
            "PageSize": self.PAGE_SIZE
        }
        return client.do_query(params, self.API_ENDPOINT)

    def EmrDescribeClusterNode(self, ClusterId, NodeFlag, PageNo=1):
        """Fetch one page of nodes of a single EMR cluster.

        Args:
            ClusterId: Id of the cluster to inspect.
            NodeFlag: Node kind filter; one of master, core, task, common, all.
            PageNo: Page number to request; defaults to the first page.

        Returns:
            Whatever ``QcloudApi.do_query`` returns for the request.
        """
        client = QcloudApi(self.secretId, self.secretKey)
        params = {
            "Action": "EmrDescribeClusterNode",
            "ClusterId": ClusterId,
            "NodeFlag": NodeFlag,
            "PageNo": PageNo,
            "PageSize": self.PAGE_SIZE
        }
        return client.do_query(params, self.API_ENDPOINT)

    def _collectPaged(self, fetchPage, listKey, itemKey):
        """Walk a paged API listing and collect one field from every entry.

        Args:
            fetchPage: Callable taking a 1-based page number and returning a
                response shaped like ``{'data': {listKey: [...], 'totalCnt': n}}``.
            listKey: Key under ``data`` that holds the page's entries.
            itemKey: Key read from every entry.

        Returns:
            List of the collected values, in API order.
        """
        collected = []
        pageNo = 1
        while True:
            data = fetchPage(pageNo)['data']
            pageItems = data[listKey]
            collected.extend(item[itemKey] for item in pageItems)
            # An empty page means there is nothing more to fetch even if
            # totalCnt claims otherwise; without this check the loop would
            # request pages forever.
            if not pageItems or len(collected) >= data['totalCnt']:
                return collected
            pageNo += 1

    def getCurrentNodes(self, clusterId):
        """Return the IPs of all nodes (every node kind) of one EMR cluster.

        Args:
            clusterId: Id of the cluster whose nodes are listed.

        Returns:
            List of node IPs as reported by the API, across all result pages.
        """
        def fetchPage(pageNo):
            print('Getting nodes from {0} page {1}'.format(clusterId, pageNo))
            return self.EmrDescribeClusterNode(clusterId, "all", pageNo)

        return self._collectPaged(fetchPage, 'nodeList', 'ip')

    def getCurrentEmrs(self):
        """Return the ids of all EMR clusters, walking every result page."""
        return self._collectPaged(self.EmrDescribeCluster, 'clusterList', 'clusterId')

    def getHostsInFile(self, data, prefix):
        """Extract the ``[prefix]`` section and its IPs from inventory text.

        Args:
            data: Full text of the inventory file.
            prefix: Section name without the surrounding brackets.

        Returns:
            Tuple ``(hostsEntry, hostsInFile)``: the raw section text (header
            plus the following run of digits, dots, '+', '*' and whitespace,
            with surrounding newlines stripped) and the list of IPs found in it.

        Raises:
            IndexError: If the section is not present in ``data`` (same
                exception type as before, now with an explicit message).
        """
        print('prefix: {0}'.format(prefix))
        # re.escape: the section name must match literally, not as a pattern.
        # The bracket expression is the same character set the original
        # pattern accepted (digits, '.', '+', '*', whitespace).
        sectionRe = re.compile(r'(\[{0}\][\d.+*\s]*)'.format(re.escape(prefix)), re.S)
        match = sectionRe.search(data)
        if match is None:
            raise IndexError('section [{0}] not found in inventory data'.format(prefix))
        hostsEntry = match.group(1).strip('\n')
        hostsInFile = re.findall(r'(\d+\.\d+\.\d+\.\d+)', hostsEntry)
        return hostsEntry, hostsInFile

    @staticmethod
    def _renderSection(emrName, nodes):
        """Build the inventory text for one section from a list of node IPs.

        Duplicates are dropped and the IPs are sorted (plain string order) so
        the generated file is deterministic and git diffs stay minimal.
        """
        return '[{0}]\n'.format(emrName) + '\n'.join(sorted(set(nodes)))

    def gitUpdate(self):
        """Pull, commit and push the inventory file in its git checkout.

        The commands are chained with ';', so each one runs regardless of
        earlier failures and the reported exit status is that of the last
        command (``git push``).

        Returns:
            Always ``True`` (kept for backward compatibility); a non-zero
            exit status is reported on stdout.
        """
        cmd = 'cd /opt/projects/yoho-ansible-roles/;git pull;git add {0};git commit -m "Update bigdata inventory file"; git push'.format(self.inventoryFile)
        pipe = os.popen(cmd)
        # Drain the pipe and close it explicitly: this waits for the commands
        # to finish and yields their exit status instead of discarding it.
        output = pipe.read()
        status = pipe.close()
        if output:
            print(output.rstrip('\n'))
        if status is not None:
            print('git update exited with status {0}'.format(status))
        return True

    def updateEMRInventory(self, emrName):
        """Sync one inventory section with the cluster's current node list.

        Args:
            emrName: Key of ``emrNameMap``; also the inventory section name.

        Returns:
            ``0`` when the section already matches the live nodes, otherwise
            a summary string with the number of added and removed IPs.
        """
        today = datetime.date.today().strftime('%Y%m%d')
        with open(self.inventoryFile) as inventory:
            hostsData = inventory.read()
        hostsEntry, hostsInFile = self.getHostsInFile(hostsData, prefix=emrName)
        currentNodes = self.getCurrentNodes(clusterId=self.emrNameMap[emrName])

        liveIps, fileIps = set(currentNodes), set(hostsInFile)
        if liveIps == fileIps:
            print('Inventory already up to date')
            return 0

        print('\n'.join(currentNodes))
        N_toAdd = len(liveIps - fileIps)
        N_toDel = len(fileIps - liveIps)
        print('{0} to add,{1} to del'.format(N_toAdd, N_toDel))
        print('Taking backup for current inventory file...')
        # Backup is named after today's date, so later backups on the same
        # day overwrite earlier ones.
        shutil.copyfile(self.inventoryFile, self.inventoryFile + '.' + today)
        print('Replacing inventory file content...')
        # count=1: only the section that was matched is rewritten.
        newInventoryContent = hostsData.replace(
            hostsEntry, self._renderSection(emrName, currentNodes), 1)
        with open(self.inventoryFile, 'w') as inventory:
            inventory.write(newInventoryContent)
        print('Pushing changes to git server...')
        self.gitUpdate()
        print('Inventory file updated success')
        return 'Inventory 新增{0}条,删除{1}条'.format(N_toAdd, N_toDel)

    def start_daemon(self):
        """Run the scanner forever, sleeping ``self.interval`` between rounds.

        Every section in ``emrNameMap`` is synced each round and a mail is
        sent for each section that changed.

        NOTE: exceptions are not caught here, so any failure in a round
        propagates and ends the loop.
        """
        mailTemplate = 'EMR大数据Inventory [{0}] 已更新\n{1}\n详见 http://git.yoho.cn/ops/yoho-ansible-roles/blob/master/inventories/bigdata/hosts'
        while True:
            for emrName in self.emrNameMap:
                result = self.updateEMRInventory(emrName)
                if result:
                    self.mailman.mail(receivers=self.receivers,
                                      Content=mailTemplate.format(emrName, result),
                                      Title='EMR Inventory 更新结果')
            time.sleep(self.interval)
    

if __name__ == '__main__':
    # CLI entry point: build the scanner and run it as a foreground daemon.
    parser = argparse.ArgumentParser()
    # default=30 so an omitted --interval yields the scanner's 30-second
    # minimum instead of passing None to the constructor.
    parser.add_argument('--interval', type=int, default=30, help='Checks interval in seconds')
    args = parser.parse_args()
    EMRClusterScanner(args.interval).start_daemon()