qcloud_api.py 2.64 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author chunhua.zhang@yoho.cn
import random
import hmac
import base64
import hashlib
import time
import requests


class QcloudApi:
    """腾讯云API接口"""

    'pre-defined url'
    'load balance'
    URLS_lb = 'lb.api.qcloud.com/v2/index.php'
    URLS_DNS = 'cns.api.qcloud.com/v2/index.php'

    'public request params'
    Region = 'ap-beijing'
    Version = 'YOHO-QCLOUD-V1.0'

    'see doc: https://www.qcloud.com/document/api/377/4153'
    'init. must provide SecretId and SecretKey'
    def __init__(self, secretId, secretKey):
        self.secretId = secretId
        self.secretKey = secretKey

    def do_query(self, params, req_url=URLS_lb):
        """
        Do Query Qcloud api
        :param params: 查询参数,dict类型,必须包含 Action
        :param req_url: 查询的地址,参考:qcloud_api.URLS_lb --> lb.api.qcloud.com/v2/index.php
        :return: dict类型,腾讯云返回的消息体
        """
        if 'Action' not in params:
            print("must provide [Action] params !")
            raise ValueError(params)

        public_params = {'RequestClient': QcloudApi.Version, 'limit': 100, 'Region': QcloudApi.Region, 'Timestamp': int(time.time()), 'Nonce': QcloudApi.generate_nonce(), 'SecretId': self.secretId}
        # merge params
        params = dict(params, **public_params)
        # signature
        sign = self.signature(params, req_url)
        params['Signature'] = sign
        response_dict = requests.post("https://" + req_url, data=params).json()
        print("Query qcloud api success. params: %s, result: %s" % (params, response_dict))
        return response_dict

    @staticmethod
    def generate_nonce(length=8):
        """Generate pseudorandom number."""
        return ''.join([str(random.randint(0, 9)) for i in range(length)])

    def signature(self, request_params, url):
        """
           对消息进行签名 see https://www.qcloud.com/document/api/377/4214
           @param request_params
           @param url, 请求地址,例如:lb.api.qcloud.com/v2/index.php
          """
        # sort dict by key
        kv_pairs = []
        for key in sorted(request_params):
            kv_pairs.append('%s=%s' % (key, request_params[key]))
        str_req = '&'.join(kv_pairs)
        str_full = 'POST%s?%s' % (url, str_req)
        hmac_sign = hmac.new(bytes(self.secretKey), bytes(str_full), hashlib.sha1).digest()
        return base64.b64encode(hmac_sign).decode()

# for test
if __name__ == "__main__":
    api = QcloudApi(secretId='AK**K', secretKey='AC**QaW')
    api.do_query(params={'Action': 'DescribeLoadBalancers'}, req_url=QcloudApi.URLS_lb)