def __init__(self, uri, timeout=None, ssl_context_factory=None):
    """Initialize an HTTP transport socket.

    @param uri(str) The http://host:port/path (or https://...) URL to
        connect to.  A query string, if present, is preserved in the
        request path.
    @param timeout Timeout in ms; None (the default) leaves the socket
        blocking.
    @param ssl_context_factory Optional factory producing the SSL context
        used for https connections.
    """
    parsed = urllib.parse.urlparse(uri)
    self.scheme = parsed.scheme
    # Only plain HTTP and HTTPS are supported transports.
    assert self.scheme in ('http', 'https')
    if self.scheme == 'http':
        self.port = parsed.port or http_client.HTTP_PORT
    elif self.scheme == 'https':
        self.port = parsed.port or http_client.HTTPS_PORT
    self.host = parsed.hostname
    self.path = parsed.path
    if parsed.query:
        self.path += '?%s' % parsed.query
    self.__wbuf = BytesIO()   # request body accumulates here until flush()
    self.__http = None        # lazily-created HTTP(S) connection
    self.__custom_headers = None
    self.__timeout = None
    # Bug fix: the original `if timeout:` silently dropped an explicit
    # timeout of 0; compare against None so 0 is forwarded to setTimeout().
    if timeout is not None:
        self.setTimeout(timeout)
    self._ssl_context_factory = ssl_context_factory
# Example source code for the Python parse() class/function (translated scraper caption)
def parse_sphinx_searchindex(searchindex):
    """Parse a Sphinx search index
    Parameters
    ----------
    searchindex : str
        The Sphinx search index (contents of searchindex.js)
    Returns
    -------
    filenames : list of str
        The file names parsed from the search index.
    objects : dict
        The objects parsed from the search index.
    """
    # Normalize bytes input to unicode text (searchindex.js may be bytes).
    if hasattr(searchindex, 'decode'):
        searchindex = searchindex.decode('UTF-8')

    # --- objects section ---
    obj_marker = 'objects:'
    obj_start = searchindex.find(obj_marker)
    if obj_start < 0:
        raise ValueError('"objects:" not found in search index')
    objects = _parse_dict_recursive(
        _select_block(searchindex[obj_start:], '{', '}'))

    # --- filenames section ---
    file_marker = 'filenames:'
    file_start = searchindex.find(file_marker)
    if file_start < 0:
        raise ValueError('"filenames:" not found in search index')
    # Skip past the marker and the opening '[' of the JS array literal.
    raw = searchindex[file_start + len(file_marker) + 1:]
    raw = raw[:raw.find(']')]
    filenames = [entry.strip('"') for entry in raw.split(',')]
    return filenames, objects
def parse_sphinx_searchindex(searchindex):
    """Parse a Sphinx search index
    Parameters
    ----------
    searchindex : str
        The Sphinx search index (contents of searchindex.js)
    Returns
    -------
    filenames : list of str
        The file names parsed from the search index.
    objects : dict
        The objects parsed from the search index.
    """
    # searchindex.js may arrive as bytes; work on a UTF-8 str throughout.
    if hasattr(searchindex, 'decode'):
        searchindex = searchindex.decode('UTF-8')

    # Locate and parse the brace-delimited objects mapping.
    pos_objects = searchindex.find('objects:')
    if pos_objects < 0:
        raise ValueError('"objects:" not found in search index')
    block = _select_block(searchindex[pos_objects:], '{', '}')
    objects = _parse_dict_recursive(block)

    # Locate the filenames array and split its quoted entries.
    pos_files = searchindex.find('filenames:')
    if pos_files < 0:
        raise ValueError('"filenames:" not found in search index')
    tail = searchindex[pos_files + len('filenames:') + 1:]
    tail = tail[:tail.find(']')]
    filenames = [name.strip('"') for name in tail.split(',')]
    return filenames, objects
