Authored by txyang

Update emr query

... ... @@ -3,102 +3,152 @@
# author tiexin.yang@yoho.cn
from qcloud_api import QcloudApi
from mailer import mailman
import datetime
import argparse
import os,shutil
import time
import re
secretId = "************"
secretKey = "*****************"
def EmrDescribeCluster(PageNo=1):
client = QcloudApi(secretId,secretKey)
params = {
"Action":"EmrDescribeCluster",
"PageNo": PageNo,
"PageSize": 20
}
result = client.do_query(params,"emr.api.qcloud.com/v2/index.php")
return result
class EMRClusterScanner(object):
def __init__(self,interval,secretId='',secretKey=''):
self.interval = int(interval) if int(interval)> 30 else 30 #扫描时间间隔必须大于30秒,避免浪费cpu资源以及api访问过于频繁 单位:秒
if not secretId or not secretKey:
self.secretId,self.secretKey = open('~/.qcloud_config').read().strip('\n').split('\n')
else:
self.secretId,self.secretKey = secretId,secretKey
self.inventoryFile = '/opt/projects/yoho-ansible-roles/inventories/bigdata/hosts'
self.mailman = mailman()
self.receivers = ['tiexin.yang@yoho.cn']
def EmrDescribeClusterNode(ClusterId, NodeFlag, PageNo=1):
client = QcloudApi(secretId,secretKey)
params = {
"Action":"EmrDescribeClusterNode",
"ClusterId":ClusterId,
"NodeFlag":NodeFlag, #节点名称, 取值为:master,core,task,common,all
"PageNo": PageNo,
"PageSize": 20
}
result = client.do_query(params,"emr.api.qcloud.com/v2/index.php")
return result
def getCurrentNodes():
clusterList = getCurrentEmrs()
emrNodeIps = []
nodesCnt = 0
PageNo = 1
for clusterId in clusterList:
def EmrDescribeCluster(self,PageNo=1):
#获取所有EMR集群数据
#默认获取一页数据,每页最多显示20条集群信息
client = QcloudApi(secretId,secretKey)
params = {
"Action":"EmrDescribeCluster",
"PageNo": PageNo,
"PageSize": 20
}
result = client.do_query(params,"emr.api.qcloud.com/v2/index.php")
return result
def EmrDescribeClusterNode(self,ClusterId,NodeFlag,PageNo=1):
#获取单个EMR集群下指定节点名称的节点列表
client = QcloudApi(self.secretId,self.secretKey)
params = {
"Action":"EmrDescribeClusterNode",
"ClusterId":ClusterId,
"NodeFlag":NodeFlag, #节点名称, 取值为:master,core,task,common,all
"PageNo": PageNo,
"PageSize": 20
}
result = client.do_query(params,"emr.api.qcloud.com/v2/index.php")
return result
def getCurrentNodes(self):
#获取当前所有EMR集群下的所有节点(不区分节点名称)
clusterList = self.getCurrentEmrs()
emrNodeIps = []
nodesCnt = 0
PageNo = 1
for clusterId in clusterList:
while True:
print 'Getting nodes from',clusterId,'page',PageNo
result = self.EmrDescribeClusterNode(clusterId,"all",PageNo)
nodeList = result['data']['nodeList']
for node in nodeList:
emrNodeIps.append(node['ip'])
nodesCnt += len(nodeList)
totalCnt = result['data']['totalCnt']
if nodesCnt < totalCnt:
PageNo+=1
continue
else:
break
PageNo = 1
return emrNodeIps
def getCurrentEmrs(self):
#获取当前所有EMR集群的名称列表,如果超出默认页数则遍历所有结果页
emrClusterList = []
emrCnt = 0
PageNo = 1
while True:
print 'Getting nodes from',clusterId,'page',PageNo
result = EmrDescribeClusterNode(clusterId,"all",PageNo)
nodeList = result['data']['nodeList']
for node in nodeList:
emrNodeIps.append(node['ip'])
nodesCnt += len(nodeList)
result = self.EmrDescribeCluster(PageNo)
clusterList = result['data']['clusterList']
for cluster in clusterList:
emrClusterList.append(cluster['clusterId'])
emrCnt += len(clusterList)
totalCnt = result['data']['totalCnt']
if nodesCnt < totalCnt:
if emrCnt < totalCnt:
PageNo+=1
continue
else:
break
PageNo = 1
return emrNodeIps
return emrClusterList
def getHostsInFile(self,data):
#读取当前的Inventory信息,截取emr-recom下的所有ip数据用于和最新采集的ip列表进行匹配
rex = re.compile(r'\[emr-recom\]([\d+\.\d+\.\d+\.\d+\s*]*)',re.S)
hostsEntry = rex.findall(data)[0].strip('\n')
hostsInFile = hostsEntry.split('\n')
return hostsEntry,hostsInFile
def getCurrentEmrs():
emrClusterList = []
emrCnt = 0
PageNo = 1
while True:
result = EmrDescribeCluster(PageNo)
clusterList = result['data']['clusterList']
for cluster in clusterList:
emrClusterList.append(cluster['clusterId'])
emrCnt += len(clusterList)
totalCnt = result['data']['totalCnt']
if emrCnt < totalCnt:
PageNo+=1
continue
def gitUpdate(self):
cmd = 'cd /opt/projects/yoho-ansible-roles/;git add {0};git commit -m "Update bigdata inventory file"; git push;sleep(2)'.format(self.inventoryFile)
os.popen(cmd)
return True
def updateEMRInventory(self):
#比对当前与最新的Inventory ip列表,相同则pass不同则更新
today = datetime.date.today().strftime('%Y%m%d')
hostsData = open(self.inventoryFile).read()
hostsEntry,hostsInFile = getHostsInFile(hostsData)
currentNodes = self.getCurrentNodes()
if set(currentNodes) == set(hostsInFile):
print 'Inventory already up to date'
return 0
else:
break
print emrClusterList
return emrClusterList
N_toAdd = len(set(currentNodes) - set(hostsInFile))
N_toDel = len(set(hostsInFile) - set(currentNodes))
print 'Taking backup for current inventory file...'
shutil.copyfile(self.inventoryFile,self.inventoryFile+'.'+today)
print 'Replacing inventory file content...'
newInventoryContent = hostsData.replace(hostsEntry,'\n'.join(list(set(currentNodes))))
with open(self.inventoryFile,'w') as f:
f.write(newInventoryContent)
f.close()
print 'Pushing changes to git server...'
self.gitUpdate()
print 'Inventory file updated success'
return 'Inventory 新增{0}条,删除{1}条'.format(N_toAdd,N_toDel)
def getHostsInFile(data):
rex = re.compile(r'\[emr-recom\]([\d+\.\d+\.\d+\.\d+\s*]*)',re.S)
hostsEntry = rex.findall(data)[0].strip('\n')
hostsInFile = hostsEntry.split('\n')
return hostsEntry,hostsInFile
def updateEMRInventory():
hostsData = open('/home/ansible/yoho-ansible-roles/inventories/bigdata/hosts').read()
hostsEntry,hostsInFile = getHostsInFile(hostsData)
currentNodes = getCurrentNodes()
if set(currentNodes) == set(hostsInFile):
print 'Inventory already up to date'
return 0
else:
print 'Replacing inventory file content...'
newInventoryContent = hostsData.replace(hostsEntry,'\n'.join(list(set(currentNodes))))
with open('new.hosts','w') as f:
f.write(newInventoryContent)
f.close()
print 'Inventory file updated success'
return 0
def start_daemon(self):
#以守护进程的状态运行此EMR扫描程序
while True:
try:
result = self.updateEMRInventory()
if result:
self.mailman.mail(rceivers=self.receivers,Content='EMR大数据Inventory已更新\n{0}\n详见 http://git.yoho.cn/ops/yoho-ansible-roles/blob/master/inventories/bigdata/hosts'.format(result),Title='EMR Inventory 更新结果')
else:
pass
except Exception as e:
print e
self.mailman.mail(rceivers=self.receivers,Content='EMR大数据Inventory更新失败\n{0}\n5分钟后重试'.format(str(e)),Title='EMR Inventory 更新结果')
time.sleep(300)
pass
time.sleep(self.interval)
if __name__ == '__main__':
updateEMRInventory()
#getCurrentEmrs()
parser = argparse.ArgumentParser()
parser.add_argument('interval',type=int,help='Checks interval in seconds')
args = parser.parse_args()
EMRClusterScanner(args.interval).start_daemon()
... ...