def __deserialize_date(self, string):
"""
Deserializes string to date.
:param string: str.
:return: date.
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a date object"
.format(string)
)
python类parse()的实例源码
def __deserialize_datatime(self, string):
"""
Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a datetime object".
format(string)
)
def __deserialize_date(self, string):
"""
Deserializes string to date.
:param string: str.
:return: date.
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a date object"
.format(string)
)
def __deserialize_datatime(self, string):
"""
Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a datetime object".
format(string)
)
def resolve_reference(cls, design_ref):
"""Resolve a reference to a design document.
Locate a schema handler based on the URI scheme of the data reference
and use that handler to get the data referenced.
:param design_ref: A URI-formatted reference to a data entity
"""
try:
design_uri = urllib.parse.urlparse(design_ref)
handler = cls.scheme_handlers.get(design_uri.scheme, None)
if handler is None:
raise errors.InvalidDesignReference(
"Invalid reference scheme %s: no handler." %
design_uri.scheme)
else:
# Have to do a little magic to call the classmethod as a pointer
return handler.__get__(None, cls)(design_uri)
except ValueError:
raise errors.InvalidDesignReference(
"Cannot resolve design reference %s: unable to parse as valid URI."
% design_ref)
def resolve_reference_http(cls, design_uri):
"""Retrieve design documents from http/https endpoints.
Return a byte array of the response content. Support unsecured or
basic auth
:param design_uri: Tuple as returned by urllib.parse for the design reference
"""
if design_uri.username is not None and design_uri.password is not None:
response = requests.get(
design_uri.geturl(),
auth=(design_uri.username, design_uri.password),
timeout=30)
else:
response = requests.get(design_uri.geturl(), timeout=30)
return response.content
def resolve_reference_ucp(cls, design_uri):
"""Retrieve artifacts from a UCP service endpoint.
Return a byte array of the response content. Assumes Keystone
authentication required.
:param design_uri: Tuple as returned by urllib.parse for the design reference
"""
ks_sess = KeystoneUtils.get_session()
(new_scheme, foo) = re.subn('^[^+]+\+', '', design_uri.scheme)
url = urllib.parse.urlunparse((new_scheme, design_uri.netloc, design_uri.path,
design_uri.params, design_uri.query, design_uri.fragment))
logger = logging.getLogger(__name__)
logger.debug("Calling Keystone session for url %s" % str(url))
resp = ks_sess.get(url)
if resp.status_code >= 400:
raise errors.InvalidDesignReference(
"Received error code for reference %s: %s - %s" % (url, str(resp.status_code), resp.text))
return resp.content
def dl_json(url, header={}):
'''
http GET one file, and parse json text. return json info result
'''
# NOTE json must use 'utf-8' encoding
blob = dl_blob(url, header=header)
try:
text = blob.decode('utf-8')
except Exception as e:
er = err.DecodingError('decode blob to json text with \"utf-8\" failed, URL ', url)
er.blob = blob
raise er from e
try:
info = json.loads(text)
return info
except Exception as e:
er = err.ParseJSONError('parse json text failed, URL ', url)
er.text = text
raise er from e
# TODO now only support 'utf-8' encoding
def __init__(self, credentials, host, request_uri, headers, response, content, http):
from urllib.parse import urlencode
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
service = challenge['googlelogin'].get('service', 'xapi')
# Bloggger actually returns the service in the challenge
# For the rest we guess based on the URI
if service == 'xapi' and request_uri.find("calendar") > 0:
service = "cl"
# No point in guessing Base or Spreadsheet
#elif request_uri.find("spreadsheets") > 0:
# service = "wise"
auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
lines = content.split('\n')
d = dict([tuple(line.split("=", 1)) for line in lines if line])
if resp.status == 403:
self.Auth = ""
else:
self.Auth = d['Auth']
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
try:
req = kwargs['params']
except KeyError:
req = {}
req['nonce'] = self.nonce()
postdata = urllib.parse.urlencode(req)
# Unicode-objects must be encoded before hashing
encoded = (str(req['nonce']) + postdata).encode('utf-8')
message = (endpoint_path.encode('utf-8') +
hashlib.sha256(encoded).digest())
signature = hmac.new(base64.b64decode(self.secret),
message, hashlib.sha512)
sigdigest = base64.b64encode(signature.digest())
headers = {
'API-Key': self.key,
'API-Sign': sigdigest.decode('utf-8')
}
return url, {'data': req, 'headers': headers}
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
nonce = self.nonce()
try:
params = kwargs['params']
except KeyError:
params = {}
params['apikey'] = self.key
params['nonce'] = nonce
post_params = params
post_params.update({'nonce': nonce, 'method': endpoint})
post_params = urllib.parse.urlencode(post_params)
url = uri + post_params
sig = hmac.new(url, self.secret, hashlib.sha512)
headers = {'apisign': sig}
return url, {'headers': headers}
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
try:
params = kwargs['params']
except KeyError:
params = {}
nonce = self.nonce()
req_string = endpoint_path + '?apikey=' + self.key + "&nonce=" + nonce + '&'
req_string += urllib.parse.urlencode(params)
headers = {"apisign": hmac.new(self.secret.encode('utf-8'),
(self.uri + req_string).encode('utf-8'),
hashlib.sha512).hexdigest()}
return self.uri + req_string, {'headers': headers, 'params': {}}
def __deserialize_date(self, string):
"""
Deserializes string to date.
:param string: str.
:return: date.
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a date object"
.format(string)
)
def __deserialize_datatime(self, string):
"""
Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a datetime object".
format(string)
)
def get(url,headers={},params=None):
if sys.version_info[0] == 3:
if params:
data = parse.urlencode(params)
url = "%s?%s" % (url, data)
req = request.Request(url=url,headers=headers)
else:
req = request.Request(url=url,headers=headers)
response = request.urlopen(req)
return Response(response)
elif sys.version_info[0] == 2:
if params:
data = urllib.urlencode(params)
url = url + '?' + data
req = urllib2.Request(url=url,headers=headers)
else:
req = urllib2.Request(url=url,headers=headers)
response = urllib2.urlopen(req)
return Response(response)
else:
return None
def parse(self, response, crawler):
document = lxml.etree.HTML(response.text)
for title in document.cssselect('tr.athing a.storylink'):
yield title.text
urlinfo = urllib.parse.urlparse(response.url)
base_url = urlinfo.scheme + '://' + urlinfo.netloc
try:
href = document.cssselect('a.morelink')[0].get('href')
except:
return
next_url = urllib.parse.urljoin(base_url, href)
crawler.schedule_request(next_url)
def __deserialize_date(self, string):
"""
Deserializes string to date.
:param string: str.
:return: date.
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a date object"
.format(string)
)
def __deserialize_datatime(self, string):
"""
Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a datetime object".
format(string)
)
def __init__(self, url=None, *args, **kwargs):
super(Bazaar, self).__init__(url, *args, **kwargs)
# Python >= 2.7.4, 3.3 doesn't have uses_fragment or non_hierarchical
# Register lp but do not expose as a scheme to support bzr+lp.
if getattr(urllib_parse, 'uses_fragment', None):
urllib_parse.uses_fragment.extend(['lp'])
urllib_parse.non_hierarchical.extend(['lp'])
def __init__(self, url=None, *args, **kwargs):
super(Bazaar, self).__init__(url, *args, **kwargs)
# Python >= 2.7.4, 3.3 doesn't have uses_fragment or non_hierarchical
# Register lp but do not expose as a scheme to support bzr+lp.
if getattr(urllib_parse, 'uses_fragment', None):
urllib_parse.uses_fragment.extend(['lp'])
urllib_parse.non_hierarchical.extend(['lp'])