def can_fetch(self, useragent, url):
"""using the parsed robots.txt decide if useragent can fetch url"""
if self.disallow_all:
return False
if self.allow_all:
return True
# search for given user agent matches
# the first match counts
parsed_url = urlparse.urlparse(urllib.unquote(url))
url = urlparse.urlunparse(('', '', parsed_url.path,
parsed_url.params, parsed_url.query, parsed_url.fragment))
url = urllib.quote(url)
if not url:
url = "/"
for entry in self.entries:
if entry.applies_to(useragent):
return entry.allowance(url)
# try the default entry last
if self.default_entry:
return self.default_entry.allowance(url)
# agent not found ==> access granted
return True
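# --- Usage sketch (not part of robotparser): what the path-reduction step above
# produces, assuming Python 2's urlparse/urllib modules ---
import urlparse, urllib
parsed_url = urlparse.urlparse(urllib.unquote("http://example.com/a%20b"))
path_only = urlparse.urlunparse(('', '', parsed_url.path, parsed_url.params,
                                 parsed_url.query, parsed_url.fragment))
print urllib.quote(path_only)   # -> /a%20b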
def resolveEntity(self, publicId, systemId):
assert systemId is not None
source = DOMInputSource()
source.publicId = publicId
source.systemId = systemId
source.byteStream = self._get_opener().open(systemId)
# determine the encoding if the transport provided it
source.encoding = self._guess_media_encoding(source)
# determine the base URI if we can
import posixpath, urlparse
parts = urlparse.urlparse(systemId)
scheme, netloc, path, params, query, fragment = parts
# XXX should we check the scheme here as well?
if path and not path.endswith("/"):
path = posixpath.dirname(path) + "/"
parts = scheme, netloc, path, params, query, fragment
source.baseURI = urlparse.urlunparse(parts)
return source
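# --- Sketch (not from the original module): the base-URI derivation above, run
# on an illustrative system ID ---
import posixpath, urlparse
scheme, netloc, path, params, query, fragment = \
    urlparse.urlparse("http://example.com/dtds/note.dtd")
if path and not path.endswith("/"):
    path = posixpath.dirname(path) + "/"
print urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
# -> http://example.com/dtds/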
def make_next_param(login_url, current_url):
'''
Reduces the scheme and host from a given URL so it can be passed to
the given `login` URL more efficiently.
:param login_url: The login URL being redirected to.
:type login_url: str
:param current_url: The URL to reduce.
:type current_url: str
'''
l = urlparse(login_url)
c = urlparse(current_url)
if (not l.scheme or l.scheme == c.scheme) and \
(not l.netloc or l.netloc == c.netloc):
return urlunparse(('', '', c.path, c.params, c.query, ''))
return current_url
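# --- Illustrative calls (assumes the function above plus
# `from urlparse import urlparse, urlunparse`) ---
print make_next_param("/login", "http://example.com/dashboard?tab=2")
# -> /dashboard?tab=2   (scheme and host dropped: the login URL has neither)
print make_next_param("https://other.example/login", "http://example.com/dashboard")
# -> http://example.com/dashboard   (host differs, so the URL is left intact)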
def getlinkinfos(self):
# File reading is done in __init__() routine. Store parser in
# local variable to indicate success of parsing.
# If no parser was stored, fail.
if not self.parser: return []
rawlinks = self.parser.getlinks()
base = urlparse.urljoin(self.url, self.parser.getbase() or "")
infos = []
for rawlink in rawlinks:
t = urlparse.urlparse(rawlink)
# DON'T DISCARD THE FRAGMENT! Instead, include
# it in the tuples which are returned. See Checker.dopage().
fragment = t[-1]
t = t[:-1] + ('',)
rawlink = urlparse.urlunparse(t)
link = urlparse.urljoin(base, rawlink)
infos.append((link, rawlink, fragment))
return infos
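# --- Sketch (not part of the checker): the fragment split performed above ---
import urlparse
t = urlparse.urlparse("subdir/page.html#section-2")
fragment = t[-1]                                  # 'section-2'
rawlink = urlparse.urlunparse(t[:-1] + ('',))     # 'subdir/page.html'
print urlparse.urljoin("http://example.com/docs/", rawlink)
# -> http://example.com/docs/subdir/page.html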
def _BuildUrl(self, url, path_elements=None, extra_params=None):
# Break url into constituent parts
(scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)
# Add any additional path elements to the path
if path_elements:
# Filter out the path elements that have a value of None
p = [i for i in path_elements if i]
if not path.endswith('/'):
path += '/'
path += '/'.join(p)
# Add any additional query parameters to the query string
if extra_params and len(extra_params) > 0:
extra_query = self._EncodeParameters(extra_params)
# Add it to the existing query
if query:
query += '&' + extra_query
else:
query = extra_query
# Return the rebuilt URL
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
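# --- Sketch of the rebuild step above, in isolation (illustrative host and
# parameters, not the client's real endpoint) ---
import urlparse
scheme, netloc, path, params, query, fragment = \
    urlparse.urlparse("https://api.example.com/1.1/statuses")
path += '/' + 'show.json'
query = 'id=123'
print urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
# -> https://api.example.com/1.1/statuses/show.json?id=123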
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
string, the updated URL. The URL is returned unchanged if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
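# --- Illustrative call (assumes the function above with urlparse, urllib and
# parse_qsl imported; dict ordering means the parameter order may vary) ---
print _add_query_parameter("http://example.com/cb?state=a", "token", "xyz")
# -> http://example.com/cb?state=a&token=xyz   (or the two parameters swapped)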
def scrobble_show(self, show_name, season_number, episode_number, progress, scrobble_type):
self.logger.info(
'Scrobbling ({scrobble_type}) {show_name} - S{season_number}E{episode_number} - {progress} to trakt.tv.'
.format(show_name=show_name, scrobble_type=scrobble_type, season_number=season_number.zfill(2),
episode_number=episode_number.zfill(2), progress=progress))
data = {}
data['show'] = {}
data['show']['title'] = show_name
data['episode'] = {}
data['episode']['season'] = int(season_number)
data['episode']['number'] = int(episode_number)
data['progress'] = int(progress)
data['app_version'] = '1.0'
data['app_date'] = '2014-09-22'
json_data = json.dumps(data)
url = urlparse.urlunparse(('https', 'api-v2launch.trakt.tv', '/scrobble/' + scrobble_type, '', '', ''))
try:
self._do_trakt_auth_post(url, json_data)
except Exception:
return False
return True
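# --- The scrobble URL assembled above, in isolation ('start' is an
# illustrative scrobble_type) ---
import urlparse
print urlparse.urlunparse(('https', 'api-v2launch.trakt.tv',
                           '/scrobble/' + 'start', '', '', ''))
# -> https://api-v2launch.trakt.tv/scrobble/start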
def scrobble_movie(self, imdb_id, progress, scrobble_type):
self.logger.info('Scrobbling ({scrobble_type}) {imdb_id} - {progress} to trakt.tv.'
.format(imdb_id=imdb_id, scrobble_type=scrobble_type, progress=progress))
data = {}
data['movie'] = {}
data['movie']['ids'] = {}
data['movie']['ids']['imdb'] = imdb_id
data['progress'] = int(progress)
data['app_version'] = '1.0'
data['app_date'] = '2014-09-22'
json_data = json.dumps(data)
url = urlparse.urlunparse(('https', 'api-v2launch.trakt.tv', '/scrobble/' + scrobble_type, '', '', ''))
try:
self._do_trakt_auth_post(url, json_data)
except Exception:
return False
return True
def _parse(url, defaultPort=None):
url = url.strip()
parsed = urlparse.urlparse(url)
scheme = parsed[0]
path = urlparse.urlunparse(('','')+parsed[2:])
if defaultPort is None:
if scheme == 'https':
defaultPort = 443
else:
defaultPort = 80
host, port = parsed[1], defaultPort
if ':' in host:
host, port = host.split(':')
port = int(port)
if path == "":
path = "/"
return scheme, host, port, path
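# --- Illustrative results of the helper above (assumes the module-level
# `import urlparse` used by the original file) ---
print _parse("http://example.com/index.html")
# -> ('http', 'example.com', 80, '/index.html')
print _parse("https://example.com:8443/")
# -> ('https', 'example.com', 8443, '/')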
def process(self):
parsed = urlparse.urlparse(self.uri)
protocol = parsed[0]
host = parsed[1]
port = self.ports[protocol]
if ':' in host:
host, port = host.split(':')
port = int(port)
rest = urlparse.urlunparse(('','')+parsed[2:])
if not rest:
rest = rest+'/'
class_ = self.protocols[protocol]
headers = self.getAllHeaders().copy()
if 'host' not in headers:
headers['host'] = host
self.content.seek(0, 0)
s = self.content.read()
clientFactory = class_(self.method, rest, self.clientproto, headers,
s, self)
reactor.connectTCP(host, port, clientFactory)
def do_get(self, url, top_level=False, top_level_path=""):
parts = list(urlparse.urlparse(url))
# 2 is the path offset
if top_level:
parts[2] = '/' + top_level_path
parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
url = urlparse.urlunparse(parts)
try:
if self.disable_ssl_validation:
urllib3.disable_warnings()
http = urllib3.PoolManager(cert_reqs='CERT_NONE')
else:
http = urllib3.PoolManager()
r = http.request('GET', url, headers=self.headers)
except Exception as e:
LOG.error("Request on service '%s' with url '%s' failed",
(self.name, url))
raise e
if r.status >= 400:
raise ServiceError("Request on service '%s' with url '%s' failed"
" with code %d" % (self.name, url, r.status))
return r.data
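# --- Sketch of the path rewrite above; MULTIPLE_SLASH is defined elsewhere in
# the module and is assumed here to collapse repeated slashes ---
import re, urlparse
MULTIPLE_SLASH = re.compile(r'/+')   # assumption, for illustration only
parts = list(urlparse.urlparse("http://10.0.0.1//identity//v3/"))
parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
print urlparse.urlunparse(parts)
# -> http://10.0.0.1/identity/v3/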
def handle_redirect_to_login(request, **kwargs):
login_url = kwargs.get("login_url")
redirect_field_name = kwargs.get("redirect_field_name")
next_url = kwargs.get("next_url")
if login_url is None:
login_url = settings.ACCOUNT_LOGIN_URL
if next_url is None:
next_url = request.get_full_path()
try:
login_url = urlresolvers.reverse(login_url)
except urlresolvers.NoReverseMatch:
if callable(login_url):
raise
if "/" not in login_url and "." not in login_url:
raise
url_bits = list(urlparse.urlparse(login_url))
if redirect_field_name:
querystring = QueryDict(url_bits[4], mutable=True)
querystring[redirect_field_name] = next_url
url_bits[4] = querystring.urlencode(safe="/")
return HttpResponseRedirect(urlparse.urlunparse(url_bits))
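# --- Pure-stdlib sketch of the same idea (the code above uses Django's
# QueryDict, which keeps '/' unescaped via safe="/") ---
import urlparse, urllib
url_bits = list(urlparse.urlparse("/account/login/"))
url_bits[4] = urllib.urlencode({"next": "/dashboard/"})
print urlparse.urlunparse(url_bits)
# -> /account/login/?next=%2Fdashboard%2F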
def url(self, value):
self.__dict__['url'] = value
if value is not None:
scheme, netloc, path, params, query, fragment = urlparse.urlparse(value)
# Exclude default port numbers.
if scheme == 'http' and netloc[-3:] == ':80':
netloc = netloc[:-3]
elif scheme == 'https' and netloc[-4:] == ':443':
netloc = netloc[:-4]
if scheme not in ('http', 'https'):
raise ValueError("Unsupported URL %s (%s)." % (value, scheme))
# Normalized URL excludes params, query, and fragment.
self.normalized_url = urlparse.urlunparse((scheme, netloc, path, None, None, None))
else:
self.normalized_url = None
self.__dict__['url'] = None
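# --- Sketch (illustrative URL) of the normalization above: the default port
# and the params/query/fragment components are dropped ---
import urlparse
scheme, netloc, path, params, query, fragment = \
    urlparse.urlparse("http://example.com:80/request?a=1#frag")
if scheme == 'http' and netloc[-3:] == ':80':
    netloc = netloc[:-3]
print urlparse.urlunparse((scheme, netloc, path, None, None, None))
# -> http://example.com/request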
def _do_put_request(self, resource, param_dict):
req_url = urlparse.urlunparse(["http", self.host, "api/v%s/%s" % (self.api_version, resource), "", "", ""])
print "req_url=%s" % (req_url)
opener = urllib2.build_opener(urllib2.HTTPHandler)
req = urllib2.Request(req_url, data=json.dumps(param_dict))
req.add_header('Content-Type', 'application/json')
req.get_method = lambda: 'PUT'
try:
return json.loads(opener.open(req).read())
except urllib2.HTTPError, err:
return parse_errors(err)
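# --- The PUT URL assembled above, with illustrative host/version/resource
# values ---
import urlparse
print urlparse.urlunparse(["http", "localhost:8080",
                           "api/v%s/%s" % (1, "jobs"), "", "", ""])
# -> http://localhost:8080/api/v1/jobs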
#---------------------------------------------
# error parsing
# --------------------------------------------
def handle_redirect_to_login(request, **kwargs):
login_url = kwargs.get("login_url")
redirect_field_name = kwargs.get("redirect_field_name")
next_url = kwargs.get("next_url")
if login_url is None:
login_url = settings.ACCOUNT_LOGIN_URL
if next_url is None:
next_url = request.get_full_path()
try:
login_url = urlresolvers.reverse(login_url)
except urlresolvers.NoReverseMatch:
if callable(login_url):
raise
if "/" not in login_url and "." not in login_url:
raise
url_bits = list(urlparse(login_url))
if redirect_field_name:
querystring = QueryDict(url_bits[4], mutable=True)
querystring[redirect_field_name] = next_url
url_bits[4] = querystring.urlencode(safe="/")
return HttpResponseRedirect(urlunparse(url_bits))
def _build_url(self, endpoint, params={}):
"""Return the full URL for the desired endpoint.
Args:
endpoint (str): the API endpoint after base URL
params (dict): any params to include in the request
Returns:
(str) the full URL of the request
"""
new_params = {'circle-token': self._token}
new_params.update(params)
parsed_url = urlparse(self._base_url)
new_parse = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc,
path='/'.join((parsed_url.path, endpoint)),
params='', query=urlencode(new_params),
fragment='')
return urlunparse(new_parse)
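# --- Sketch of the reassembly above in isolation (base URL and token are
# illustrative; imports assume the Python 2 urlparse/urllib modules) ---
from urlparse import urlparse, urlunparse, ParseResult
from urllib import urlencode
parsed_url = urlparse('https://circleci.com/api/v1.1')
print urlunparse(ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc,
                             path='/'.join((parsed_url.path, 'me')),
                             params='', query=urlencode({'circle-token': 'abc'}),
                             fragment=''))
# -> https://circleci.com/api/v1.1/me?circle-token=abc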
def __getattr__ (self,name):
if name=="urlWithoutVariables":
return urlunparse((self.schema,self.__host,self.__path,'','',''))
elif name=="pathWithVariables":
return urlunparse(('','',self.__path,'',self.__variablesGET.urlEncoded(),''))
elif name=="completeUrl":
return urlunparse((self.schema,self.__host,self.__path,self.__params,self.__variablesGET.urlEncoded(),''))
elif name=="finalUrl":
if self.__finalurl:
return self.__finalurl
return self.completeUrl
elif name=="urlWithoutPath":
return "%s://%s" % (self.schema,self._headers["Host"])
elif name=="path":
return self.__path
elif name=="postdata":
if self.ContentType=="application/x-www-form-urlencoded":
return self.__variablesPOST.urlEncoded()
elif self.ContentType=="multipart/form-data":
return self.__variablesPOST.multipartEncoded()
else:
return self.__uknPostData
else:
raise AttributeError
def _http_request(self, verb, path, body, headers):
"""Makes the actual HTTP request.
"""
url = urlparse.urlunparse((self.config.scheme, self.config.server,
path, None, None, None))
LOG.debug("Request is %s:%s" % (verb, url))
LOG.debug("Request headers are %s" % headers)
LOG.debug("Request body is %s" % body)
conn = self._get_connection()
resp, content = conn.request(url, method=verb, body=body,
headers=headers)
# HTTP response code is handled elsewhere
http_status = (resp.status, resp.reason)
resp_headers = dict(
(k.lower(), v)
for k, v in resp.iteritems()
)
resp_body = content
LOG.debug("Response status is %s %s" % http_status)
LOG.debug("Response headers are %s" % resp_headers)
LOG.debug("Response body is %s" % resp_body)
return (http_status, resp_headers, resp_body)
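# --- The URL construction above, with illustrative scheme/server/path values ---
import urlparse
print urlparse.urlunparse(('https', 'chef.example.com',
                           '/organizations/myorg/nodes', None, None, None))
# -> https://chef.example.com/organizations/myorg/nodes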
def findTags(self):
all = lambda x: 1
for elm in self.document(all, {'rel': re.compile(r'\btag\b')}):
href = elm.get('href')
if not href:
continue
urlscheme, domain, path, params, query, fragment = \
urlparse.urlparse(_urljoin(self.baseuri, href))
segments = path.split('/')
tag = segments.pop()
if not tag:
if segments:
tag = segments.pop()
else:
# there are no tags
continue
tagscheme = urlparse.urlunparse((urlscheme, domain, '/'.join(segments), '', '', ''))
if not tagscheme.endswith('/'):
tagscheme += '/'
self.tags.append(FeedParserDict({"term": tag, "scheme": tagscheme, "label": elm.string or ''}))
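# --- Sketch (illustrative href) of the tag/scheme split above ---
import urlparse
urlscheme, domain, path, params, query, fragment = \
    urlparse.urlparse("http://example.com/tags/python")
segments = path.split('/')       # ['', 'tags', 'python']
tag = segments.pop()             # 'python'
tagscheme = urlparse.urlunparse((urlscheme, domain, '/'.join(segments), '', '', ''))
if not tagscheme.endswith('/'):
    tagscheme += '/'
print tag, tagscheme             # -> python http://example.com/tags/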
def normalize_website(cls, w):
from django.core.validators import EMPTY_VALUES
from urlparse import urlparse, urlunparse, ParseResult
w = w.decode('utf-8')
if w in EMPTY_VALUES:
return None
w = w.lower().strip()
if not w.startswith('http://') and not w.startswith('https://'):
w = 'http://' + w.lstrip('/')
try:
parsed = urlparse(w)
except ValueError as e:
return None
else:
new_parsed = ParseResult(scheme='http',
netloc=cls.get_website_tld(w),
path=parsed.path.rstrip('/'),
params='',
query=parsed.query,
fragment='')
return urlunparse(new_parsed)
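# --- Pure-stdlib sketch of the rebuild step above (get_website_tld belongs to
# the surrounding class and is not reproduced; 'example.com' is illustrative) ---
from urlparse import urlparse, urlunparse, ParseResult
parsed = urlparse('http://www.example.com/about/?ref=home')
print urlunparse(ParseResult(scheme='http', netloc='example.com',
                             path=parsed.path.rstrip('/'), params='',
                             query=parsed.query, fragment=''))
# -> http://example.com/about?ref=home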
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
string, the updated URL. The URL is returned unchanged if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(urlparse.parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)