python类where()的实例源码

certs.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
fetch_service_config.py 文件源码 项目:endpoints-tools 作者: cloudendpoints 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fetch_service_config_id(metadata):
    """Fetch service config ID from metadata URL."""
    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_CONFIG_ID
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
        if response.status != 200:
            # Fetching service config id is optional. No need to leave log
            raise None
    except:
        url = metadata + _METADATA_PATH + "/attributes/endpoints-service-version"
        try:
            response = client.request("GET", url, headers=headers)
        except:
            logging.info("Failed to fetch service config ID from the metadata server: " + url)
            return None

        if response.status != 200:
            message_template = "Fetching service config ID failed (url {}, status code {})"
            logging.info(message_template.format(url, response.status))
            return None

    version = response.data
    logging.info("Service config ID:" + version)
    return version
fetch_service_config.py 文件源码 项目:endpoints-tools 作者: cloudendpoints 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fetch_latest_rollout(management_service, service_name, access_token):
    """Fetch rollouts"""
    if access_token is None:
        headers = {}
    else:
        headers = {"Authorization": "Bearer {}".format(access_token)}

    client = urllib3.PoolManager(ca_certs=certifi.where())

    service_mgmt_url = SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.format(management_service,
                                                                 service_name)
    try:
        response = client.request("GET", service_mgmt_url, headers=headers)
    except:
        raise FetchError(1, "Failed to fetch rollouts")

    status_code = response.status
    if status_code != 200:
        message_template = ("Fetching rollouts failed "\
                            "(status code {}, reason {}, url {})")
        raise FetchError(1, message_template.format(status_code,
                                                    response.reason,
                                                    service_mgmt_url))
    rollouts = json.loads(response.data)
    # No valid rollouts
    if rollouts is None or \
      'rollouts' not in rollouts or \
      len(rollouts["rollouts"]) == 0 or \
      "rolloutId" not in rollouts["rollouts"][0] or \
      "trafficPercentStrategy" not in rollouts["rollouts"][0] or \
      "percentages" not in rollouts["rollouts"][0]["trafficPercentStrategy"]:
        message_template = ("Invalid rollouts response (url {}, data {})")
        raise FetchError(1, message_template.format(service_mgmt_url,
                                                    response.data))

    return rollouts["rollouts"][0]
certs.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:YoWhenReady 作者: jnsdrtlf 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
wsocket.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_ca_bundle():
    """  
    If verify=True, then requests is using the built in
    certifi CA database. Attempt to get that path for
    the websocket.
    """
    try:
        import certifi
        return certifi.where()
    except ImportError:
        pass
certs.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
areas_code.py 文件源码 项目:ceres 作者: dicortazar 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_es_section(parser, es_section):

    ES_config = namedtuple('ES_config',
                           ['es_read', 'es_write', 'es_read_git_index',
                            'es_write_git_index'])

    user = parser.get(es_section, 'user')
    password = parser.get(es_section, 'password')
    host = parser.get(es_section, 'host')
    port = parser.get(es_section, 'port')
    path = parser.get(es_section, 'path')
    es_read_git_index = parser.get(es_section, 'index_git_raw')

    host_output = parser.get(es_section, 'host_output')
    port_output = parser.get(es_section, 'port_output')
    user_output = parser.get(es_section, 'user_output')
    password_output = parser.get(es_section, 'password_output')
    path_output = parser.get(es_section, 'path_output')
    es_write_git_index = parser.get(es_section, 'index_git_output')

    connection_input = "https://" + user + ":" + password + "@" + host + ":"\
                       + port + "/" + path
    print("Input ES:", connection_input)
    es_read = Elasticsearch([connection_input], use_ssl=True, verity_certs=True,
                            ca_cert=certifi.where(), timeout=100)

    credentials = ""
    if user_output:
        credentials = user_output + ":" + password_output + "@"

    connection_output = "http://" + credentials + host_output + ":"\
                        + port_output + "/" + path_output
    # es_write = Elasticsearch([connection_output], use_ssl=True,
    #                           verity_certs=True, ca_cert=certifi.where(),
    #                           scroll='300m', timeout=100)
    print("Output ES:", connection_output)
    es_write = Elasticsearch([connection_output])

    return ES_config(es_read=es_read, es_write=es_write,
                     es_read_git_index=es_read_git_index,
                     es_write_git_index=es_write_git_index)
