def _get_sync_token(self):
    url_parts = urlsplit(self.next_sync_url or self.next_page_url)
    querystring = parse_qs(url_parts.query)
    return querystring['sync_token'][0]
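The same query-parameter lookup works on any URL; a minimal standalone sketch with a made-up sync URL (the attribute names above belong to the original class):

from urllib.parse import parse_qs, urlsplit

next_sync_url = 'https://cdn.example.com/spaces/1/sync?sync_token=abc123'  # hypothetical
token = parse_qs(urlsplit(next_sync_url).query)['sync_token'][0]
print(token)  # abc123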
def parse(self, response):
    self.responses.append(response)
    p = urlsplit(response.url)
    self.visited_urls.append(
        urlunsplit(['', '', p.path, p.query, p.fragment]) or '/')
    urls = {link.url for link in
            self.link_extractor.extract_links(response)
            if not self._looks_like_logout(link, response)}
    for url in urls:
        yield self.make_request(url)
def test_login(settings, extra_settings=None):
    """ No logout links, just one page after login.
    """
    crawler = make_crawler(settings, **AL_SETTINGS)
    with MockServer(Login) as s:
        yield crawler.crawl(url=s.root_url)
    spider = crawler.spider
    assert len(spider.visited_urls) == 2
    assert set(spider.visited_urls) == {'/', '/hidden'}
    response = spider.responses[0]
    assert urlsplit(response.url).path.rstrip('/') == ''
    assert response.meta['autologin_active']
    assert response.meta['autologin_response']['status'] == 'solved'
def test_login_error(settings, extra_settings=None):
    """ Trying to log in with wrong credentials.
    """
    al_settings = dict(AL_SETTINGS)
    al_settings['AUTOLOGIN_PASSWORD'] = 'wrong'
    crawler = make_crawler(settings, **al_settings)
    with MockServer(Login) as s:
        yield crawler.crawl(url=s.root_url)
    spider = crawler.spider
    assert len(spider.visited_urls) == 2
    assert set(spider.visited_urls) == {'/', '/login'}
    response = spider.responses[0]
    assert urlsplit(response.url).path.rstrip('/') == ''
    assert not response.meta['autologin_active']
    assert response.meta['autologin_response']['status'] == 'error'
def create_return_url(base, query, **kwargs):
    """
    Add a query string plus extra parameters to a base URL which may contain
    a query part already.

    :param base: redirect_uri, may contain a query part, no fragment allowed.
    :param query: Old query part as a string
    :param kwargs: extra query parameters
    :return: The redirect_uri with the combined query part
    """
    part = urlsplit(base)
    if part.fragment:
        raise ValueError("Base URL contained parts it shouldn't")

    for key, values in parse_qs(query).items():
        if key in kwargs:
            if isinstance(kwargs[key], six.string_types):
                kwargs[key] = [kwargs[key]]
            kwargs[key].extend(values)
        else:
            kwargs[key] = values

    if part.query:
        for key, values in parse_qs(part.query).items():
            if key in kwargs:
                if isinstance(kwargs[key], six.string_types):
                    kwargs[key] = [kwargs[key]]
                kwargs[key].extend(values)
            else:
                kwargs[key] = values
        _pre = base.split("?")[0]
    else:
        _pre = base

    logger.debug("kwargs: %s" % kwargs)
    return "%s?%s" % (_pre, url_encode_params(kwargs))
def _fetch_crl(self, config, url, out, fmt):
    # type: (ConfigParser, str, str, str) -> bool
    updated = False
    url_hash = sha1(url.encode('utf-8')).hexdigest()

    headers = {}  # type: Dict[str, str]
    try:
        etag = config.get(CONFIG_SECTION, url_hash)
    except NoOptionError:
        pass
    else:
        headers = {'If-None-Match': etag}

    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        crl_name = os.path.basename(urlsplit(url).path)
        crl_name, content = self._format_crl(crl_name, response.content, fmt)
        crl_path = os.path.join(out, crl_name)
        with open(crl_path, 'wb') as f:
            f.write(content)
        print(crl_path, file=self.stdout)
        updated = True
        if 'ETag' in response.headers:
            config.set(CONFIG_SECTION, url_hash, response.headers['ETag'])
    elif response.status_code == 304:
        pass
    else:
        print("Error {} downloading {}: {}".format(
            response.status_code, url, response.content
        ), file=self.stderr)
    return updated
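The ETag handling above is the standard HTTP conditional-GET pattern; a minimal standalone sketch of just that part (function name and return shape are illustrative, not from the original project):

import requests

def fetch_if_changed(url, etag=None):
    """Return (content, new_etag), or (None, etag) when the resource is unchanged."""
    headers = {'If-None-Match': etag} if etag else {}
    response = requests.get(url, headers=headers)
    if response.status_code == 304:      # not modified since the cached ETag
        return None, etag
    response.raise_for_status()
    return response.content, response.headers.get('ETag')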
def get_querystring(uri):
    parts = urlparse.urlsplit(uri)
    if sys.version_info[:2] == (2, 6):
        query = parts.path
        if query.startswith('?'):
            query = query[1:]
    else:
        query = parts.query
    return urlparse.parse_qs(query)
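A hypothetical call (Python 3 branch), with urlparse being urllib.parse imported under that name as in the snippet:

from urllib import parse as urlparse
import sys

print(get_querystring('https://api.example.com/items?page=2&sort=asc'))
# -> {'page': ['2'], 'sort': ['asc']}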
def _update_link_prefix(self, orig_url, prefix):
    if not prefix:
        return orig_url
    url_parts = list(urlparse.urlsplit(orig_url))
    prefix_parts = list(urlparse.urlsplit(prefix))
    # Take scheme and netloc from the prefix, and prepend its path.
    url_parts[0:2] = prefix_parts[0:2]
    url_parts[2] = prefix_parts[2] + url_parts[2]
    return urlparse.urlunsplit(url_parts).rstrip('/')
def get_path(url):
    p = urlsplit(url)
    return urlunsplit(['', '', p.path or '/', p.query, p.fragment])
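Hypothetical calls showing what the helper returns:

from urllib.parse import urlsplit, urlunsplit

print(get_path('https://example.com/search?q=python#top'))  # /search?q=python#top
print(get_path('https://example.com'))                       # /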
def get_session(domain_or_url):
    """
    Return a keep-alive requests.Session for the given domain.
    :param domain_or_url: domain name or full URL
    :type domain_or_url: str
    :rtype: requests.Session
    """
    domain = urllib_parse.urlsplit(domain_or_url).netloc or domain_or_url
    if domain not in pool:
        pool[domain] = []

    if not hasattr(locked_session, "sessdicts"):
        # Per-thread record of the sessions this thread has taken out of the pool.
        # A session in use is removed from the pool, so other threads cannot grab
        # it; calling release_lock() puts these sessions back into the pool.
        # A thread may hold several sessions at the same time.
        locked_session.sessdicts = []

    if not pool[domain]:
        # Pool is empty for this domain, create a new session.
        sessdict = {
            "domain": domain,
            "sessobj": requests.Session(),
        }
    else:
        # Reuse a pooled session for this domain.
        sessdict = pool[domain].pop()

    sessdict["active"] = time.time()
    locked_session.sessdicts.append(sessdict)

    if _gc_checkpoint < time.time() - SESSION_TTL:
        with cleaning_lock:
            clear()

    return sessdict["sessobj"]  # type: requests.Session
def get_license_from_url(url):
    """Get the license abbreviation from a URL.

    Args:
        url(str): canonical url of the license.

    Returns:
        str: the corresponding license abbreviation.

    Raises:
        ValueError: when the url is not recognized
    """
    if not url:
        return

    split_url = urlsplit(url, scheme='http')
    if split_url.netloc.lower() == 'creativecommons.org':
        license = ['CC']
        match = _RE_LICENSE_URL.match(split_url.path)
        license.extend(part.upper() for part in match.groups() if part)
    elif split_url.netloc == 'arxiv.org':
        license = ['arXiv']
        match = _RE_LICENSE_URL.match(split_url.path)
        license.extend(part for part in match.groups() if part)
    else:
        raise ValueError('Unknown license URL')
    return u' '.join(license)
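The function relies on a module-level _RE_LICENSE_URL regex that isn't shown here; a rough sketch of how such a pattern could pull the license parts out of a Creative Commons path (the original project's pattern may differ):

import re

# Hypothetical stand-in for the project's _RE_LICENSE_URL
_RE_LICENSE_URL = re.compile(r'^/licenses/(?P<license>[-\w]+)(?:/(?P<version>[\.\d]+))?')

match = _RE_LICENSE_URL.match('/licenses/by-nc/4.0/')
print(' '.join(['CC'] + [part.upper() for part in match.groups() if part]))
# -> CC BY-NC 4.0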
def get_querystring(uri):
    """Get querystring information from uri.

    :param uri: uri
    :return: querystring info or {}
    """
    parts = urlparse.urlsplit(uri)
    if sys.version_info[:2] == (2, 6):
        query = parts.path
        if query.startswith('?'):
            query = query[1:]
    else:
        query = parts.query
    return urlparse.parse_qs(query)
def __init__(self, repouri):
    """Initialize a RepoStats object.  Pass a TransportRepoURI
    object in repouri to configure an object for a particular
    repository URI."""

    self.__url = repouri.uri.rstrip("/")
    self.__scheme = urlsplit(self.__url)[0]
    self.__priority = repouri.priority
    self.__proxy = repouri.proxy
    self.__system = repouri.system
    self._err_decay = 0
    self.__failed_tx = 0
    self.__content_err = 0
    self.__decayable_err = 0
    self.__timeout_err = 0
    self.__total_tx = 0
    self.__consecutive_errors = 0
    self.__connections = 0
    self.__connect_time = 0.0
    self.__used = False
    self.__bytes_xfr = 0.0
    self.__seconds_xfr = 0.0
    self.origin_speed = 0.0
    self.origin_cspeed = 0.0
    self.origin_count = 1
    self.origin_factor = 1
    self.origin_decay = 1
def __str__(self):
    illegals = []
    for u in self.uris:
        assert isinstance(u, six.string_types)
        scheme = urlsplit(u, allow_fragments=0)[0]
        illegals.append((u, scheme))

    if len(illegals) > 1:
        msg = _("The following URIs use unsupported "
                "schemes. Supported schemes are "
                "file://, http://, and https://.")
        for i, s in illegals:
            msg += _("\n  {uri} (scheme: "
                     "{scheme})").format(uri=i, scheme=s)
        return msg
    elif len(illegals) == 1:
        i, s = illegals[0]
        return _("The URI '{uri}' uses the unsupported "
                 "scheme '{scheme}'. Supported schemes are "
                 "file://, http://, and https://.").format(
                     uri=i, scheme=s)
    return _("The specified URI uses an unsupported scheme."
             " Supported schemes are: file://, http://, and "
             "https://.")
def __str__(self):
    if self.data:
        scheme = urlsplit(self.data, allow_fragments=0)[0]
        return _("The proxy URI '{uri}' uses the unsupported "
                 "scheme '{scheme}'. Currently the only supported "
                 "scheme is http://.").format(
                     uri=self.data, scheme=scheme)
    return _("The specified proxy URI uses an unsupported scheme."
             " Currently the only supported scheme is: http://.")
def valid_pub_url(url, proxy=False):
    """Verify that the publisher URL contains only valid characters.
    If 'proxy' is set to True, some checks are relaxed."""

    if not url:
        return False

    # First split the URL and check if the scheme is one we support.
    o = urlsplit(url)
    if not o[0] in _valid_proto:
        return False

    if o[0] == "file":
        path = urlparse(url, "file", allow_fragments=0)[2]
        path = url2pathname(path)
        if not os.path.abspath(path):
            return False
        # No further validation to be done.
        return True

    # Next verify that the network location is valid.
    if six.PY3:
        host = urllib.parse.splitport(o[1])[0]
    else:
        host = urllib.splitport(o[1])[0]

    if proxy:
        # We may have authentication details in the proxy URI, which
        # we must ignore when checking for hostname validity.
        host_parts = host.split("@")
        if len(host_parts) == 2:
            host = host_parts[1]

    if not host or _invalid_host_chars.match(host):
        return False

    if _hostname_re.match(host):
        return True
    return False
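valid_pub_url() also depends on module-level names that aren't shown (_valid_proto, _invalid_host_chars, _hostname_re), plus its own six, os, urllib, urlparse, and url2pathname imports; hypothetical definitions of those three helpers, just to illustrate their roles (the original project's patterns differ):

import re

_valid_proto = ("file", "http", "https")
# Characters that should never appear in a hostname (illustrative pattern only).
_invalid_host_chars = re.compile(r".*[^a-zA-Z0-9\-\.:\[\]]+")
# Loose hostname / IP-literal check (illustrative pattern only).
_hostname_re = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9\-\.:\[\]]*$")

# With those in place:
#   valid_pub_url("https://pkg.example.org/solaris")  -> True
#   valid_pub_url("ftp://pkg.example.org")            -> False (scheme not allowed)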
def get_proxy_slot(self, proxy):
    """
    Return downloader slot for a proxy.

    By default it doesn't take the port into account, i.e. all proxies
    with the same hostname / IP address share the same slot.
    """
    # FIXME: an option to use website address as a part of slot as well?
    return urlsplit(proxy).hostname
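For example, with made-up proxy URLs, both of these map to the same slot because only the hostname is kept:

from urllib.parse import urlsplit

print(urlsplit('http://user:pass@10.0.0.5:8080').hostname)  # 10.0.0.5
print(urlsplit('http://10.0.0.5:3128').hostname)            # 10.0.0.5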
def convert_env(self):
    full_uri = self.environ['REQUEST_URI']
    parts = urlsplit(full_uri)
    self.resolve(full_uri, self.environ, parts.netloc.split(':')[0])

    for header in list(self.environ.keys()):
        if header in self.FILTER_REQ_HEADERS:
            self.environ.pop(header, '')
def get_id_from_href(href):
    """Return the id or uuid portion of a url.

    Given: 'http://www.foo.com/bar/123?q=4'
    Returns: '123'

    Given: 'http://www.foo.com/bar/abc123?q=4'
    Returns: 'abc123'
    """
    return urlparse.urlsplit("%s" % href).path.split('/')[-1]