def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
python类where()的实例源码
def fetch_service_config_id(metadata):
    """Fetch service config ID from the metadata URL.

    The attribute named by ``_METADATA_SERVICE_CONFIG_ID`` is requested
    first. If that request raises or does not answer with HTTP 200, the
    ``endpoints-service-version`` attribute is requested instead.

    Args:
        metadata: Base URL of the metadata server.

    Returns:
        The body (``response.data``) of the first request that answered
        with HTTP 200, or ``None`` when both attempts failed.
    """
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())

    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_CONFIG_ID
    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        # Fetching service config id is optional. No need to leave log
        response = None

    if response is None or response.status != 200:
        # Fall back to the service-version attribute; failures here are logged.
        url = metadata + _METADATA_PATH + "/attributes/endpoints-service-version"
        try:
            response = client.request("GET", url, headers=headers)
        except Exception:
            logging.info("Failed to fetch service config ID from the metadata server: " + url)
            return None
        if response.status != 200:
            message_template = "Fetching service config ID failed (url {}, status code {})"
            logging.info(message_template.format(url, response.status))
            return None

    version = response.data
    # The body may be bytes; decode a copy for the log line only so that the
    # returned value keeps the type the HTTP client produced.
    if isinstance(version, bytes) and not isinstance(version, str):
        shown = version.decode("utf-8", "replace")
    else:
        shown = version
    logging.info("Service config ID:%s", shown)
    return version
def fetch_latest_rollout(management_service, service_name, access_token):
    """Fetch rollouts and return the first one listed in the response.

    Args:
        management_service: Value substituted first into
            ``SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE``.
        service_name: Value substituted second into the URL template.
        access_token: Bearer token for the ``Authorization`` header, or
            ``None`` to send the request without that header.

    Returns:
        The first element of the ``"rollouts"`` list in the JSON response.

    Raises:
        FetchError: If the request fails, the status code is not 200, the
            body is not valid JSON, or the first rollout lacks ``rolloutId``
            or ``trafficPercentStrategy.percentages``.
    """
    headers = {}
    if access_token is not None:
        headers["Authorization"] = "Bearer {}".format(access_token)

    client = urllib3.PoolManager(ca_certs=certifi.where())
    service_mgmt_url = SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.format(management_service,
                                                                 service_name)

    try:
        response = client.request("GET", service_mgmt_url, headers=headers)
    except Exception:
        raise FetchError(1, "Failed to fetch rollouts")

    status_code = response.status
    if status_code != 200:
        message_template = ("Fetching rollouts failed "
                            "(status code {}, reason {}, url {})")
        raise FetchError(1, message_template.format(status_code,
                                                    response.reason,
                                                    service_mgmt_url))

    # A body that is not JSON is reported the same way as a malformed payload.
    try:
        rollouts = json.loads(response.data)
    except ValueError:
        rollouts = None

    # Walk the expected structure defensively so an unexpected shape ends in
    # FetchError rather than a stray TypeError/KeyError.
    entries = rollouts.get("rollouts") if isinstance(rollouts, dict) else None
    latest = entries[0] if isinstance(entries, list) and entries else None
    strategy = None
    if isinstance(latest, dict):
        strategy = latest.get("trafficPercentStrategy")

    # No valid rollouts
    if not isinstance(latest, dict) or \
            "rolloutId" not in latest or \
            not isinstance(strategy, dict) or \
            "percentages" not in strategy:
        message_template = ("Invalid rollouts response (url {}, data {})")
        raise FetchError(1, message_template.format(service_mgmt_url,
                                                    response.data))

    return latest
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def _get_ca_bundle():
    """Return the certifi CA bundle path, or ``None`` if certifi is missing.

    If verify=True, then requests is using the built in certifi CA
    database. Attempt to get that path for the websocket.
    """
    try:
        import certifi
    except ImportError:
        # certifi is optional here; signal "no bundle available" explicitly.
        return None
    return certifi.where()
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def parse_es_section(parser, es_section):
    """Build the input and output Elasticsearch clients from a config section.

    Args:
        parser: Config parser object; ``parser.get(section, option)`` is used
            to read every setting.
        es_section: Name of the section that holds the Elasticsearch settings.

    Returns:
        An ``ES_config`` namedtuple with the fields ``es_read``, ``es_write``,
        ``es_read_git_index`` and ``es_write_git_index``.
    """
    ES_config = namedtuple('ES_config',
                           ['es_read', 'es_write', 'es_read_git_index',
                            'es_write_git_index'])

    user = parser.get(es_section, 'user')
    password = parser.get(es_section, 'password')
    host = parser.get(es_section, 'host')
    port = parser.get(es_section, 'port')
    path = parser.get(es_section, 'path')
    es_read_git_index = parser.get(es_section, 'index_git_raw')

    host_output = parser.get(es_section, 'host_output')
    port_output = parser.get(es_section, 'port_output')
    user_output = parser.get(es_section, 'user_output')
    password_output = parser.get(es_section, 'password_output')
    path_output = parser.get(es_section, 'path_output')
    es_write_git_index = parser.get(es_section, 'index_git_output')

    input_location = "@" + host + ":" + port + "/" + path
    connection_input = "https://" + user + ":" + password + input_location
    # Echo the target with the password masked so it never reaches stdout.
    print("Input ES:", "https://" + user + ":****" + input_location)

    # The keyword names are verify_certs / ca_certs; the misspelled variants
    # previously used here did not configure certificate verification.
    es_read = Elasticsearch([connection_input], use_ssl=True,
                            verify_certs=True, ca_certs=certifi.where(),
                            timeout=100)

    credentials = ""
    masked_credentials = ""
    if user_output:
        credentials = user_output + ":" + password_output + "@"
        masked_credentials = user_output + ":****@"

    output_location = host_output + ":" + port_output + "/" + path_output
    connection_output = "http://" + credentials + output_location
    print("Output ES:", "http://" + masked_credentials + output_location)

    # The output cluster is reached over plain HTTP, so no TLS options apply.
    es_write = Elasticsearch([connection_output])

    return ES_config(es_read=es_read, es_write=es_write,
                     es_read_git_index=es_read_git_index,
                     es_write_git_index=es_write_git_index)