areas_code.py 文件源码 项目:ceres 作者: dicortazar 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def ESConnection():

    parser = configparser.ConfigParser()
    conf_file = '.settings'
    fd = open(conf_file, 'r')
    parser.readfp(fd)
    fd.close()

    sections = parser.sections()
    for section in sections:
        options = parser.options(section)
        for option in options:
            if option == 'user': user = parser.get(section, option)
            if option == 'password': password = parser.get(section, option)
            if option == 'host': host = parser.get(section, option)
            if option == 'port': port = parser.get(section, option)
            if option == 'path': path = parser.get(section, option)
            if option == 'index_git_raw': es_read_git_index = parser.get(section, option)

            if option == 'host_output': host_output = parser.get(section, option)
            if option == 'port_output': port_output = parser.get(section, option)
            if option == 'user_output': user_output = parser.get(section, option)
            if option == 'password_output': password_output = parser.get(section, option)
            if option == 'path_output': path_output = parser.get(section, option)
            if option == 'index_git_output': es_write_git_index = parser.get(section, option)


    connection = "https://" + user + ":" + password + "@" + host + ":" + port + "/" + path
    print (connection)
    es_read = Elasticsearch([connection], use_ssl=True, verity_certs=True, ca_cert=certifi.where(), scroll='300m', timeout=100)

    credentials = ""
    if user_output:
        credentials = user_output + ":" + password_output + "@"

    connection_output = "http://" + credentials + host_output + ":" + port_output + "/" + path_output
    #es_write = Elasticsearch([connection_output], use_ssl=True, verity_certs=True, ca_cert=certifi.where(), scroll='300m', timeout=100)
    es_write = Elasticsearch([connection_output])
    print (connection_output)

    return es_read, es_write, es_read_git_index, es_write_git_index
openstack_gender.py 文件源码 项目:ceres 作者: dicortazar 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ESConnection():

    parser = configparser.ConfigParser()
    conf_file = 'settings'
    fd = open(conf_file, 'r')
    parser.readfp(fd)
    fd.close()

    sections = parser.sections()
    for section in sections:
        options = parser.options(section)
        for option in options:
            if option == 'user': user = parser.get(section, option)
            if option == 'password': password = parser.get(section, option)
            if option == 'host': host = parser.get(section, option)
            if option == 'port': port = parser.get(section, option)
            if option == 'path': path = parser.get(section, option)
            if option == 'genderize_key': genderize_key = parser.get(section, option)
            if option == 'index_gerrit_raw': es_read_gerrit_index = parser.get(section, option)
            if option == 'index_git_raw': es_read_git_index = parser.get(section, option)

            if option == 'host_output': host_output = parser.get(section, option)
            if option == 'port_output': port_output = parser.get(section, option)
            if option == 'user_output': user_output = parser.get(section, option)
            if option == 'password_output': password_output = parser.get(section, option)
            if option == 'path_output': path_output = parser.get(section, option)
            if option == 'index_gerrit_output': es_write_gerrit_index = parser.get(section, option)
            if option == 'index_git_output': es_write_git_index = parser.get(section, option)


    connection = "https://" + user + ":" + password + "@" + host + ":" + port + "/"
    print (connection)
    es_read = Elasticsearch([connection], use_ssl=True, verity_certs=True, ca_cert=certifi.where(), scroll='300m', timeout=100)

    connection_output = "https://" + user_output + ":" + password_output + "@" + host_output + ":" + port_output + "/" + path_output
    es_write = Elasticsearch([connection_output], use_ssl=True, verity_certs=True, ca_cert=certifi.where(), scroll='300m', timeout=100)

    return es_read, es_write, es_read_git_index, es_read_gerrit_index, \
           es_write_git_index, es_write_gerrit_index, genderize_key
certs.py 文件源码 项目:alexa-stackoverflow 作者: benvand 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
certs.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')
ws_client.py 文件源码 项目:python-base 作者: kubernetes-client 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def read_all(self):
        """Return buffered data received on stdout and stderr channels.
        This is useful for non-interactive call where a set of command passed
        to the API call and their result is needed after the call is concluded.
        Should be called after run_forever() or update()

        TODO: Maybe we can process this and return a more meaningful map with
        channels mapped for each input.
        """
        out = self._all
        self._all = ""
        self._channels = {}
        return out
certs.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def where():
        """Return the preferred certificate bundle."""
        # vendored bundle inside Requests
        return os.path.join(os.path.dirname(__file__), 'cacert.pem')


问题


面经


文章

微信
公众号

扫码关注公众号