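#!/usr/bin/python
# -*- coding: UTF-8 -*-
# emr_query.py: keep the [emr-recom] group of the bigdata Ansible inventory in sync with the Tencent Cloud EMR node IPs.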
# author tiexin.yang@yoho.cn

from qcloud_api import QcloudApi
from mailer import mailman
import datetime
import argparse
import os,shutil
import time
import re


class EMRClusterScanner(object):
    """Keep the ``[emr-recom]`` group of the bigdata Ansible inventory in sync
    with the node IPs of all Tencent Cloud EMR clusters.

    Each scan lists every EMR cluster and its nodes through the Qcloud EMR API,
    compares their IPs with the ``[emr-recom]`` section of ``inventoryFile`` and,
    when they differ, backs the file up, rewrites that section, commits and
    pushes it with git, and mails ``receivers`` a summary.
    """

    # Lower bound for the scan interval, in seconds: scanning more often only
    # wastes CPU and calls the API too frequently.
    MIN_INTERVAL = 30
    # Delay, in seconds, before retrying after a failed scan; the failure mail
    # announces a retry in 5 minutes.
    RETRY_DELAY = 300
    # Page size requested from the EMR describe APIs.
    PAGE_SIZE = 20
    EMR_ENDPOINT = "emr.api.qcloud.com/v2/index.php"
    # Fallback credentials file: SecretId on the first line, SecretKey on the second.
    CREDENTIALS_FILE = '/home/txyang/.qcloud_config'
    # Git working copy that contains the inventory file.
    REPO_DIR = '/opt/projects/yoho-ansible-roles/'
    # "[emr-recom]" header plus the run of digit/dot/whitespace characters that
    # forms its host list; the run ends at the first other character (normally
    # the "[" of the next group). Hosts are therefore expected to be bare IPs.
    _SECTION_RE = re.compile(r'\[emr-recom\]([\d.\s]*)')

    def __init__(self, interval, secretId='', secretKey=''):
        """Create a scanner.

        :param interval: seconds between scans; values below MIN_INTERVAL, and
            None (e.g. ``--interval`` omitted on the command line), mean
            MIN_INTERVAL.
        :param secretId: Qcloud SecretId; if it or secretKey is empty, both are
            read from CREDENTIALS_FILE instead.
        :param secretKey: Qcloud SecretKey.
        :raises ValueError: if interval cannot be parsed as an integer, or the
            credentials file does not hold exactly two non-blank lines.
        """
        requested = self.MIN_INTERVAL if interval is None else int(interval)
        self.interval = max(requested, self.MIN_INTERVAL)
        if not secretId or not secretKey:
            with open(self.CREDENTIALS_FILE) as f:
                # strip() also drops '\r' and stray spaces that would corrupt the keys.
                lines = [line.strip() for line in f if line.strip()]
            self.secretId, self.secretKey = lines
        else:
            self.secretId, self.secretKey = secretId, secretKey

        self.inventoryFile = '/opt/projects/yoho-ansible-roles/inventories/bigdata/hosts'
        self.mailman = mailman()
        self.receivers = ['tiexin.yang@yoho.cn']

    def _query(self, params):
        # Send one request to the EMR API endpoint with this scanner's credentials.
        client = QcloudApi(self.secretId, self.secretKey)
        return client.do_query(params, self.EMR_ENDPOINT)

    def EmrDescribeCluster(self, PageNo=1):
        """Fetch one page (up to PAGE_SIZE entries) of the EMR cluster list.

        :param PageNo: page number; paging starts at 1.
        :return: the raw API response; getCurrentEmrs reads
            ``['data']['clusterList']`` and ``['data']['totalCnt']`` from it.
        """
        return self._query({
            "Action": "EmrDescribeCluster",
            "PageNo": PageNo,
            "PageSize": self.PAGE_SIZE,
        })

    def EmrDescribeClusterNode(self, ClusterId, NodeFlag, PageNo=1):
        """Fetch one page of the nodes of a single EMR cluster.

        :param ClusterId: cluster to query.
        :param NodeFlag: node role to list: master, core, task, common or all.
        :param PageNo: page number; paging starts at 1.
        :return: the raw API response; getCurrentNodes reads
            ``['data']['nodeList']`` and ``['data']['totalCnt']`` from it.
        """
        return self._query({
            "Action": "EmrDescribeClusterNode",
            "ClusterId": ClusterId,
            "NodeFlag": NodeFlag,
            "PageNo": PageNo,
            "PageSize": self.PAGE_SIZE,
        })

    def getCurrentNodes(self):
        """Return the IPs of every node (any role) of every EMR cluster."""
        emrNodeIps = []
        for clusterId in self.getCurrentEmrs():
            # The count restarts for each cluster: every response describes a
            # single ClusterId, so its totalCnt must be compared with that
            # cluster's nodes only (a running total skipped later pages).
            fetched = 0
            PageNo = 1
            while True:
                print('Getting nodes from {0} page {1}'.format(clusterId, PageNo))
                data = self.EmrDescribeClusterNode(clusterId, "all", PageNo)['data']
                nodeList = data['nodeList']
                emrNodeIps.extend(node['ip'] for node in nodeList)
                fetched += len(nodeList)
                # An empty page also ends the walk so an inconsistent totalCnt
                # cannot loop forever.
                if not nodeList or fetched >= data['totalCnt']:
                    break
                PageNo += 1
        return emrNodeIps

    def getCurrentEmrs(self):
        """Return the IDs of all EMR clusters, walking every result page."""
        emrClusterList = []
        PageNo = 1
        while True:
            data = self.EmrDescribeCluster(PageNo)['data']
            clusterList = data['clusterList']
            emrClusterList.extend(cluster['clusterId'] for cluster in clusterList)
            # Stop after the last page; an empty page also stops the walk so an
            # inconsistent totalCnt cannot loop forever.
            if not clusterList or len(emrClusterList) >= data['totalCnt']:
                break
            PageNo += 1
        return emrClusterList

    def _matchHostsSection(self, data):
        # Find the first [emr-recom] section. IndexError is kept because the
        # former findall(...)[0] raised it when the section was missing.
        match = self._SECTION_RE.search(data)
        if match is None:
            raise IndexError('No [emr-recom] section found in inventory')
        return match

    def getHostsInFile(self, data):
        """Extract the ``[emr-recom]`` host list from inventory text.

        :param data: full inventory file content.
        :return: ``(hostsEntry, hostsInFile)``: the raw host block with its
            surrounding newlines stripped, and the IPs it contains, split on
            whitespace (blank lines and trailing spaces are ignored).
        :raises IndexError: if the inventory has no ``[emr-recom]`` section.
        """
        hostsEntry = self._matchHostsSection(data).group(1).strip('\n')
        return hostsEntry, hostsEntry.split()

    def gitUpdate(self):
        """Pull, add, commit and push inventoryFile in REPO_DIR.

        Every step runs even if an earlier one fails, as with the former
        ``cmd1; cmd2`` shell chain, but failures are now reported.

        :return: True if every git command exited with status 0, else False.
        :raises OSError: if git cannot be started or REPO_DIR does not exist.
        """
        import subprocess
        commands = [
            ['git', 'pull'],
            ['git', 'add', self.inventoryFile],
            ['git', 'commit', '-m', 'Update bigdata inventory file'],
            ['git', 'push'],
        ]
        succeeded = True
        for args in commands:
            status = subprocess.call(args, cwd=self.REPO_DIR)
            if status != 0:
                print('Command failed with status {0}: {1}'.format(status, ' '.join(args)))
                succeeded = False
        return succeeded

    def _replaceHostsSection(self, data, ips):
        # Put the sorted IPs (stable order keeps git diffs small), one per line,
        # directly under the [emr-recom] header. Only this section's span is
        # rewritten: a plain str.replace() of the old host block would also hit
        # identical text elsewhere in the file, and for an empty section
        # (replace('', ...)) would insert the IPs between every character.
        match = self._matchHostsSection(data)
        start = match.start(1)
        # Keep the whitespace that separates the host list from the next group.
        end = start + len(match.group(1).rstrip())
        newEntry = ''.join('\n' + ip for ip in sorted(ips))
        return data[:start] + newEntry + data[end:]

    def updateEMRInventory(self):
        """Sync the ``[emr-recom]`` section of inventoryFile with the live EMR nodes.

        :return: 0 when the inventory already matches. Otherwise the old file is
            backed up to ``<inventoryFile>.<YYYYMMDD>``, the section rewritten
            and pushed with git, and a summary string with the number of IPs
            added and removed is returned.
        :raises IndexError: if the inventory has no ``[emr-recom]`` section.
        :raises RuntimeError: if the git pull/commit/push step failed.
        """
        today = datetime.date.today().strftime('%Y%m%d')
        with open(self.inventoryFile) as f:
            hostsData = f.read()
        hostsInFile = set(self.getHostsInFile(hostsData)[1])
        currentNodes = set(self.getCurrentNodes())
        if currentNodes == hostsInFile:
            print('Inventory already up to date')
            return 0

        N_toAdd = len(currentNodes - hostsInFile)
        N_toDel = len(hostsInFile - currentNodes)
        print('Taking backup for current inventory file...')
        shutil.copyfile(self.inventoryFile, self.inventoryFile + '.' + today)
        print('Replacing inventory file content...')
        newInventoryContent = self._replaceHostsSection(hostsData, currentNodes)
        with open(self.inventoryFile, 'w') as f:
            f.write(newInventoryContent)
        print('Pushing changes to git server...')
        if not self.gitUpdate():
            # Do not report success (and mail it) when the push did not happen.
            raise RuntimeError('Inventory file rewritten locally but the git update failed')
        print('Inventory file updated success')
        return 'Inventory 新增{0}条,删除{1}条'.format(N_toAdd, N_toDel)

    def _notifyFailure(self, error):
        # Best-effort failure mail: a broken mailer must not kill the daemon.
        try:
            self.mailman.mail(receivers=self.receivers,
                              Content='EMR大数据Inventory更新失败\n{0}\n5分钟后重试'.format(str(error)),
                              Title='EMR Inventory 更新结果')
        except Exception as mailError:
            print('Failed to send failure mail: {0}'.format(mailError))

    def start_daemon(self):
        """Run the scanner forever, updating the inventory every ``interval`` seconds.

        A successful update is mailed to receivers. Any error is printed and
        mailed, then the scan is retried after RETRY_DELAY seconds. Only a
        non-Exception (e.g. KeyboardInterrupt) ends the loop.
        """
        while True:
            try:
                result = self.updateEMRInventory()
                if result:
                    self.mailman.mail(receivers=self.receivers,Content='EMR大数据Inventory已更新\n{0}\n详见 http://git.yoho.cn/ops/yoho-ansible-roles/blob/master/inventories/bigdata/hosts'.format(result),Title='EMR Inventory 更新结果')
            except Exception as e:
                print(e)
                self._notifyFailure(e)
                # Retry after exactly RETRY_DELAY, as the failure mail promises,
                # instead of RETRY_DELAY plus the regular interval.
                time.sleep(self.RETRY_DELAY)
                continue
            time.sleep(self.interval)
    

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Keep the [emr-recom] inventory group in sync with the EMR cluster nodes.')
    # Give --interval an explicit default so the scanner always receives an int;
    # 30 s is the scanner's minimum interval.
    parser.add_argument('--interval', type=int, default=30,
                        help='Checks interval in seconds (minimum and default: 30)')
    args = parser.parse_args()
    EMRClusterScanner(args.interval).start_daemon()