def s3upload(presigned_url, filename):
    """
    Upload the file named filename to S3 with an HTTP PUT to the given presigned URL.
    Parameters:
        presigned_url (str): the presigned URL that is the PUT endpoint
        filename (str): the file to be pushed to S3
    Returns:
        (bool): True if the file was uploaded successfully.
    Raises:
        S3UploadResponseError: if S3 replied with a non-200 status code.
        requests.exceptions.Timeout: if the connection timed out.
        S3UploadTarfileReadError: if the local file could not be read.
    """
    import logging
    # Attach a null handler so library logging does not force output on callers.
    ec2rlcore.logutil.LogUtil.get_root_logger().addHandler(logging.NullHandler())
    try:
        with open(filename, "rb") as payload:
            response = requests.put(url=presigned_url, data=payload)
        if response.status_code == 200:
            ec2rlcore.dual_log("Upload successful")
            return True
        else:
            ec2rlcore.dual_log("ERROR: Upload failed. Received response {}".format(
                response.status_code))
            raise S3UploadResponseError(response.status_code)
    except requests.exceptions.Timeout:
        raise requests.exceptions.Timeout("ERROR: connection timed out.")
    except IOError:
        raise S3UploadTarfileReadError("ERROR: there was an issue reading the file '{}'.".format(filename))
def __init__(self, error_message, *args):
    """Build the exception with the standard URL-parse failure prefix."""
    super(S3UploadUrlParsingFailure, self).__init__(
        "Failed to parse URL: {}".format(error_message), *args)
def identify_names(code):
    """Builds a codeobj summary by identifying and resolving used names
    >>> code = '''
    ... from a.b import c
    ... import d as e
    ... print(c)
    ... e.HelloWorld().f.g
    ... '''
    >>> for name, o in sorted(identify_names(code).items()):
    ...     print(name, o['name'], o['module'], o['module_short'])
    c c a.b a.b
    e.HelloWorld HelloWorld d d
    """
    finder = NameFinder()
    finder.visit(ast.parse(code))
    example_code_obj = {}
    for written_name, resolved in finder.get_mapping():
        # written_name is as typed in the file (e.g. np.asarray);
        # resolved carries the full import path (e.g. numpy.asarray).
        mod_path, attr = resolved.rsplit('.', 1)
        example_code_obj[written_name] = {
            'name': attr,
            'module': mod_path,
            # Abbreviated module path, if one re-exports the attribute.
            'module_short': get_short_module_name(mod_path, attr),
        }
    return example_code_obj
def identify_names(code):
    """Builds a codeobj summary by identifying and resolving used names
    >>> code = '''
    ... from a.b import c
    ... import d as e
    ... print(c)
    ... e.HelloWorld().f.g
    ... '''
    >>> for name, o in sorted(identify_names(code).items()):
    ...     print(name, o['name'], o['module'], o['module_short'])
    c c a.b a.b
    e.HelloWorld HelloWorld d d
    """
    finder = NameFinder()
    finder.visit(ast.parse(code))
    result = {}
    for name, full_name in finder.get_mapping():
        # name: spelling used in the file; full_name: resolved import path.
        module, attribute = full_name.rsplit('.', 1)
        # Shortened module name, when a shorter alias exposes the attribute.
        short = get_short_module_name(module, attribute)
        result[name] = {'name': attribute,
                        'module': module,
                        'module_short': short}
    return result
def flush(self):
    """Send the buffered request body as one HTTP POST and read the reply.

    Closes and reopens the underlying connection, POSTs the accumulated
    write-buffer contents with the thrift content headers, then records
    the response status/message/headers and keeps the response object
    for subsequent reads.
    """
    # Recycle the connection so each flush() is a fresh request/response.
    if self.isOpen():
        self.close()
    self.open()

    # Pull data out of buffer; reset it for the next request.
    data = self.__wbuf.getvalue()
    self.__wbuf = BytesIO()

    # HTTP request (Host header is written manually below, hence skip_host).
    self.__http.putrequest('POST', self.path, skip_host=True)

    # Write headers
    self.__http.putheader('Host', self.host)
    self.__http.putheader('Content-Type', 'application/x-thrift')
    self.__http.putheader('Content-Length', str(len(data)))

    # Only supply a default User-Agent when the caller did not set one.
    if (not self.__custom_headers or
            'User-Agent' not in self.__custom_headers):
        user_agent = 'Python/THttpClient'
        script = os.path.basename(sys.argv[0])
        if script:
            # Quote the script name so it is header-safe.
            user_agent = '%s (%s)' % (
                user_agent, urllib.parse.quote(script))
        self.__http.putheader('User-Agent', user_agent)

    # Caller-provided headers, verbatim.
    if self.__custom_headers:
        for key, val in self.__custom_headers.items():
            self.__http.putheader(key, val)
    self.__http.endheaders()

    # Write payload
    self.__http.send(data)

    # Get reply to flush the request
    response = self.__http.getresponse()
    self.code, self.message, self.headers = (
        response.status, response.msg, response.getheaders())
    self.response = response
