def can_fetch(self, useragent, url):
"""using the parsed robots.txt decide if useragent can fetch url"""
if self.disallow_all:
return False
if self.allow_all:
return True
# search for given user agent matches
# the first match counts
parsed_url = urlparse.urlparse(urllib.unquote(url))
url = urlparse.urlunparse(('', '', parsed_url.path,
parsed_url.params, parsed_url.query, parsed_url.fragment))
url = urllib.quote(url)
if not url:
url = "/"
for entry in self.entries:
if entry.applies_to(useragent):
return entry.allowance(url)
# try the default entry last
if self.default_entry:
return self.default_entry.allowance(url)
# agent not found ==> access granted
return True
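# Hedged usage sketch: can_fetch above appears to be Python 2's
# robotparser.RobotFileParser.can_fetch; a typical caller looks roughly
# like this (the crawler name and URLs are illustrative):
import robotparser

rp = robotparser.RobotFileParser()
rp.set_url("http://www.example.com/robots.txt")
rp.read()
print(rp.can_fetch("ExampleBot", "http://www.example.com/private/page.html"))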
def GetSCAFileContents( url ):
fileContents = None
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
if scheme=="sca" :
queryAsDict = dict([x.split("=") for x in query.split("&")])
try:
orb=CORBA.ORB_init()
fileSys = orb.string_to_object(queryAsDict["fs"])
except KeyError:
logging.warning("sca URI missing fs query parameter")
except:
logging.warning("Unable to get ORB reference")
else:
if fileSys == None:
logging.warning("Failed to lookup file system")
else:
try:
scaFile = fileSys.open(path, True)
fileSize = scaFile.sizeOf()
fileContents = scaFile.read(fileSize)
scaFile.close()
finally:
pass
return fileContents
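# Hedged illustration of the query-string-to-dict step used above; the
# sca:// URL is made up and the CORBA file-system lookup is omitted:
import urlparse

url = "sca://devmgr/path/to/file.txt?fs=IOR:0123abcd"
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
queryAsDict = dict(x.split("=") for x in query.split("&"))
print(queryAsDict)  # {'fs': 'IOR:0123abcd'}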
def reduce_uri(self, uri, default_port=True):
"""Accept authority or URI and extract only the authority and path."""
# note HTTP URLs do not have a userinfo component
parts = urlparse.urlsplit(uri)
if parts[1]:
# URI
scheme = parts[0]
authority = parts[1]
path = parts[2] or '/'
else:
# host or host:port
scheme = None
authority = uri
path = '/'
host, port = splitport(authority)
if default_port and port is None and scheme is not None:
dport = {"http": 80,
"https": 443,
}.get(scheme)
if dport is not None:
authority = "%s:%d" % (host, dport)
return authority, path
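# Hedged usage sketch: reduce_uri above appears to match
# urllib2.HTTPPasswordMgr.reduce_uri; the URLs below are illustrative:
import urllib2

mgr = urllib2.HTTPPasswordMgr()
print(mgr.reduce_uri("http://example.com/docs/index.html"))
# -> ('example.com:80', '/docs/index.html')
print(mgr.reduce_uri("example.com:8080"))
# -> ('example.com:8080', '/')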
def resolveEntity(self, publicId, systemId):
assert systemId is not None
source = DOMInputSource()
source.publicId = publicId
source.systemId = systemId
source.byteStream = self._get_opener().open(systemId)
# determine the encoding if the transport provided it
source.encoding = self._guess_media_encoding(source)
# determine the base URI if we can
import posixpath, urlparse
parts = urlparse.urlparse(systemId)
scheme, netloc, path, params, query, fragment = parts
# XXX should we check the scheme here as well?
if path and not path.endswith("/"):
path = posixpath.dirname(path) + "/"
parts = scheme, netloc, path, params, query, fragment
source.baseURI = urlparse.urlunparse(parts)
return source
def _remove_ignored_parameters(self, request):
def filter_ignored_parameters(data):
return [(k, v) for k, v in data if k not in self._ignored_parameters]
url = urlparse(request.url)
query = parse_qsl(url.query)
query = filter_ignored_parameters(query)
query = urlencode(query)
url = urlunparse((url.scheme, url.netloc, url.path, url.params, query, url.fragment))
body = request.body
content_type = request.headers.get('content-type')
if body and content_type:
if content_type == 'application/x-www-form-urlencoded':
body = parse_qsl(body)
body = filter_ignored_parameters(body)
body = urlencode(body)
elif content_type == 'application/json':
import json
body = json.loads(body)
body = filter_ignored_parameters(sorted(body.items()))
body = json.dumps(body)
return url, body
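# A minimal standalone sketch of the same query-filtering idea, assuming
# only the standard library; the function name, URL and parameter names
# are illustrative, not from the source:
try:  # Python 2
    from urlparse import urlparse, urlunparse, parse_qsl
    from urllib import urlencode
except ImportError:  # Python 3
    from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode

def strip_ignored_params(url, ignored):
    parts = urlparse(url)
    query = [(k, v) for k, v in parse_qsl(parts.query) if k not in ignored]
    return urlunparse(parts._replace(query=urlencode(query)))

print(strip_ignored_params('http://example.com/search?q=test&session=abc123',
                           {'session'}))
# -> http://example.com/search?q=test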
def get(self, netloc, ua, timeout):
try:
headers = {'User-Agent': ua, 'Referer': netloc}
result = _basic_request(netloc, headers=headers, timeout=timeout)
match = re.findall('xhr\.open\("GET","([^,]+),', result)
if not match:
return False
url_Parts = match[0].split('"')
url_Parts[1] = '1680'
url = urlparse.urljoin(netloc, ''.join(url_Parts))
match = re.findall('rid=([0-9a-zA-Z]+)', url_Parts[0])
if not match:
return False
headers['Cookie'] = 'rcksid=%s' % match[0]
result = _basic_request(url, headers=headers, timeout=timeout)
return self.getCookieString(result, headers['Cookie'])
except:
return
# not very robust, but laziness...
def googlepass(url):
try:
try:
headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
except:
headers = None
url = url.split('|')[0].replace('\\', '')
url = client.request(url, headers=headers, output='geturl')
if 'requiressl=yes' in url:
url = url.replace('http://', 'https://')
else:
url = url.replace('https://', 'http://')
if headers: url += '|%s' % urllib.urlencode(headers)
return url
except:
return
def geturl(url):
try:
r = client.request(url, output='geturl')
if r == None: return r
host1 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(url.strip().lower()).netloc)[0]
host2 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(r.strip().lower()).netloc)[0]
if host1 == host2: return r
proxies = sorted(get(), key=lambda x: random.random())
proxies = sorted(proxies, key=lambda x: random.random())
proxies = proxies[:3]
for p in proxies:
p += urllib.quote_plus(url)
r = client.request(p, output='geturl')
if not r == None: return parse(r)
except:
pass
def movie(self, imdb, title, localtitle, aliases, year):
try:
t = cleantitle.get(title)
p = self.post_link % urllib.quote_plus(cleantitle.query(title))
q = urlparse.urljoin(self.base_link, self.search_link)
r = proxy.request(q, 'playing top', post=p, XHR=True)
r = client.parseDOM(r, 'li')
r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a')) for i in r]
r = [(i[0][0], i[1][0]) for i in r if i[0] and i[1]]
r = [(i[0], re.findall('(.+?)\((\d{4})', i[1])) for i in r]
r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
r = [i for i in r if t == cleantitle.get(i[1]) and str(year) == i[2]]
url = proxy.parse(r[0][0])
url = re.findall('(?://.+?|)(/.+)', url)[0]
url = client.replaceHTMLCodes(url)
url = url.encode('utf-8')
return url
except:
pass
def searchMovie(self, title, year, aliases):
try:
url = '%s/%s-%s/' % (self.base_link, cleantitle.geturl(title), year)
url = client.request(url, output='geturl')
if url == None:
t = cleantitle.get(title)
q = '%s %s' % (title, year)
q = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(q))
r = client.request(q)
r = client.parseDOM(r, 'div', attrs={'class': 'inner'})
r = client.parseDOM(r, 'div', attrs={'class': 'info'})
r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
r = [(i[0], re.findall('(?:^Watch Movie |^Watch movies |^Watch |)(.+?)\((\d{4})', i[1])) for i in r]
r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
url = [i[0] for i in r if self.matchAlias(i[1], aliases) and year == i[2]][0]
if url == None: raise Exception()
return url
except:
return
def searchMovie(self, title, year, aliases, headers):
try:
title = cleantitle.normalize(title)
url = urlparse.urljoin(self.base_link, self.search_link % cleantitle.geturl(title))
r = client.request(url, headers=headers, timeout='15')
r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
try:
r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
except:
url = None
pass
if (url == None):
url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
url = urlparse.urljoin(self.base_link, '%s/watching.html' % url)
return url
except:
return
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
try:
tvshowtitle = cleantitle.getsearch(tvshowtitle)
p = urllib.urlencode({'action': 'ajaxy_sf', 'sf_value': tvshowtitle, 'search': 'false'})
r = urlparse.urljoin(self.base_link, self.search_link)
result = client.request(r, post=p, XHR=True)
diziler = json.loads(result)['diziler'][0]['all']
for i in diziler:
t = cleantitle.get(i['post_title'])
if tvshowtitle == t:
url = i['post_link']
url = url.split('/')[4]
url = url.encode('utf-8')
return url
except:
return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
if url == None: return
tv_maze = tvmaze.tvMaze()
num = tv_maze.episodeAbsoluteNumber(tvdb, int(season), int(episode))
num = str(num)
url = urlparse.urljoin(self.base_link, url)
r = client.request(url)
r = client.parseDOM(r, 'tr', attrs = {'class': ''})
r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'td', attrs = {'class': 'epnum'})) for i in r]
r = [(i[0][0], i[1][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0]
r = [i[0] for i in r if num == i[1]][0]
url = re.findall('(?://.+?|)(/.+)', r)[0]
url = client.replaceHTMLCodes(url)
url = url.encode('utf-8')
return url
except:
return
def resolve(self, url):
try:
b = urlparse.urlparse(url).netloc
b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]
if not b in base64.b64decode(self.b_link): return url
u, p, h = url.split('|')
r = urlparse.parse_qs(h)['Referer'][0]
#u += '&app_id=Exodus'
c = self.request(r, output='cookie', close=False)
result = self.request(u, post=p, referer=r, cookie=c)
url = result.split('url=')
url = [urllib.unquote_plus(i.strip()) for i in url]
url = [i for i in url if i.startswith('http')]
url = url[-1]
return url
except:
return
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
try:
query = self.search_link % (urllib.quote_plus(tvshowtitle))
query = urlparse.urljoin(self.base_link, query)
result = client.request(query)
result = client.parseDOM(result, 'div', attrs={'class': 'movie clearfix'})
result = [(client.parseDOM(i, 'a', ret='href'),
client.parseDOM(i, 'span', attrs={'class': 'title-pl'}),
client.parseDOM(i, 'span', attrs={'class': 'title-en'}),
client.parseDOM(i, 'img', ret='src'),
client.parseDOM(i, 'p'),
client.parseDOM(i, 'p', attrs={'class': 'plot'})) for i in result ]
result = [(i[0][0], u" ".join(i[1] + i[2]), re.findall('(\d{4})', i[4][0])) for i in result]
result = [i for i in result if 'serial' in i[0]]
result = [i for i in result if cleantitle.get(tvshowtitle) in cleantitle.get(i[1])]
years = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1)]
result = [i[0] for i in result if any(x in i[2] for x in years)][0]
url = result
return url
except:
return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
if not url:
return
query = urlparse.urljoin(self.base_link, url)
r = client.request(query)
r = dom_parser.parse_dom(r, 'td', attrs={'data-title-name': re.compile('Season %02d' % int(season))})
r = dom_parser.parse_dom(r, 'a', req='href')[0].attrs['href']
r = client.request(urlparse.urljoin(self.base_link, r))
r = dom_parser.parse_dom(r, 'td', attrs={'data-title-name': re.compile('Episode %02d' % int(episode))})
r = dom_parser.parse_dom(r, 'a', req='href')[0].attrs['href']
return source_utils.strip_domain(r)
except:
return
def __search(self, search_link, imdb, titles):
try:
query = search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
r = client.request(query)
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'big-list'})
r = dom_parser.parse_dom(r, 'table', attrs={'class': 'row'})
r = dom_parser.parse_dom(r, 'td', attrs={'class': 'list-name'})
r = dom_parser.parse_dom(r, 'a', req='href')
r = [i.attrs['href']for i in r if i and cleantitle.get(i.content) in t][0]
url = source_utils.strip_domain(r)
r = client.request(urlparse.urljoin(self.base_link, url))
r = dom_parser.parse_dom(r, 'a', attrs={'href': re.compile('.*/tt\d+.*')}, req='href')
r = [re.findall('.+?(tt\d+).*?', i.attrs['href']) for i in r]
r = [i[0] for i in r if i]
return url if imdb in r else None
except:
return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
if not url:
return
data = urlparse.parse_qs(url)
data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
tvshowtitle = data['tvshowtitle']
localtvshowtitle = data['localtvshowtitle']
aliases = source_utils.aliases_to_array(eval(data['aliases']))
year = re.findall('(\d{4})', premiered)
year = year[0] if year else data['year']
url = self.__search([localtvshowtitle] + aliases, year, season, episode)
if not url and tvshowtitle != localtvshowtitle:
url = self.__search([tvshowtitle] + aliases, year, season, episode)
return url
except:
return
def __search_movie(self, imdb, year):
try:
query = urlparse.urljoin(self.base_link, self.search_link % imdb)
y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
r = client.request(query)
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'container'})
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'ml-item-content'})
r = [(dom_parser.parse_dom(i, 'a', attrs={'class': 'ml-image'}, req='href'), dom_parser.parse_dom(i, 'ul', attrs={'class': 'item-params'})) for i in r]
r = [(i[0][0].attrs['href'], re.findall('calendar.+?>.+?(\d{4})', ''.join([x.content for x in i[1]]))) for i in r if i[0] and i[1]]
r = [(i[0], i[1][0] if len(i[1]) > 0 else '0') for i in r]
r = sorted(r, key=lambda i: int(i[1]), reverse=True)  # prefer entries that have a year over the '0' (no-year) placeholder
r = [i[0] for i in r if i[1] in y][0]
url = urlparse.urlparse(r).path
url = client.replaceHTMLCodes(url)
url = url.encode('utf-8')
return url
except:
return
def moonwalk(link, ref, season, episode):
try:
if season and episode:
q = dict(urlparse.parse_qsl(urlparse.urlsplit(link).query))
q.update({'season': season, 'episode': episode})
q = (urllib.urlencode(q)).replace('%2C', ',')
link = link.replace('?' + urlparse.urlparse(link).query, '') + '?' + q
trans = __get_moonwalk_translators(link, ref)
trans = trans if trans else [(link, '')]
urls = []
for i in trans:
urls += __get_moonwalk(i[0], ref, info=i[1])
return urls
except:
return []
def need_update(self):
if "HTTP_PROXY" in os.environ or "HTTPS_PROXY" in os.environ:
if "HTTP_PROXY" in os.environ:
if sys.version_info >= (3, 0):
proxy = urllib.parse.urlparse(os.environ["HTTP_PROXY"])
else:
proxy = urlparse.urlparse(os.environ["HTTP_PROXY"])
else:
if sys.version_info >= (3, 0):
proxy = urllib.parse.urlparse(os.environ["HTTPS_PROXY"])
else:
proxy = urlparse.urlparse(os.environ["HTTPS_PROXY"])
if sys.version_info >= (3, 0):
conn = http.client.HTTPSConnection(proxy.hostname, proxy.port)
else:
conn = httplib.HTTPSConnection(proxy.hostname, proxy.port)
conn.set_tunnel(self.version_host, 443)
else:
if sys.version_info >= (3, 0):
conn = http.client.HTTPSConnection("raw.githubusercontent.com")
else:
conn = httplib.HTTPSConnection("raw.githubusercontent.com")
conn.request("GET", self.version_url)
version = conn.getresponse().read()
if not isinstance(version, str):  # Python 3: read() returns bytes; StrictVersion needs str
    version = version.decode("utf-8")
try:
if StrictVersion(version) > StrictVersion(PYJFUZZ_VERSION):
self.new_version = version
return True
except:
pass
return False
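# Hedged, condensed sketch of the proxy-aware HTTPS request pattern above;
# the request path is a placeholder, not the real version_url:
import os
import sys

if sys.version_info >= (3, 0):
    import http.client as httplib
    from urllib.parse import urlparse
else:
    import httplib
    from urlparse import urlparse

proxy_env = os.environ.get("HTTPS_PROXY") or os.environ.get("HTTP_PROXY")
if proxy_env:
    proxy = urlparse(proxy_env)
    conn = httplib.HTTPSConnection(proxy.hostname, proxy.port)
    conn.set_tunnel("raw.githubusercontent.com", 443)
else:
    conn = httplib.HTTPSConnection("raw.githubusercontent.com")
conn.request("GET", "/user/repo/master/VERSION")  # placeholder path
print(conn.getresponse().status)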
def write(self):
if os.path.isfile(self.lib):
with open(self.lib) as f:
lib_repo = Repo.fromurl(f.read().strip())
if (formaturl(lib_repo.url, 'https') == formaturl(self.url, 'https') # match URLs in common format (https)
and (lib_repo.rev == self.rev # match revs, even if rev is None (valid for repos with no revisions)
or (lib_repo.rev and self.rev
and lib_repo.rev == self.rev[0:len(lib_repo.rev)]))): # match long and short rev formats
#print self.name, 'unmodified'
return
ref = (formaturl(self.url, 'https').rstrip('/') + '/' +
(('' if self.is_build else '#') +
self.rev if self.rev else ''))
action("Updating reference \"%s\" -> \"%s\"" % (relpath(cwd_root, self.path) if cwd_root != self.path else self.name, ref))
with open(self.lib, 'wb') as f:
with_auth = urlparse(ref)
f.write(with_auth._replace(netloc=with_auth.hostname).geturl())
f.write("\n")
def api_is_run(url):
"""Determine if a URL looks like a valid run URL"""
# Note that this generates an extra array element because of the
# leading slash.
url_parts = urlparse.urlparse(url).path.split('/')
if len(url_parts) != 6 \
or (url_parts[:3] != ['', 'pscheduler', 'tasks' ]) \
or (url_parts[4] != 'runs'):
return False
try:
uuid.UUID(url_parts[3])
uuid.UUID(url_parts[5])
except ValueError:
return False
return True
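# Hedged usage example for api_is_run; the host and UUIDs are made up:
print(api_is_run("https://ps.example.net/pscheduler/tasks/"
                 "3f0c2d3e-1d2b-4c5a-9e8f-0a1b2c3d4e5f/runs/"
                 "7a6b5c4d-3e2f-1a0b-9c8d-7e6f5a4b3c2d"))  # True
print(api_is_run("https://ps.example.net/pscheduler/tasks/not-a-uuid/runs/x"))  # False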
def __init__(self, iso_url, **kwargs):
Version.__init__(self, **kwargs)
if re.match(r'/', iso_url):
self.m_iso_url = "file://" + iso_url
self.m_iso_path = iso_url
else:
self.m_iso_url = iso_url
self.m_iso_path = None
# We can't determine the final ISO file name yet because the work
# directory is not known at this point, but we can precalculate the
# basename of it.
self.m_iso_basename = os.path.basename(
urllib.url2pathname(urlparse.urlparse(iso_url)[2]))
m = re.match(r"(.*)cd.*iso|NetBSD-[0-9\._A-Z]+-(.*).iso", self.m_iso_basename)
if m is None:
raise RuntimeError("cannot guess architecture from ISO name '%s'"
% self.m_iso_basename)
if m.group(1) is not None:
self.m_arch = m.group(1)
if m.group(2) is not None:
self.m_arch = m.group(2)
check_arch_supported(self.m_arch, 'iso')
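# Hedged illustration of the architecture-guessing regex above; the ISO
# file names are examples, not taken from the source:
import re

pattern = r"(.*)cd.*iso|NetBSD-[0-9\._A-Z]+-(.*).iso"
print(re.match(pattern, "i386cd-5.0.iso").groups())        # ('i386', None)
print(re.match(pattern, "NetBSD-9.3-amd64.iso").groups())  # (None, 'amd64')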
def test_validate_image_status_before_upload_unexpected_resp_v1(self):
mock_conn = mock.Mock()
fake_url = 'http://fake_host/fake_path/fake_image_id'
parts = urlparse(fake_url)
path = parts[2]
fake_image_id = path.split('/')[-1]
mock_head_resp = mock.Mock()
mock_head_resp.status = httplib.BAD_REQUEST
mock_head_resp.read.return_value = 'fakeData'
mock_head_resp.getheader.return_value = 'queued'
mock_conn.getresponse.return_value = mock_head_resp
self.mock_patch_object(self.glance, 'check_resp_status_and_retry')
self.glance.validate_image_status_before_upload_v1(
mock_conn, fake_url, extra_headers=mock.Mock())
self.assertEqual(mock_head_resp.read.call_count, 2)
self.glance.check_resp_status_and_retry.assert_called_with(
mock_head_resp, fake_image_id, fake_url)
mock_conn.request.assert_called_once()
def check_headers(self, headers):
etag = headers.get('etag')
if etag is not None:
if etag.startswith(('W/', 'w/')):
if etag.startswith('w/'):
warn(HTTPWarning('weak etag indicator should be upcase.'),
stacklevel=4)
etag = etag[2:]
if not (etag[:1] == etag[-1:] == '"'):
warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)
location = headers.get('location')
if location is not None:
if not urlparse(location).netloc:
warn(HTTPWarning('absolute URLs required for location header'),
stacklevel=4)
def make_next_param(login_url, current_url):
'''
Reduces the scheme and host from a given URL so it can be passed to
the given `login` URL more efficiently.
:param login_url: The login URL being redirected to.
:type login_url: str
:param current_url: The URL to reduce.
:type current_url: str
'''
l = urlparse(login_url)
c = urlparse(current_url)
if (not l.scheme or l.scheme == c.scheme) and \
(not l.netloc or l.netloc == c.netloc):
return urlunparse(('', '', c.path, c.params, c.query, ''))
return current_url
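# Hedged usage examples for make_next_param (URLs are illustrative):
print(make_next_param('/login', 'http://example.com/profile?tab=settings'))
# -> '/profile?tab=settings'  (same or unspecified host: scheme/netloc dropped)
print(make_next_param('https://sso.example.org/login', 'http://example.com/profile'))
# -> 'http://example.com/profile'  (different host: returned unchanged)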
def _BuildUrl(self, url, path_elements=None, extra_params=None):
# Break url into constituent parts
(scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)
# Add any additional path elements to the path
if path_elements:
# Filter out the path elements that have a value of None
p = [i for i in path_elements if i]
if not path.endswith('/'):
path += '/'
path += '/'.join(p)
# Add any additional query parameters to the query string
if extra_params and len(extra_params) > 0:
extra_query = self._EncodeParameters(extra_params)
# Add it to the existing query
if query:
query += '&' + extra_query
else:
query = extra_query
# Return the rebuilt URL
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
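# Standalone sketch of the same URL-rebuilding idea, assuming only the
# standard library; _EncodeParameters is replaced by urllib.urlencode and
# the URL/parameters are illustrative:
import urllib
import urlparse

def build_url(url, path_elements=None, extra_params=None):
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if path_elements:
        p = [i for i in path_elements if i]
        if not path.endswith('/'):
            path += '/'
        path += '/'.join(p)
    if extra_params:
        extra_query = urllib.urlencode(extra_params)
        query = query + '&' + extra_query if query else extra_query
    return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))

print(build_url('https://api.example.com/1.1/statuses', ['show.json'], {'id': 123}))
# -> https://api.example.com/1.1/statuses/show.json?id=123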
def get_netloc(url):
"""Return the netloc from a URL.
If the input value is not a valid URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
"""
try:
netloc = urlparse(url).netloc
except Exception as exp:
raise errors.AnsibleFilterError(
'Failed to return the netloc of: "%s"' % str(exp)
)
else:
return netloc
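# Hedged usage example (URL is illustrative):
print(get_netloc('https://example.com:8443/api/v1'))  # example.com:8443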
def get_netorigin(url):
"""Return the netloc from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
"""
try:
parsed_url = urlparse(url)
netloc = parsed_url.netloc
scheme = parsed_url.scheme
except Exception as exp:
raise errors.AnsibleFilterError(
'Failed to return the netorigin of: "%s"' % str(exp)
)
else:
return '%s://%s' % (scheme, netloc)
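# Hedged usage example (URL is illustrative):
print(get_netorigin('https://example.com:8443/api/v1?page=2'))  # https://example.com:8443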