def ESConnection():
    """Create the input and output Elasticsearch clients from ``.settings``.

    Every section of the config file is scanned; when the same option
    appears in several sections, the value from the last section wins.

    Returns:
        A tuple ``(es_read, es_write, es_read_git_index,
        es_write_git_index)``.

    Raises:
        ValueError: If a required option is absent from the config file.
    """
    required = ('user', 'password', 'host', 'port', 'path', 'index_git_raw',
                'host_output', 'port_output', 'path_output',
                'index_git_output')
    # Output credentials may be left out; the URL is then built without them.
    optional = ('user_output', 'password_output')

    parser = configparser.ConfigParser()
    conf_file = '.settings'
    with open(conf_file, 'r') as fd:
        parser.read_file(fd)

    # Only the known options are read, so unrelated entries are never parsed.
    settings = dict.fromkeys(optional, "")
    for section in parser.sections():
        for option in parser.options(section):
            if option in required or option in optional:
                settings[option] = parser.get(section, option)

    missing = [name for name in required if name not in settings]
    if missing:
        raise ValueError("Missing option(s) in {}: {}".format(
            conf_file, ", ".join(missing)))

    input_location = ("@" + settings['host'] + ":" + settings['port'] + "/"
                      + settings['path'])
    connection = ("https://" + settings['user'] + ":" + settings['password']
                  + input_location)
    # Echo the target with the password masked so it never reaches stdout.
    print("https://" + settings['user'] + ":****" + input_location)

    # The keyword names are verify_certs / ca_certs; the misspelled variants
    # previously used here did not configure certificate verification.
    es_read = Elasticsearch([connection], use_ssl=True, verify_certs=True,
                            ca_certs=certifi.where(), scroll='300m',
                            timeout=100)

    credentials = ""
    masked_credentials = ""
    if settings['user_output']:
        credentials = (settings['user_output'] + ":"
                       + settings['password_output'] + "@")
        masked_credentials = settings['user_output'] + ":****@"

    output_location = (settings['host_output'] + ":" + settings['port_output']
                       + "/" + settings['path_output'])
    connection_output = "http://" + credentials + output_location

    # The output cluster is reached over plain HTTP, so no TLS options apply.
    es_write = Elasticsearch([connection_output])
    print("http://" + masked_credentials + output_location)

    return (es_read, es_write, settings['index_git_raw'],
            settings['index_git_output'])
