sync_bigdata_inventory.py 10.3 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author tiexin.yang@yoho.cn

from qcloud.qcloud_api import QcloudApi
from qcloud.mailer import mailman
import json
import datetime
import argparse
import os,shutil
import time
import re

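"""
  Purpose: EMR clusters scale in and out at irregular times. This script periodically maintains the
  inventories/bigdata/hosts file; when a cluster's node list changes it rewrites the file, pushes it
  to git.yoho.cn, and runs the resolv playbook to sync DNS.
  Dependencies: qcloud.qcloud_api (Tencent Cloud EMR API), git, ansible-playbook
  Demo:
        python sync_bigdata_inventory.py --interval 300    # scan every 300 s
        python sync_bigdata_inventory.py --interval 0      # scan once and exit
"""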

class EMRClusterScanner(object):
    """Keep the [emr-*] groups of the bigdata Ansible inventory in sync with Tencent Cloud EMR.

    For every group in ``emrNameMap`` the node IPs reported by the EMR API are
    compared with the IPs listed under the matching ``[group]`` section of
    ``inventories/bigdata/hosts``. On a difference the file is backed up,
    rewritten, pushed with git, and (when nodes were added) the resolv playbook
    is run through ``syncDNS``.
    """

    # ansible-playbook recap counters "unreachable=N" / "failed=N" with N > 0.
    _RE_PLAY_ERRORS = re.compile(r'\b(?:unreachable|failed)=[1-9]')
    # ansible-playbook recap counter "changed=N" with N > 0.
    _RE_PLAY_CHANGED = re.compile(r'\bchanged=[1-9]')

    def __init__(self, interval, secretId='', secretKey=''):
        """Create a scanner.

        :param interval: seconds to sleep between scans; 0 (or None) means scan once.
        :param secretId: Tencent Cloud API secret id. If either credential is empty,
            both are read from /home/ansible/.qcloud_config (id, then key).
        :param secretKey: Tencent Cloud API secret key.
        """
        # argparse yields None when --interval is omitted; treat it as "run once".
        self.interval = int(interval or 0)
        if not secretId or not secretKey:
            with open('/home/ansible/.qcloud_config') as f:
                # split() tolerates trailing blank lines and CRLF line endings.
                self.secretId, self.secretKey = f.read().split()
        else:
            self.secretId, self.secretKey = secretId, secretKey

        self.workDir = '/home/ansible/yoho-ansible-roles/'
        self.inventoryFile = '{0}inventories/bigdata/hosts'.format(self.workDir)
        self.playbook_resolv = '{0}playbooks/resolv.yml'.format(self.workDir)
        self.mailman = mailman()
        # Notification recipients; email sending is currently disabled.
        self.receivers = ['tiexin.yang@yoho.cn', 'kun.xiang@yoho.cn', 'chunhua.zhang@yoho.cn']
        # Inventory group name -> EMR cluster id.
        # NOTE(review): emr-rt and emr-dw map to the same cluster id - confirm this is intended.
        self.emrNameMap = {
            "emr-rt": "emr-r6bhtb5v",
            "emr-ops": "emr-iaeloyc2",
            "emr-dw": "emr-r6bhtb5v",
            "emr-recom": "emr-ilj72ynu"
        }

    def _emrQuery(self, params):
        """Send one request to the EMR v2 API and return what QcloudApi.do_query returns."""
        client = QcloudApi(self.secretId, self.secretKey)
        return client.do_query(params, "emr.api.qcloud.com/v2/index.php")

    def EmrDescribeCluster(self, PageNo=1):
        """Fetch one page (up to 20 entries) of the EMR cluster list."""
        return self._emrQuery({
            "Action": "EmrDescribeCluster",
            "PageNo": PageNo,
            "PageSize": 20
        })

    def EmrDescribeClusterNode(self, ClusterId, NodeFlag, PageNo=1):
        """Fetch one page (up to 20 entries) of nodes of cluster ``ClusterId``.

        :param NodeFlag: node type - one of master, core, task, common, all.
        """
        return self._emrQuery({
            "Action": "EmrDescribeClusterNode",
            "ClusterId": ClusterId,
            "NodeFlag": NodeFlag,
            "PageNo": PageNo,
            "PageSize": 20
        })

    def getCurrentNodes(self, clusterId):
        """Return the IPs of every node (all node types) in EMR cluster ``clusterId``.

        Pages through EmrDescribeClusterNode until ``totalCnt`` nodes were collected.
        """
        emrNodeIps = []
        PageNo = 1
        while True:
            print('Getting nodes from {0} page {1}'.format(clusterId, PageNo))
            data = self.EmrDescribeClusterNode(clusterId, "all", PageNo)['data']
            nodeList = data['nodeList']
            emrNodeIps.extend(node['ip'] for node in nodeList)
            # An empty page also ends the loop; otherwise an over-reported
            # totalCnt would make this spin forever.
            if not nodeList or len(emrNodeIps) >= data['totalCnt']:
                break
            PageNo += 1
        return emrNodeIps

    def getCurrentEmrs(self):
        """Return the ids of all EMR clusters, walking every result page."""
        emrClusterList = []
        PageNo = 1
        while True:
            data = self.EmrDescribeCluster(PageNo)['data']
            clusterList = data['clusterList']
            emrClusterList.extend(cluster['clusterId'] for cluster in clusterList)
            # Stop on an empty page too, to avoid an endless loop.
            if not clusterList or len(emrClusterList) >= data['totalCnt']:
                break
            PageNo += 1
        return emrClusterList

    def getHostsInFile(self, data, prefix):
        """Locate the ``[prefix]`` section of an inventory and list its IPs.

        The section is the ``[prefix]`` header plus the run of digits, dots and
        whitespace following it (a bare IP list, one per line).

        :param data: full inventory file content.
        :param prefix: inventory group name, e.g. 'emr-recom'.
        :returns: (hostsEntry, hostsInFile) - the section text with surrounding
            newlines stripped, and its IPs in file order.
        :raises ValueError: if the inventory has no ``[prefix]`` section.
        """
        rexContent = re.compile(r'\[{0}\][\d.\s]*'.format(re.escape(prefix)))
        match = rexContent.search(data)
        if match is None:
            raise ValueError('Section [{0}] not found in inventory'.format(prefix))
        hostsEntry = match.group(0).strip('\n')
        hostsInFile = re.findall(r'\d+\.\d+\.\d+\.\d+', hostsEntry)
        return hostsEntry, hostsInFile

    @staticmethod
    def _sortIps(ips):
        """Return the unique strings in ``ips`` sorted numerically by dotted octets."""
        return sorted(set(ips),
                      key=lambda ip: (tuple(int(p) for p in ip.split('.') if p.isdigit()), ip))

    @classmethod
    def _playbookFailed(cls, output):
        """True if a playbook run reports unreachable/failed hosts or changed nothing."""
        return bool(cls._RE_PLAY_ERRORS.search(output)) or not cls._RE_PLAY_CHANGED.search(output)

    def gitUpdate(self):
        """Pull, commit the inventory file and push it.

        :returns: True if the shell chain exited 0. Steps are joined with ';', so
            only the final ``git push`` exit status decides the result.
        """
        cmd = 'cd {0};git pull;git add {1};git commit -m "Update bigdata inventory file"; git push'.format(self.workDir, self.inventoryFile)
        return os.system(cmd) == 0

    def syncDNS(self, target='emr-recom'):
        """Run the resolv playbook against the bigdata inventory, with retries.

        Only 'emr-recom' is synced; any other target returns True immediately.
        Up to 3 attempts, 5 minutes apart. Output is appended to
        /var/log/supervisor/ansible.log on success, ansible.err on failure.

        :returns: True on success, False after 3 failed attempts.
        """
        if target != 'emr-recom':  # only emr-recom DNS is managed for now
            return True
        cmd = 'ansible-playbook -i {0} {1}'.format(self.inventoryFile, self.playbook_resolv)
        retry = 3
        while retry > 0:
            print(cmd)
            output = os.popen(cmd).read()
            if not self._playbookFailed(output):
                with open('/var/log/supervisor/ansible.log', 'a') as f:
                    f.write(output + '\n\n')
                print("DNS 同步成功")
                return True
            with open('/var/log/supervisor/ansible.err', 'a') as f:
                f.write(output + '\n\n')
            retry -= 1
            if retry > 0:  # no pointless wait after the last attempt
                print("重新同步DNS...还剩{0}次".format(retry))
                time.sleep(300)
        return False

    def checkDNS(self):
        """Dry-run the resolv playbook (--check) and apply it if any host would change.

        :returns: True (always).
        """
        execute_cmd = 'ansible-playbook -i {0} {1}'.format(self.inventoryFile, self.playbook_resolv)
        output = os.popen(execute_cmd + ' --check').read()
        if self._RE_PLAY_CHANGED.search(output):
            print('检测到未变更主机!')
            print('开始执行变更')
            # Drain the pipe so we wait for completion; closing it unread may
            # kill ansible-playbook with SIGPIPE on its next write.
            os.popen(execute_cmd).read()
        else:
            print('主机配置已最新')
        return True

    def updateEMRInventory(self, emrName):
        """Sync one inventory group with the live node list of its EMR cluster.

        :param emrName: inventory group name; must be a key of ``self.emrNameMap``.
        :returns: 'Passed' when nothing changed; a summary string after a
            successful update; False if the git push or DNS sync failed.
        :raises ValueError: if the section is missing or the API returned no nodes.
        """
        today = datetime.date.today().strftime('%Y%m%d')
        with open(self.inventoryFile) as f:
            hostsData = f.read()
        hostsEntry, hostsInFile = self.getHostsInFile(hostsData, prefix=emrName)
        currentSet = set(self.getCurrentNodes(clusterId=self.emrNameMap[emrName]))
        fileSet = set(hostsInFile)
        if currentSet == fileSet:
            print('Inventory already up to date')
            return 'Passed'
        if not currentSet:
            # Refuse to wipe a whole group because of an empty API answer.
            raise ValueError('EMR API returned no nodes for {0}; inventory left untouched'.format(emrName))

        N_toAdd = len(currentSet - fileSet)
        N_toDel = len(fileSet - currentSet)
        print('{0} to add,{1} to del'.format(N_toAdd, N_toDel))
        print('Taking backup for current inventory file...')
        shutil.copyfile(self.inventoryFile, self.inventoryFile + '.' + today)
        print('Replacing inventory file content...')
        # Sorted so the file content (and the git diff) is deterministic.
        newEntry = '[{0}]\n'.format(emrName) + '\n'.join(self._sortIps(currentSet))
        newInventoryContent = hostsData.replace(hostsEntry, newEntry, 1)
        with open(self.inventoryFile, 'w') as f:
            f.write(newInventoryContent)

        print('Pushing changes to git server...')
        if not self.gitUpdate():
            print('Inventory file updated failed')
            return False
        print('Inventory file updated success')

        # DNS sync is only triggered when nodes were added.
        if N_toAdd > 0:
            print('Running playbook to sync DNS...')
            if self.syncDNS(emrName):
                print('DNS synced')
            else:
                print('DNS sync failed')
                return False

        return 'Inventory 新增{0}条,删除{1}条'.format(N_toAdd, N_toDel)

    def start_daemon(self):
        """Scan every ``self.interval`` seconds, or once if interval is 0.

        Each pass runs checkDNS() and then updateEMRInventory() for every group.
        Failures are printed and followed by a 5-minute pause; they never stop
        the loop. Email notification through self.mailman is disabled.
        """
        detailUrl = 'http://git.yoho.cn/ops/yoho-ansible-roles/blob/master/inventories/bigdata/hosts'
        while True:
            try:
                self.checkDNS()
            except Exception as e:
                print('DNS check failed: {0}'.format(e))
            for emrName in self.emrNameMap:
                try:
                    result = self.updateEMRInventory(emrName)
                except Exception as e:
                    print(e)
                    print('EMR大数据Inventory更新失败\n{0}\n5分钟后重试'.format(str(e)))
                    time.sleep(300)
                    continue
                if not result:
                    print('EMR大数据Inventory更新失败\n请检查本地git缓存和ansible连接\n5分钟后重试')
                    time.sleep(300)
                elif result.upper() == 'PASSED':
                    print('Nothing changed, passed')
                else:
                    print('EMR大数据Inventory [{0}] 已更新\n{1}\n详见 {2}'.format(emrName, result, detailUrl))
            if self.interval <= 0:
                break
            time.sleep(self.interval)
    

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Keep inventories/bigdata/hosts in sync with Tencent Cloud EMR clusters.')
    # Default 0 = scan once and exit; without a default argparse would pass None.
    parser.add_argument('--interval', type=int, default=0,
                        help='Checks interval in seconds (0 = check once and exit)')
    args = parser.parse_args()
    EMRClusterScanner(args.interval).start_daemon()