fetch_service_config.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:endpoints-tools 作者: cloudendpoints 项目源码 文件源码
def fetch_latest_rollout(management_service, service_name, access_token):
    """Fetch the rollouts of a service and return the first one listed.

    Args:
        management_service: first value substituted into
            SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.
        service_name: second value substituted into
            SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.
        access_token: bearer token sent in the Authorization header, or
            None to send the request without that header.

    Returns:
        The first entry of the response's "rollouts" list. It is guaranteed
        to be a dict holding "rolloutId" and a "trafficPercentStrategy" dict
        that contains "percentages".
        NOTE(review): presumably the API lists the newest rollout first,
        which is what makes this the "latest" -- confirm against the API.

    Raises:
        FetchError: if the request fails, the status code is not 200, or
            the response body is not a valid rollouts document.
    """
    if access_token is None:
        headers = {}
    else:
        headers = {"Authorization": "Bearer {}".format(access_token)}

    # Request certificate verification explicitly: older urllib3 releases do
    # not verify the server certificate unless cert_reqs is set, even when
    # ca_certs is supplied.
    client = urllib3.PoolManager(cert_reqs="CERT_REQUIRED",
                                 ca_certs=certifi.where())

    service_mgmt_url = SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.format(management_service,
                                                                 service_name)
    try:
        response = client.request("GET", service_mgmt_url, headers=headers)
    except Exception:
        # Not a bare "except:", so KeyboardInterrupt and SystemExit still
        # propagate instead of being reported as a fetch failure.
        raise FetchError(1, "Failed to fetch rollouts")

    status_code = response.status
    if status_code != 200:
        message_template = ("Fetching rollouts failed "
                            "(status code {}, reason {}, url {})")
        raise FetchError(1, message_template.format(status_code,
                                                    response.reason,
                                                    service_mgmt_url))

    # A body that is not valid JSON (or not decodable text) is treated like
    # any other invalid rollouts response rather than leaking a ValueError.
    try:
        rollouts = json.loads(response.data)
    except ValueError:
        rollouts = None

    # Walk the expected structure defensively: each level must have the
    # expected container type before the next level is inspected.
    entries = rollouts.get("rollouts") if isinstance(rollouts, dict) else None
    first = entries[0] if isinstance(entries, list) and entries else None
    strategy = (first.get("trafficPercentStrategy")
                if isinstance(first, dict) else None)

    # No valid rollouts
    if not (isinstance(first, dict) and
            "rolloutId" in first and
            isinstance(strategy, dict) and
            "percentages" in strategy):
        message_template = ("Invalid rollouts response (url {}, data {})")
        raise FetchError(1, message_template.format(service_mgmt_url,
                                                    response.data))

    return first
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号