qcloud_api.py
2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from ansible.module_utils.basic import *
import requests
import json
import random
import hmac
import base64
import hashlib
import time
import sys
# Python 2 only: make UTF-8 the interpreter-wide default codec for implicit
# str <-> unicode conversions. reload(sys) is needed because site.py removes
# sys.setdefaultencoding from the module namespace at startup.
# Python 3 has no builtin reload() and already defaults to UTF-8, so the
# hack is skipped there instead of failing with NameError at import time.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf8')
class QcloudApi(object):
    """Tencent Cloud (Qcloud) API client.

    Builds the public request parameters, signs the request with HMAC-SHA1
    using the caller's SecretId/SecretKey and POSTs it over HTTPS.

    See doc: https://cloud.tencent.com/document/product/302/8517
    """

    # Default endpoint (host + path, without scheme) used by do_query.
    URLS_DNS = 'cns.api.qcloud.com/v2/index.php'

    # Public request params.
    Region = 'ap-beijing'
    # NOTE(review): Version is not referenced by do_query, which sends
    # '2017-03-12' as Version when api_version == 3.
    Version = 'YOHO-QCLOUD-V1.0'

    def __init__(self, secretId, secretKey):
        """Init. Must provide SecretId and SecretKey."""
        self.secretId = secretId
        self.secretKey = secretKey

    def do_query(self, params, req_url=URLS_DNS, api_version=2, timeout=None):
        """Sign and POST one API request and return the decoded JSON reply.

        :param params: dict of query parameters; must contain 'Action'.
            The dict itself is not modified. Public parameters added here
            take precedence over same-named keys in params.
        :param req_url: endpoint as host + path, without the 'https://' prefix.
        :param api_version: 2 or 3; selects which public parameters are sent.
        :param timeout: optional timeout forwarded to requests.post. The
            default None keeps the previous behaviour (no timeout).
        :return: dict, the message body returned by Tencent Cloud.
        :raises ValueError: if 'Action' is missing from params or
            api_version is neither 2 nor 3.
        """
        if 'Action' not in params:
            raise ValueError("params must contain 'Action': %r" % (params,))

        if api_version == 2:
            public_params = {
                'Timestamp': int(time.time()),
                'Nonce': QcloudApi.generate_nonce(),
                'SecretId': self.secretId,
            }
        elif api_version == 3:
            public_params = {
                'Region': QcloudApi.Region,
                'Timestamp': int(time.time()),
                'Nonce': int(QcloudApi.generate_nonce()),
                'Version': '2017-03-12',
                'SecretId': self.secretId,
                'SignatureMethod': 'HmacSHA1',
            }
            # For v3 the URL is signed and called with exactly one trailing '/'.
            req_url = req_url.rstrip('/') + '/'
        else:
            # Previously this fell through to an unbound local at `return`.
            raise ValueError('unsupported api_version: %r' % (api_version,))

        # Merge into a fresh dict so the caller's params stay untouched, then
        # sign everything that will be sent (the signature itself excluded).
        request_params = dict(params, **public_params)
        request_params['Signature'] = self.signature(request_params, req_url)
        response = requests.post(
            'https://' + req_url, data=request_params, timeout=timeout)
        return response.json()

    @staticmethod
    def generate_nonce(length=8):
        """Return a string of `length` random decimal digits."""
        return ''.join(str(random.randint(0, 9)) for _ in range(length))

    @staticmethod
    def _to_bytes(value):
        """Return value as bytes, encoding text as UTF-8 (Python 2 and 3)."""
        if isinstance(value, type(u'')):
            return value.encode('utf-8')
        if isinstance(value, bytes):
            return value
        return str(value).encode('utf-8')

    def signature(self, request_params, url):
        """Return the base64-encoded HMAC-SHA1 signature of a request.

        The signed text is 'POST' + url + '?' + the parameters rendered as
        'key=value' pairs, ordered by key and joined with '&'. Values are
        inserted as-is (no URL encoding).

        :param request_params: dict of every parameter that will be sent,
            except 'Signature' itself.
        :param url: endpoint as host + path, without scheme.
        :return: the signature as text.
        """
        query = '&'.join(
            '%s=%s' % (key, request_params[key])
            for key in sorted(request_params))
        plain_text = 'POST%s?%s' % (url, query)
        # hmac needs bytes for both key and message; encode explicitly rather
        # than rely on bytes(str), which fails on Python 3.
        digest = hmac.new(
            self._to_bytes(self.secretKey),
            self._to_bytes(plain_text),
            hashlib.sha1).digest()
        return base64.b64encode(digest).decode('ascii')