emr_query.py
6.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author tiexin.yang@yoho.cn
import argparse
import datetime
import os,shutil
import re
import subprocess
import time

from qcloud_api import QcloudApi
from mailer import mailman
class EMRClusterScanner(object):
    """Keep the Ansible bigdata inventory in sync with Tencent Cloud EMR nodes.

    Each key of ``emrNameMap`` is an inventory section name (``[name]``) mapped
    to an EMR cluster id. Every pass compares the node IPs the EMR API reports
    for that cluster with the IPs listed under the section; when they differ,
    the inventory file is backed up, the section is rewritten, the change is
    committed and pushed with git, and a notification mail is sent.
    """

    # Endpoint passed to QcloudApi.do_query for every EMR call.
    EMR_API_ENDPOINT = "emr.api.qcloud.com/v2/index.php"
    # Git checkout that contains the inventory file.
    REPO_DIR = '/opt/projects/yoho-ansible-roles/'
    # Results per page requested from the paginated EMR APIs.
    PAGE_SIZE = 20

    def __init__(self, interval, secretId='', secretKey=''):
        """Configure the scanner.

        :param interval: seconds between scan passes. Values below 30, and
            None (e.g. ``--interval`` omitted), become 30.
        :param secretId: Qcloud API secret id. If either credential is empty,
            both are read from ``/home/txyang/.qcloud_config`` (first line is
            the id, second line is the key).
        :param secretKey: Qcloud API secret key.
        """
        # Enforce at least 30 s between scans to save CPU and limit API traffic.
        self.interval = max(int(interval), 30) if interval is not None else 30
        if not secretId or not secretKey:
            with open('/home/txyang/.qcloud_config') as configFile:
                lines = configFile.read().strip('\n').split('\n')
            self.secretId, self.secretKey = lines
        else:
            self.secretId, self.secretKey = secretId, secretKey
        self.inventoryFile = '/opt/projects/yoho-ansible-roles/inventories/bigdata/hosts'
        self.mailman = mailman()
        self.receivers = ['tiexin.yang@yoho.cn', 'chunhua.zhang@yoho.cn']
        # Inventory section name -> EMR cluster id.
        # NOTE(review): emr-rt and emr-dw map to the same cluster id, so both
        # sections always receive identical IPs -- confirm this is intended.
        self.emrNameMap = {
            "emr-rt": "emr-r6bhtb5v",
            "emr-ops": "emr-iaeloyc2",
            "emr-dw": "emr-r6bhtb5v",
            "emr-recom": "emr-ilj72ynu"
        }

    def _emrQuery(self, params):
        """Send one EMR API request signed with this scanner's credentials."""
        client = QcloudApi(self.secretId, self.secretKey)
        return client.do_query(params, self.EMR_API_ENDPOINT)

    def EmrDescribeCluster(self, PageNo=1):
        """Fetch one page (at most PAGE_SIZE entries) of the EMR cluster list.

        Returns the ``QcloudApi.do_query`` result; callers read
        ``result['data']['clusterList']`` and ``result['data']['totalCnt']``.
        """
        return self._emrQuery({
            "Action": "EmrDescribeCluster",
            "PageNo": PageNo,
            "PageSize": self.PAGE_SIZE,
        })

    def EmrDescribeClusterNode(self, ClusterId, NodeFlag, PageNo=1):
        """Fetch one page of nodes belonging to a single EMR cluster.

        :param ClusterId: EMR cluster id.
        :param NodeFlag: node role filter: master, core, task, common or all.
        :param PageNo: 1-based page number.
        """
        return self._emrQuery({
            "Action": "EmrDescribeClusterNode",
            "ClusterId": ClusterId,
            "NodeFlag": NodeFlag,
            "PageNo": PageNo,
            "PageSize": self.PAGE_SIZE,
        })

    def _collectAllPages(self, fetchPage, listKey, itemKey):
        """Walk every page of a paginated EMR API call and collect one field.

        :param fetchPage: callable taking a 1-based page number and returning
            that page's API response.
        :param listKey: key under ``response['data']`` holding the page items.
        :param itemKey: key read from each item.
        :returns: ``item[itemKey]`` for every item on every page, in API order.
        """
        values = []
        seen = 0
        pageNo = 1
        while True:
            data = fetchPage(pageNo)['data']
            items = data[listKey]
            values.extend(item[itemKey] for item in items)
            seen += len(items)
            # An empty page means nothing more is coming; stopping here avoids
            # looping forever if totalCnt overstates the real count.
            if not items or seen >= data['totalCnt']:
                return values
            pageNo += 1

    def getCurrentNodes(self, clusterId):
        """Return the IPs of all nodes (every role) in one EMR cluster."""
        def fetchPage(pageNo):
            print('Getting nodes from {0} page {1}'.format(clusterId, pageNo))
            return self.EmrDescribeClusterNode(clusterId, "all", pageNo)
        return self._collectAllPages(fetchPage, 'nodeList', 'ip')

    def getCurrentEmrs(self):
        """Return the ids of all EMR clusters, following every result page."""
        return self._collectAllPages(self.EmrDescribeCluster, 'clusterList', 'clusterId')

    def getHostsInFile(self, data, prefix):
        """Extract the ``[prefix]`` section of an inventory and its IPs.

        The section is the header plus the following run of digits, dots and
        whitespace (i.e. plain IPv4 lines); it ends at the first other char.

        :param data: full inventory file content.
        :param prefix: section name, matched literally.
        :returns: ``(hostsEntry, hostsInFile)`` where hostsEntry is the section
            text with surrounding newlines stripped and hostsInFile the list
            of IPv4 strings inside it.
        :raises ValueError: if the section is not present in ``data``.
        """
        print('prefix: {0}'.format(prefix))
        sectionPattern = re.compile(r'(\[{0}\][\d.\s]*)'.format(re.escape(prefix)))
        match = sectionPattern.search(data)
        if match is None:
            raise ValueError('Section [{0}] not found in inventory'.format(prefix))
        hostsEntry = match.group(1).strip('\n')
        hostsInFile = re.findall(r'(\d+\.\d+\.\d+\.\d+)', hostsEntry)
        return hostsEntry, hostsInFile

    def gitUpdate(self):
        """Pull, stage, commit and push the inventory file in REPO_DIR.

        Every step runs even if an earlier one fails (as the previous
        ';'-chained shell command did), but each exit status is checked.

        :returns: True if every git step exited 0, else False.
        """
        steps = [
            ['git', 'pull'],
            ['git', 'add', self.inventoryFile],
            ['git', 'commit', '-m', 'Update bigdata inventory file'],
            ['git', 'push'],
        ]
        allOk = True
        for step in steps:
            if subprocess.call(step, cwd=self.REPO_DIR) != 0:
                print('git step failed: {0}'.format(' '.join(step)))
                allOk = False
        return allOk

    def updateEMRInventory(self, emrName):
        """Sync one inventory section with the live IPs of its EMR cluster.

        :param emrName: key of ``emrNameMap`` / inventory section name.
        :returns: 0 when the section already matches; otherwise a summary
            string with the number of added and removed hosts.
        """
        today = datetime.date.today().strftime('%Y%m%d')
        with open(self.inventoryFile) as f:
            hostsData = f.read()
        hostsEntry, hostsInFile = self.getHostsInFile(hostsData, prefix=emrName)
        currentNodes = self.getCurrentNodes(clusterId=self.emrNameMap[emrName])
        current = set(currentNodes)
        inFile = set(hostsInFile)
        if current == inFile:
            print('Inventory already up to date')
            return 0
        # NOTE(review): if the API ever returns no nodes, the whole section is
        # emptied here -- consider guarding against that.
        print('\n'.join(currentNodes))
        nToAdd = len(current - inFile)
        nToDel = len(inFile - current)
        print('{0} to add,{1} to del'.format(nToAdd, nToDel))
        print('Taking backup for current inventory file...')
        shutil.copyfile(self.inventoryFile, self.inventoryFile + '.' + today)
        print('Replacing inventory file content...')
        # Sorted so repeated runs produce stable file content and clean diffs.
        newEntry = '[{0}]\n'.format(emrName) + '\n'.join(sorted(current))
        # The first occurrence is the section getHostsInFile located.
        newInventoryContent = hostsData.replace(hostsEntry, newEntry, 1)
        with open(self.inventoryFile, 'w') as f:
            f.write(newInventoryContent)
        print('Pushing changes to git server...')
        if not self.gitUpdate():
            print('Warning: git sync reported a failure; check the repository')
        print('Inventory file updated success')
        return 'Inventory 新增{0}条,删除{1}条'.format(nToAdd, nToDel)

    def start_daemon(self):
        """Run the scanner forever, syncing every cluster each interval.

        Failures for one cluster are reported by mail and followed by a
        5-minute back-off instead of terminating the daemon.
        """
        while True:
            for emrName in self.emrNameMap:
                try:
                    result = self.updateEMRInventory(emrName)
                except Exception as e:
                    # Top-level daemon boundary: report, back off, keep going.
                    print(e)
                    self.mailman.mail(receivers=self.receivers,Content='EMR大数据Inventory更新失败\n{0}\n5分钟后重试'.format(str(e)),Title='EMR Inventory 更新结果')
                    time.sleep(300)
                    continue
                if result:
                    self.mailman.mail(receivers=self.receivers,Content='EMR大数据Inventory [{0}] 已更新\n{1}\n详见 http://git.yoho.cn/ops/yoho-ansible-roles/blob/master/inventories/bigdata/hosts'.format(emrName,result),Title='EMR Inventory 更新结果')
            time.sleep(self.interval)
if __name__ == '__main__':
    # Entry point: parse the scan interval and run the scanner forever.
    parser = argparse.ArgumentParser()
    # Default avoids passing None to the scanner when --interval is omitted.
    parser.add_argument('--interval', type=int, default=30,
                        help='Checks interval in seconds (minimum and default: 30)')
    args = parser.parse_args()
    EMRClusterScanner(args.interval).start_daemon()