def ESConnection():
    """Create the input and output Elasticsearch clients from ``settings``.

    Every section of the config file is scanned; when the same option
    appears in several sections, the value from the last section wins.

    Returns:
        A tuple ``(es_read, es_write, es_read_git_index,
        es_read_gerrit_index, es_write_git_index, es_write_gerrit_index,
        genderize_key)``.

    Raises:
        ValueError: If a required option is absent from the config file.
    """
    # 'path' is not listed: the input URL below is built without it.
    required = ('user', 'password', 'host', 'port', 'genderize_key',
                'index_gerrit_raw', 'index_git_raw', 'host_output',
                'port_output', 'user_output', 'password_output',
                'path_output', 'index_gerrit_output', 'index_git_output')

    parser = configparser.ConfigParser()
    conf_file = 'settings'
    with open(conf_file, 'r') as fd:
        parser.read_file(fd)

    # Only the known options are read, so unrelated entries are never parsed.
    settings = {}
    for section in parser.sections():
        for option in parser.options(section):
            if option in required:
                settings[option] = parser.get(section, option)

    missing = [name for name in required if name not in settings]
    if missing:
        raise ValueError("Missing option(s) in {}: {}".format(
            conf_file, ", ".join(missing)))

    input_location = "@" + settings['host'] + ":" + settings['port'] + "/"
    connection = ("https://" + settings['user'] + ":" + settings['password']
                  + input_location)
    # Echo the target with the password masked so it never reaches stdout.
    print("https://" + settings['user'] + ":****" + input_location)

    # The keyword names are verify_certs / ca_certs; the misspelled variants
    # previously used here did not configure certificate verification.
    es_read = Elasticsearch([connection], use_ssl=True, verify_certs=True,
                            ca_certs=certifi.where(), scroll='300m',
                            timeout=100)

    connection_output = ("https://" + settings['user_output'] + ":"
                         + settings['password_output'] + "@"
                         + settings['host_output'] + ":"
                         + settings['port_output'] + "/"
                         + settings['path_output'])
    es_write = Elasticsearch([connection_output], use_ssl=True,
                             verify_certs=True, ca_certs=certifi.where(),
                             scroll='300m', timeout=100)

    return (es_read, es_write, settings['index_git_raw'],
            settings['index_gerrit_raw'], settings['index_git_output'],
            settings['index_gerrit_output'], settings['genderize_key'])
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')
def read_all(self):
    """Return buffered data received on stdout and stderr channels.

    This is useful for non-interactive call where a set of command passed
    to the API call and their result is needed after the call is concluded.
    Should be called after run_forever() or update()

    Calling this hands back the current contents of ``self._all`` and then
    resets both ``self._all`` and ``self._channels`` to empty.

    TODO: Maybe we can process this and return a more meaningful map with
    channels mapped for each input.
    """
    out = self._all
    # Reset the buffers so a later call only sees data that arrived since.
    self._all = ""
    self._channels = {}
    return out
def where():
    """Return the path of the preferred certificate bundle.

    The bundle is the ``cacert.pem`` file that sits in the same directory
    as this module.
    """
    # vendored bundle inside Requests
    return os.path.join(os.path.dirname(__file__), 'cacert.pem')