def identify_names(code):
    """Builds a codeobj summary by identifying and resolving used names
    >>> code = '''
    ... from a.b import c
    ... import d as e
    ... print(c)
    ... e.HelloWorld().f.g
    ... '''
    >>> for name, o in sorted(identify_names(code).items()):
    ...     print(name, o['name'], o['module'], o['module_short'])
    c c a.b a.b
    e.HelloWorld HelloWorld d d
    """
    finder = NameFinder()
    finder.visit(ast.parse(code))
    mapping = finder.get_mapping()
    summary = {}
    for local_name, qualified in mapping:
        # local_name is the spelling in the source (e.g. np.asarray);
        # qualified is the resolved dotted path (e.g. numpy.asarray).
        parent, leaf = qualified.rsplit('.', 1)
        entry = {'name': leaf, 'module': parent,
                 'module_short': get_short_module_name(parent, leaf)}
        summary[local_name] = entry
    return summary
def get_presigned_url(uploader_url, filename, region="us-east-1"):
    """
    Given an uploader URL and a filename, parse the URL's query arguments and exchange them with the signing
    endpoint for a presigned S3 PUT URL for the file named filename.
    Parameters:
        uploader_url (str): the uploader URL provided by the AWS engineer
        filename (str): the file to be pushed to S3
        region (str): the S3 endpoint the file should be uploaded to
    Returns:
        presigned_url (str): the presigned URL as a string
    Raises:
        S3UploadUrlParsingFailure: if no query arguments could be parsed from uploader_url
        S3UploadGetPresignedURLError: if the signing endpoint replied with a non-200 status
        requests.exceptions.Timeout: if the connection to the signing endpoint timed out
    """
    endpoint = "https://30yinsv8k6.execute-api.us-east-1.amazonaws.com/prod/get-signed-url"
    s3uploader_regions = ["us-east-1", "eu-west-1", "ap-northeast-1"]
    # Unsupported regions fall back to the default.
    if region not in s3uploader_regions:
        region = "us-east-1"
    query_str = urlparse.urlparse(uploader_url).query
    put_dict = urlparse.parse_qs(query_str)
    # If uploader_url is not parsable then maybe it is a shortened URL:
    # follow redirects to the final URL and retry. Best effort — an
    # unreachable shortener simply leaves put_dict empty.
    try:
        if not put_dict:
            the_request = urllib_request.Request(uploader_url)
            # Bug fix: close the HTTP response instead of leaking it.
            resolved = urllib_urlopen(the_request)
            try:
                uploader_url = resolved.geturl()
            finally:
                resolved.close()
            query_str = urlparse.urlparse(uploader_url).query
            put_dict = urlparse.parse_qs(query_str)
    except urllib_urlerror:
        pass
    if put_dict:
        # urlparse.parse_qs returns dict values that are single value lists;
        # reassign the values to the first value in the list.
        # NOTE(review): missing keys raise KeyError here — assumed guaranteed
        # by the uploader URL format; confirm against the URL generator.
        put_dict["accountId"] = put_dict["account-id"][0]
        put_dict["caseId"] = put_dict["case-id"][0]
        put_dict["key"] = put_dict["key"][0]
        put_dict["expiration"] = int(put_dict["expiration"][0])
        put_dict["fileName"] = filename
        put_dict["region"] = region
    else:
        raise S3UploadUrlParsingFailure(uploader_url)
    json_payload = json.dumps(put_dict)
    try:
        response = requests.post(url=endpoint, data=json_payload)
        # A 200 reply carries the presigned URL in the response body.
        if response.status_code == 200:
            presigned_url = response.text
            return presigned_url
        else:
            raise S3UploadGetPresignedURLError(response.status_code, uploader_url)
    except requests.exceptions.Timeout:
        raise requests.exceptions.Timeout("ERROR: Connection timed out.")