def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
    # Requires PyJWT ("import jwt") and urllib.parse.
    try:
        params = kwargs['params']
    except KeyError:
        params = {}
    if method_verb != 'POST':
        # Non-POST parameters are signed as part of the path.
        endpoint_path += urllib.parse.urlencode(params)
    msg = {'path': endpoint_path, 'nonce': self.nonce(), 'token_id': self.key}
    signature = jwt.encode(msg, self.secret, algorithm='HS256')
    headers = {'X-Quoine-API-Version': '2', 'X-Quoine-Auth': signature,
               'Content-Type': 'application/json'}
    request = {'headers': headers}
    if method_verb == 'POST':
        # POST parameters travel in the JSON body instead.
        request['json'] = params
    return self.uri + endpoint_path, request
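
# A minimal standalone sketch of the signing step above, assuming PyJWT
# ("import jwt"); the credentials and path here are illustrative placeholders,
# not part of the original snippet.
import time
import jwt

api_key, api_secret = "my-token-id", "my-secret"  # placeholders
msg = {'path': '/accounts/balance?currency=USD',
       'nonce': str(int(time.time() * 1000)),
       'token_id': api_key}
token = jwt.encode(msg, api_secret, algorithm='HS256')
# The server verifies the same way; round-tripping shows the signed claims:
print(jwt.decode(token, api_secret, algorithms=['HS256']))
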
import http.client  # Python 3 replacement for the Python 2 httplib used originally
import traceback
import urllib.parse

def httpPost(url, resource, params):
    # `url` is the host name; `resource` is the request path.
    headers = {
        "Content-type": "application/x-www-form-urlencoded",
    }
    try:
        conn = http.client.HTTPSConnection(url, timeout=10)
        conn.request("POST", resource, urllib.parse.urlencode(params), headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        params.clear()
        conn.close()
        return data
    except Exception:
        traceback.print_exc()
        return False
def doJenkinsSetUrl(recipes, argv):
    parser = argparse.ArgumentParser(prog="bob jenkins set-url")
    parser.add_argument("name", help="Jenkins server alias")
    parser.add_argument("url", help="New URL")
    args = parser.parse_args(argv)
    if args.name not in BobState().getAllJenkins():
        print("Jenkins '{}' not known.".format(args.name), file=sys.stderr)
        sys.exit(1)
    url = urllib.parse.urlparse(args.url)
    urlPath = url.path
    if not urlPath.endswith("/"):
        urlPath = urlPath + "/"
    config = BobState().getJenkinsConfig(args.name)
    config["url"] = {
        "scheme": url.scheme,
        "server": url.hostname,
        "port": url.port,
        "path": urlPath,
        "username": url.username,
        "password": url.password,
    }
    BobState().setJenkinsConfig(args.name, config)
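
# For reference, urllib.parse.urlparse exposes exactly the pieces stored in
# the config dict above (hostname, port, and credentials are illustrative):
import urllib.parse

url = urllib.parse.urlparse("https://bob:secret@ci.example.com:8443/jenkins")
print(url.scheme, url.hostname, url.port, url.path, url.username, url.password)
# -> https ci.example.com 8443 /jenkins bob secret
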
async def dict_search_args_parse(self, message):
    # Split an optional leading result limit off the query,
    # e.g. "3 red pandas" -> (3, "red pandas"). Declared async because
    # it awaits the bot; returns None on an empty message.
    if not message:
        await self.bot.say("Error in arg parse")
        return
    limit = 1
    query = message
    result = re.match(r"^([0-9]+)\s+(.*)$", message)
    if result:
        limit, query = [result.group(x) for x in (1, 2)]
    return int(limit), query
    # keys = ["limit"]
    # kwargs = utils.get_kwargs(args, keys)
    # try:
    #     limit = int(kwargs["limit"])
    #     if limit <= 0:
    #         raise ValueError
    # except (ValueError, KeyError):
    #     limit = 1
    # query = utils.strip_kwargs(args, keys)
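
# How the limit-splitting regex above behaves, in isolation (the sample
# inputs are made up):
import re

for message in ("3 red pandas", "red pandas"):
    result = re.match(r"^([0-9]+)\s+(.*)$", message)
    limit, query = (int(result.group(1)), result.group(2)) if result else (1, message)
    print(limit, query)
# -> 3 red pandas
# -> 1 red pandas
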
def dumb_css_parser(data):
    """Returns a hash of CSS selectors, each of which contains a hash of CSS attributes."""
    # remove @import sentences
    data += ';'
    importIndex = data.find('@import')
    while importIndex != -1:
        data = data[0:importIndex] + data[data.find(';', importIndex) + 1:]
        importIndex = data.find('@import')

    # parse the css. reverted from dictionary comprehension in order to support older pythons
    elements = [x.split('{') for x in data.split('}') if '{' in x.strip()]
    try:
        elements = dict([(a.strip(), dumb_property_dict(b)) for a, b in elements])
    except ValueError:
        elements = {}  # not that important
    return elements
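
# A quick usage sketch. dumb_property_dict is not shown above, so this is a
# minimal stand-in with the behavior the parser assumes (split "key: value"
# declarations into a dict); the sample stylesheet is made up.
def dumb_property_dict(style):
    return dict((p.split(':', 1)[0].strip(), p.split(':', 1)[1].strip())
                for p in style.split(';') if ':' in p)

css = "@import url(x.css); h1 { color: red; font-weight: bold } p.note { color: blue }"
print(dumb_css_parser(css))
# -> {'h1': {'color': 'red', 'font-weight': 'bold'}, 'p.note': {'color': 'blue'}}
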
def _call_ACIS(self, kwargs, **moreKwargs):
    '''
    Core method for calling the ACIS services.
    Returns a Python dictionary by de-serializing the JSON response.
    '''
    kwargs.update(moreKwargs)
    self._input_dict = self._stripNoneValues(kwargs)
    self.url = self.baseURL + self.webServiceSource
    if pyVersion == 2:    # Python 2.x
        params = urllib.urlencode({'params': json.dumps(self._input_dict)})
        request = urllib2.Request(self.url, params, {'Accept': 'application/json'})
        response = urllib2.urlopen(request)
        jsonData = response.read()
    elif pyVersion == 3:  # Python 3.x
        params = urllib.parse.urlencode({'params': json.dumps(self._input_dict)})
        params = params.encode('utf-8')
        req = urllib.request.urlopen(self.url, data=params)
        jsonData = req.read().decode()
    return json.loads(jsonData)
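
# The ACIS services expect a single form field named "params" whose value is
# the JSON-encoded input, which is why the snippet wraps json.dumps in
# urlencode. A Python 3 sketch of just that encoding step; the endpoint URL
# and parameters here are illustrative:
import json
import urllib.parse
import urllib.request

input_dict = {'sid': 'KSEA', 'elems': 'maxt', 'date': '2020-01-01'}
data = urllib.parse.urlencode({'params': json.dumps(input_dict)}).encode('utf-8')
req = urllib.request.Request('https://data.rcc-acis.org/StnData', data=data)
print(req.get_method(), req.full_url)  # -> POST https://data.rcc-acis.org/StnData
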
def _fetch_img_urls(self, keyword, safe_search=False):
    # Bing image search; see https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799
    url = self._get_bing_url(keyword, safe_search=safe_search)
    self.logger.debug('search url {}'.format(url))
    header = {
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/43.0.2357.134 Safari/537.36"}
    soup = BeautifulSoup(urllib.request.urlopen(urllib.request.Request(url, headers=header)), 'html.parser')
    imgs = []  # (image name, thumbnail URL, full-size URL) for each hit
    for a in soup.find_all("a", {"class": "iusc"}):
        mad = json.loads(a["mad"])
        turl = mad["turl"]   # thumbnail URL
        m = json.loads(a["m"])
        murl = m["murl"]     # link to the large original image
        image_name = urllib.parse.urlsplit(murl).path.split("/")[-1]
        imgs.append((image_name, turl, murl))
    return imgs
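
# The image-name extraction above is a standard urlsplit idiom; isolated
# (with a made-up URL):
import urllib.parse

murl = "https://example.com/photos/2021/red-panda.jpg?w=640"
print(urllib.parse.urlsplit(murl).path.split("/")[-1])  # -> red-panda.jpg
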
def __init__(self, var):
    #: The original string that comes through with the variable
    self.original = var
    #: The operator for the variable
    self.operator = ''
    #: List of safe characters when quoting the string
    self.safe = ''
    #: List of variables in this variable
    self.variables = []
    #: List of variable names
    self.variable_names = []
    #: List of defaults passed in
    self.defaults = {}
    # Parse the variable itself.
    self.parse()
    self.post_parse()
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    from urllib.parse import urlencode
    Authentication.__init__(self, credentials, host, request_uri, headers,
                            response, content, http)
    challenge = _parse_www_authenticate(response, 'www-authenticate')
    service = challenge['googlelogin'].get('service', 'xapi')
    # Blogger actually returns the service in the challenge.
    # For the rest we guess based on the URI.
    if service == 'xapi' and request_uri.find("calendar") > 0:
        service = "cl"
    # No point in guessing Base or Spreadsheet
    #elif request_uri.find("spreadsheets") > 0:
    #    service = "wise"
    auth = dict(Email=credentials[0], Passwd=credentials[1], service=service,
                source=headers['user-agent'])
    resp, content = self.http.request("https://www.google.com/accounts/ClientLogin",
                                      method="POST", body=urlencode(auth),
                                      headers={'Content-Type': 'application/x-www-form-urlencoded'})
    lines = content.split('\n')
    d = dict([tuple(line.split("=", 1)) for line in lines if line])
    if resp.status == 403:
        self.Auth = ""
    else:
        self.Auth = d['Auth']
def testGet301(self):
    # Test that we automatically follow 301 redirects
    # and that we cache the 301 response.
    uri = urllib.parse.urljoin(base, "301/onestep.asis")
    destination = urllib.parse.urljoin(base, "302/final-destination.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertTrue('content-location' in response)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, False)

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, True)
def testGet302RedirectionLimit(self):
    # Test that we can set a lower redirection limit
    # and that we raise an exception when we exceed
    # that limit.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "302/twostep.asis")
    try:
        (response, content) = self.http.request(uri, "GET", redirections=1)
        self.fail("This should not happen")
    except httplib2.RedirectLimit:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
def testGet302NoLocation(self):
    # Test that we throw an exception when we get
    # a 302 with no Location: header.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "302/no-location.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.RedirectMissingLocation:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected but"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"This is content"))
def testGet304(self):
    # Test that we use ETags properly to validate our cache.
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")

    (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
    (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity',
                                                                 'cache-control': 'must-revalidate'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
    f = open(cache_file_name, "r")
    status_line = f.readline()
    f.close()
    self.assertTrue(status_line.startswith("status:"))

    (response, content) = self.http.request(uri, "HEAD", headers={'accept-encoding': 'identity'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity',
                                                                 'range': 'bytes=0-0'})
    self.assertEqual(response.status, 206)
    self.assertEqual(response.fromcache, False)
def testGet307(self):
    # Test that we do follow 307 redirects but
    # do not cache the 307.
    uri = urllib.parse.urljoin(base, "307/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
def testNoVary(self):
    pass
    # When there is no Vary header, a different Accept header (e.g.) should
    # not affect whether the cache is used.
    # Test that the Vary header is not sent:
    # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
    # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    # self.assertEqual(response.status, 200)
    # self.assertFalse('vary' in response)
    #
    # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    # self.assertEqual(response.status, 200)
    # self.assertEqual(response.fromcache, True, msg="Should be from cache")
    #
    # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
    # self.assertEqual(response.status, 200)
    # self.assertEqual(response.fromcache, True, msg="Should be from cache")
def testVaryHeaderDouble(self):
    uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
    (response, content) = self.http.request(uri, "GET", headers={
        'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
    self.assertEqual(response.status, 200)
    self.assertTrue('vary' in response)

    # We are from cache.
    (response, content) = self.http.request(uri, "GET", headers={
        'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
    self.assertEqual(response.fromcache, True, msg="Should be from cache")

    (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)

    # Get the resource again; not from cache because the varied headers don't match exactly.
    (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False, msg="Should not be from cache")
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testBasicAuth(self):
    # Test Basic Authentication.
    uri = urllib.parse.urljoin(base, "basic/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    uri = urllib.parse.urljoin(base, "basic/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials('joe', 'password')
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)

    uri = urllib.parse.urljoin(base, "basic/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
def testBasicAuthTwoDifferentCredentials(self):
    # Test Basic Authentication with multiple sets of credentials.
    uri = urllib.parse.urljoin(base, "basic2/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    uri = urllib.parse.urljoin(base, "basic2/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials('fred', 'barney')
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)

    uri = urllib.parse.urljoin(base, "basic2/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
def testDigestAuthStale(self):
    # Test that we can handle a nonce becoming stale.
    uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
    self.http.add_credentials('joe', 'password')
    (response, content) = self.http.request(uri, "GET", headers={"cache-control": "no-cache"})
    info = httplib2._parse_www_authenticate(response, 'authentication-info')
    self.assertEqual(response.status, 200)

    # Sleep long enough that the nonce becomes stale.
    time.sleep(3)

    (response, content) = self.http.request(uri, "GET", headers={"cache-control": "no-cache"})
    self.assertFalse(response.fromcache)
    self.assertTrue(response._stale_digest)
    info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
    self.assertEqual(response.status, 200)
def parse_songci(self, response):
    item = SongCiItem()
    item['url'] = response.url
    full_title = response.css('div.son1>h1::text').extract_first()
    if full_title:
        try:
            item['tune_name'], item['title'] = full_title.split('·')
        except ValueError:
            item['title'] = full_title
    son2_p = response.css('div.son2>p')
    for p in son2_p:
        # Labels restored from a mangled source encoding: 朝代 = dynasty, 作者 = author.
        for name, field in {'朝代': 'dynasty', '作者': 'author'}.items():
            if name in p.css('::text').extract_first():
                item[field] = p.css('::text').extract()[1]
    content = ''.join(response.css('div#cont::text').extract()).strip()
    if content:
        item['content'] = content
    else:
        all_p_texts = son2_p.css('::text').extract()
        try:
            # '原文：' ("original text:") marks where the poem body starts;
            # restored from the same mangled encoding.
            item['content'] = '\n'.join(all_p_texts[all_p_texts.index('原文：') + 1:]).strip()
        except ValueError:
            self.logger.error('Cannot parse item. url=%s', response.url)
    yield item
def sendGetRequest(self, parser=None):
    self.response = None
    params = self.params
    parser = parser or self.parser or ResponseParser()
    headers = dict(list({"User-Agent": self.getUserAgent(),
                         "Accept": parser.getMeta()
                        }.items()) + list(self.headers.items()))
    host, port, path = self.getConnectionParameters()
    self.response = WARequest.sendRequest(host, port, path, headers, params, "GET")
    if not self.response.status == WARequest.OK:
        logger.error("Request not successful, status was %s" % self.response.status)
        return {}
    data = self.response.read()
    logger.info(data)
    self.sent = True
    return parser.parse(data.decode(), self.pvars)
def sendPostRequest(self, parser=None):
    self.response = None
    params = self.params
    parser = parser or self.parser or ResponseParser()
    headers = dict(list({"User-Agent": self.getUserAgent(),
                         "Accept": parser.getMeta(),
                         "Content-Type": "application/x-www-form-urlencoded"
                        }.items()) + list(self.headers.items()))
    host, port, path = self.getConnectionParameters()
    self.response = WARequest.sendRequest(host, port, path, headers, params, "POST")
    if not self.response.status == WARequest.OK:
        logger.error("Request not successful, status was %s" % self.response.status)
        return {}
    data = self.response.read()
    logger.info(data)
    self.sent = True
    return parser.parse(data.decode(), self.pvars)
def setUp(self):
    # The URLs (for now) which will have the WSDL files and the XSD file.
    import os
    import urllib.parse
    from urllib.request import pathname2url
    query_services_url = urllib.parse.urljoin(
        'file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-query-1.7.wsdl')))
    userservices_url = urllib.parse.urljoin(
        'file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-auth-1.7.wsdl')))
    # Initialize the Suds clients for each URL, with the client certificate
    # you'll have in the same directory as this file.
    query_services_client = Client(query_services_url,
                                   transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))
    user_services_client = Client(userservices_url,
                                  transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))
    self.test_user_services_object = SymantecUserServices(user_services_client)
def setUp(self):
    # The URLs (for now) which will have the WSDL files and the XSD file.
    import os
    import urllib.parse
    from urllib.request import pathname2url
    managementservices_url = urllib.parse.urljoin(
        'file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')))
    # managementservices_url = 'http://webdev.cse.msu.edu/~huynhall/vipuserservices-mgmt-1.7.wsdl'
    # Initialize the Suds client, with the client certificate
    # you'll have in the same directory as this file.
    self.management_client = Client(managementservices_url,
                                    transport=HTTPSClientCertTransport('vip_certificate.crt',
                                                                       'vip_certificate.crt'))
    self.test_management_services_object = SymantecManagementServices(self.management_client)
def __creatBlocks(self):
    """
    Second part of parsing. Find blocks and create a list.
    """
    w = list(zip(self.lines, self.indentationList))
    self.blocks, indentation, level = "[", 0, 0
    for i in w:
        if i[1] > indentation:
            # Deeper indentation opens a nested list.
            level = level + 1
            self.blocks += ",[" + '"' + urllib.parse.quote_plus(i[0]) + '"'
        elif i[1] == 0:
            if len(self.blocks) > 1:
                self.blocks += "]" * (level) + ','
            self.blocks += '"' + urllib.parse.quote_plus(i[0]) + '"'
            level = 0
        elif i[1] < indentation:
            if w.index(i) != len(w):
                self.blocks += "]" + "," + '"' + \
                    urllib.parse.quote_plus(i[0]) + '"'
            level += -1
        elif i[1] == indentation:
            self.blocks += "," + '"' + urllib.parse.quote_plus(i[0]) + '"'
        indentation = i[1]
    self.blocks += "]" * (level + 1)
    # The accumulated string is a Python list literal; evaluate it safely.
    self.blocks = ast.literal_eval(self.blocks)
def _makeSafeAbsoluteURI(base, rel=None):
    # Bail if ACCEPTABLE_URI_SCHEMES is empty.
    if not ACCEPTABLE_URI_SCHEMES:
        return _urljoin(base, rel or '')
    if not base:
        return rel or ''
    if not rel:
        try:
            scheme = urllib.parse.urlparse(base)[0]
        except ValueError:
            return ''
        if not scheme or scheme in ACCEPTABLE_URI_SCHEMES:
            return base
        return ''
    uri = _urljoin(base, rel)
    if uri.strip().split(':', 1)[0] not in ACCEPTABLE_URI_SCHEMES:
        return ''
    return uri
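
# A sketch of the final scheme check above, with a stand-in
# ACCEPTABLE_URI_SCHEMES (feedparser's real list is longer):
import urllib.parse

ACCEPTABLE_URI_SCHEMES = ('http', 'https', 'ftp', 'file')

def _is_acceptable(uri):
    return uri.strip().split(':', 1)[0] in ACCEPTABLE_URI_SCHEMES

print(_is_acceptable(urllib.parse.urljoin('http://example.com/', '/feed')))  # True
print(_is_acceptable(urllib.parse.urljoin('http://example.com/', 'javascript:alert(1)')))  # False
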
def http_error_401(self, req, fp, code, msg, headers):
    # Check if
    #  - the server requires digest auth, AND
    #  - we tried (unsuccessfully) with basic auth.
    # If both conditions hold, parse the authentication information
    # out of the Authorization header we sent the first time
    # (for the username and password) and the WWW-Authenticate
    # header the server sent back (for the realm), and retry
    # the request with the appropriate digest auth headers instead.
    # This evil genius hack has been brought to you by Aaron Swartz.
    host = urllib.parse.urlparse(req.get_full_url())[1]
    if base64 is None or 'Authorization' not in req.headers \
            or 'WWW-Authenticate' not in headers:
        return self.http_error_default(req, fp, code, msg, headers)
    auth = _base64decode(req.headers['Authorization'].split(' ')[1])
    user, passw = auth.split(':')
    realm = re.findall('realm="([^"]*)"', headers['WWW-Authenticate'])[0]
    self.add_password(realm, host, user, passw)
    retry = self.http_error_auth_reqed('www-authenticate', host, req, headers)
    self.reset_retry_count()
    return retry
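
# The realm extraction above, in isolation (the challenge string is made up):
import re

challenge = 'Digest realm="example.com", nonce="abc123", qop="auth"'
print(re.findall('realm="([^"]*)"', challenge)[0])  # -> example.com
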
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # This function should only be called with a unicode string.
    # Strategy: if the host cannot be encoded in ASCII, then
    # it'll be necessary to encode it in IDN form.
    parts = list(urllib.parse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # The URL needs to be converted to IDN notation.
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = ''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urllib.parse.urlunsplit(parts)
    else:
        return url
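
# A quick check of the conversion, assuming `import urllib.parse` is in scope
# for _convert_to_idn above; the hostname is a made-up internationalized name:
print(_convert_to_idn('http://пример.example:8080/path?q=1'))
# -> http://xn--e1afmkfd.example:8080/path?q=1
print(_convert_to_idn('http://example.com/'))  # ASCII host: returned unchanged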