python类urlsplit()的实例源码

sync_page.py 文件源码 项目:contentful.py 作者: contentful 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _get_sync_token(self):
        """Extract the ``sync_token`` query parameter from the next sync
        URL, falling back to the next page URL."""
        source_url = self.next_sync_url or self.next_page_url
        params = parse_qs(urlsplit(source_url).query)
        return params['sync_token'][0]
test_integration.py 文件源码 项目:autologin-middleware 作者: TeamHG-Memex 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def parse(self, response):
        """Record the visited URL (path relative to the host) and follow
        every extracted link that does not look like a logout link."""
        self.responses.append(response)
        parts = urlsplit(response.url)
        relative = urlunsplit(['', '', parts.path, parts.query, parts.fragment])
        self.visited_urls.append(relative or '/')
        # Deduplicate link targets before requesting them.
        targets = set()
        for link in self.link_extractor.extract_links(response):
            if self._looks_like_logout(link, response):
                continue
            targets.add(link.url)
        for target in targets:
            yield self.make_request(target)
test_integration.py 文件源码 项目:autologin-middleware 作者: TeamHG-Memex 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_login(settings, extra_settings=None):
    """Successful login with no logout links: the crawl reaches the root
    page and the members-only '/hidden' page.
    """
    crawler = make_crawler(settings, **AL_SETTINGS)
    with MockServer(Login) as server:
        yield crawler.crawl(url=server.root_url)
    spider = crawler.spider
    assert len(spider.visited_urls) == 2
    assert set(spider.visited_urls) == {'/', '/hidden'}
    first_response = spider.responses[0]
    # The first response is for the site root.
    assert urlsplit(first_response.url).path.rstrip('/') == ''
    assert first_response.meta['autologin_active']
    assert first_response.meta['autologin_response']['status'] == 'solved'
test_integration.py 文件源码 项目:autologin-middleware 作者: TeamHG-Memex 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_login_error(settings, extra_settings=None):
    """Login with a wrong password fails, so only the public pages
    ('/' and '/login') are reachable.
    """
    bad_settings = dict(AL_SETTINGS)
    bad_settings['AUTOLOGIN_PASSWORD'] = 'wrong'
    crawler = make_crawler(settings, **bad_settings)
    with MockServer(Login) as server:
        yield crawler.crawl(url=server.root_url)
    spider = crawler.spider
    assert len(spider.visited_urls) == 2
    assert set(spider.visited_urls) == {'/', '/login'}
    first_response = spider.responses[0]
    # The first response is for the site root.
    assert urlsplit(first_response.url).path.rstrip('/') == ''
    assert not first_response.meta['autologin_active']
    assert first_response.meta['autologin_response']['status'] == 'error'
authn.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _merge_query_params(kwargs, query):
    """Merge the values of a parsed query string into *kwargs* in place.

    Values for keys already present are appended (scalar values are first
    promoted to single-element lists); new keys are set directly.
    """
    for key, values in parse_qs(query).items():
        if key in kwargs:
            if isinstance(kwargs[key], six.string_types):
                kwargs[key] = [kwargs[key]]
            kwargs[key].extend(values)
        else:
            kwargs[key] = values


def create_return_url(base, query, **kwargs):
    """
    Add a query string plus extra parameters to a base URL which may contain
    a query part already.

    :param base: redirect_uri may contain a query part, no fragment allowed.
    :param query: Old query part as a string
    :param kwargs: extra query parameters
    :return: the combined URL as a string
    :raises ValueError: if *base* contains a fragment
    """
    part = urlsplit(base)
    if part.fragment:
        raise ValueError("Base URL contained parts it shouldn't")

    _merge_query_params(kwargs, query)

    if part.query:
        # The base URL already carries query parameters: fold them into
        # kwargs and strip them from the URL prefix so they are not
        # emitted twice.
        _merge_query_params(kwargs, part.query)
        _pre = base.split("?")[0]
    else:
        _pre = base

    # Lazy %-args so the message is only formatted when DEBUG is enabled.
    logger.debug("kwargs: %s", kwargs)

    return "%s?%s" % (_pre, url_encode_params(kwargs))
crls.py 文件源码 项目:cloak-server 作者: encryptme 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _fetch_crl(self, config, url, out, fmt):
        # type: (ConfigParser, str, str, str) -> bool
        """Download a single CRL and write it into the *out* directory.

        An ETag stored in *config* (keyed by the SHA-1 of the URL) is sent
        as ``If-None-Match`` so an unchanged CRL is not re-downloaded.
        Returns True if a new copy of the CRL was written, False otherwise.
        """
        updated = False

        # Hash the URL so the config option name is short and free of
        # characters the INI format cannot store.
        url_hash = sha1(url.encode('utf-8')).hexdigest()
        headers = {}  # type: Dict[str, str]

        try:
            etag = config.get(CONFIG_SECTION, url_hash)
        except NoOptionError:
            # No ETag recorded yet: fetch unconditionally.
            pass
        else:
            headers = {'If-None-Match': etag}

        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            # Name the local file after the last path component of the URL;
            # _format_crl may adjust the name for the requested format.
            crl_name = os.path.basename(urlsplit(url).path)
            crl_name, content = self._format_crl(crl_name, response.content, fmt)
            crl_path = os.path.join(out, crl_name)
            with open(crl_path, 'wb') as f:
                f.write(content)
            print(crl_path, file=self.stdout)
            updated = True

            # Remember the new ETag for the next conditional request.
            if 'ETag' in response.headers:
                config.set(CONFIG_SECTION, url_hash, response.headers['ETag'])
        elif response.status_code == 304:
            # Not modified since the recorded ETag: nothing to do.
            pass
        else:
            print("Error {} downloading {}: {}".format(
                response.status_code, url, response.content
            ), file=self.stderr)

        return updated
_compat.py 文件源码 项目:oejia_wx 作者: JoneXiong 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_querystring(uri):
    """Return the query string of *uri* parsed into a dict of lists."""
    split_result = urlparse.urlsplit(uri)
    if sys.version_info[:2] != (2, 6):
        raw_query = split_result.query
    else:
        # Python 2.6 workaround: the query is read from .path here and a
        # leading '?' stripped -- presumably due to an old urlparse quirk.
        raw_query = split_result.path
        if raw_query.startswith('?'):
            raw_query = raw_query[1:]
    return urlparse.parse_qs(raw_query)
common.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _update_link_prefix(self, orig_url, prefix):
        """Rewrite *orig_url* to live under *prefix*: take scheme and
        netloc from the prefix and prepend the prefix path.  Returns the
        original URL unchanged when no prefix is given."""
        if not prefix:
            return orig_url
        split_url = list(urlparse.urlsplit(orig_url))
        split_prefix = list(urlparse.urlsplit(prefix))
        split_url[0] = split_prefix[0]
        split_url[1] = split_prefix[1]
        split_url[2] = split_prefix[2] + split_url[2]
        return urlparse.urlunsplit(split_url).rstrip('/')
utils.py 文件源码 项目:domain-discovery-crawler 作者: TeamHG-Memex 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_path(url):
    """Return the path + query + fragment portion of *url*; an empty path
    becomes '/'."""
    parts = urlsplit(url)
    return urlunsplit(('', '', parts.path or '/', parts.query, parts.fragment))
requests_connection_pool.py 文件源码 项目:my_utils 作者: aploium 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_session(domain_or_url):
    """
    Return a keep-alive :class:`requests.Session` for a domain, reusing
    a pooled session when one is available.

    :param domain_or_url: domain name, or a full URL (its netloc is used)
    :type domain_or_url: str
    :rtype: requests.Session
    """
    domain = urllib_parse.urlsplit(domain_or_url).netloc or domain_or_url

    if domain not in pool:
        pool[domain] = []

    if not hasattr(locked_session, "sessdicts"):
        # Thread-local list of session records handed out to this thread.
        # NOTE(review): these records are presumably returned to the shared
        # pool by release_lock() elsewhere in this module -- confirm.
        locked_session.sessdicts = []

    if not pool[domain]:
        # No pooled session for this domain: create a fresh record.
        sessdict = {
            "domain": domain,
            "sessobj": requests.Session(),
        }
    else:
        # Reuse the most recently returned session for this domain.
        sessdict = pool[domain].pop()

    # Timestamp used to decide when a session has expired.
    sessdict["active"] = time.time()

    locked_session.sessdicts.append(sessdict)

    # Opportunistically garbage-collect sessions older than SESSION_TTL.
    if _gc_checkpoint < time.time() - SESSION_TTL:
        with cleaning_lock:
            clear()

    return sessdict["sessobj"]  # type: requests.Session
utils.py 文件源码 项目:inspire-schemas 作者: inspirehep 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_license_from_url(url):
    """Get the license abbreviation from an URL.

    Args:
        url(str): canonical url of the license.

    Returns:
        str: the corresponding license abbreviation, or None when *url*
        is empty.

    Raises:
        ValueError: when the url is not recognized
    """
    if not url:
        return

    split_url = urlsplit(url, scheme='http')
    # Host names are case-insensitive; previously only the
    # creativecommons.org comparison lowercased the netloc.
    netloc = split_url.netloc.lower()

    if netloc == 'creativecommons.org':
        license = ['CC']
        uppercase_parts = True
    elif netloc == 'arxiv.org':
        license = ['arXiv']
        uppercase_parts = False
    else:
        raise ValueError('Unknown license URL')

    match = _RE_LICENSE_URL.match(split_url.path)
    if match is None:
        # Previously this crashed with AttributeError on an unrecognized
        # path; raise the documented ValueError instead.
        raise ValueError('Unknown license URL')

    license.extend(part.upper() if uppercase_parts else part
                   for part in match.groups() if part)

    return u' '.join(license)
utils.py 文件源码 项目:TornadoWeb 作者: VxCoder 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_querystring(uri):
    """Parse the query string of ``uri`` into a dict of lists.

    :param uri: uri
    :return: querystring info or {}
    """
    split_result = urlparse.urlsplit(uri)
    is_py26 = sys.version_info[:2] == (2, 6)
    if is_py26:
        # Python 2.6 workaround: read the query from .path and strip a
        # single leading '?' -- presumably an old urlparse quirk.
        query = split_result.path
        if query.startswith('?'):
            query = query[1:]
    else:
        query = split_result.query
    return urlparse.parse_qs(query)
stats.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, repouri):
                """Initialize a RepoStats object.  Pass a TransportRepoURI
                object in repouri to configure an object for a particular
                repository URI."""

                # Normalized repository URL (no trailing slash) and scheme.
                self.__url = repouri.uri.rstrip("/")
                self.__scheme = urlsplit(self.__url)[0]
                self.__priority = repouri.priority

                self.__proxy = repouri.proxy
                self.__system = repouri.system

                # Error counters.
                # NOTE(review): _err_decay has a single leading underscore,
                # presumably accessed outside this class -- confirm.
                self._err_decay = 0
                self.__failed_tx = 0
                self.__content_err = 0
                self.__decayable_err = 0
                self.__timeout_err = 0
                self.__total_tx = 0
                self.__consecutive_errors = 0

                # Connection bookkeeping.
                self.__connections = 0
                self.__connect_time = 0.0

                # Whether this repository has been used yet.
                self.__used = False

                # Transfer statistics and origin tuning values.
                self.__bytes_xfr = 0.0
                self.__seconds_xfr = 0.0
                self.origin_speed = 0.0
                self.origin_cspeed = 0.0
                self.origin_count = 1
                self.origin_factor = 1
                self.origin_decay = 1
api_errors.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __str__(self):
                """Return a localized message listing each URI whose scheme
                is not one of the supported file://, http://, https://."""
                illegals = []

                for u in self.uris:
                        assert isinstance(u, six.string_types)
                        scheme = urlsplit(u,
                            allow_fragments=0)[0]
                        illegals.append((u, scheme))

                if len(illegals) > 1:
                        # Typo fix in the user-facing message:
                        # "follwing" -> "following".
                        msg = _("The following URIs use unsupported "
                            "schemes.  Supported schemes are "
                            "file://, http://, and https://.")
                        for i, s in illegals:
                                msg += _("\n  {uri} (scheme: "
                                    "{scheme})").format(uri=i, scheme=s)
                        return msg
                elif len(illegals) == 1:
                        i, s = illegals[0]
                        return _("The URI '{uri}' uses the unsupported "
                            "scheme '{scheme}'.  Supported schemes are "
                            "file://, http://, and https://.").format(
                            uri=i, scheme=s)
                return _("The specified URI uses an unsupported scheme."
                    "  Supported schemes are: file://, http://, and "
                    "https://.")
api_errors.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __str__(self):
                """Return a localized message about the unsupported proxy
                URI scheme (generic when no proxy data is available)."""
                if not self.data:
                        return _("The specified proxy URI uses an unsupported scheme."
                            " Currently the only supported scheme is: http://.")
                scheme = urlsplit(self.data,
                    allow_fragments=0)[0]
                return _("The proxy URI '{uri}' uses the unsupported "
                    "scheme '{scheme}'. Currently the only supported "
                    "scheme is http://.").format(
                    uri=self.data, scheme=scheme)
misc.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def valid_pub_url(url, proxy=False):
        """Verify that the publisher URL contains only valid characters.
        If 'proxy' is set to True, some checks are relaxed.

        Returns True if the URL looks valid, False otherwise."""

        if not url:
                return False

        # First split the URL and check if the scheme is one we support
        o = urlsplit(url)

        if not o[0] in _valid_proto:
                return False

        if o[0] == "file":
                path = urlparse(url, "file", allow_fragments=0)[2]
                path = url2pathname(path)
                # NOTE(review): os.path.abspath() never returns an empty
                # string for any input, so this check looks like a no-op;
                # kept for compatibility.
                if not os.path.abspath(path):
                        return False
                # No further validation to be done.
                return True

        # Next verify that the network location is valid
        if six.PY3:
                host = urllib.parse.splitport(o[1])[0]
        else:
                host = urllib.splitport(o[1])[0]

        if proxy:
                # We may have authentication details in the proxy URI, which
                # we must ignore when checking for hostname validity.
                host_parts = host.split("@")
                if len(host_parts) == 2:
                        # Bug fix: use the portion after '@' (was host[1],
                        # which took the second *character* of the string).
                        host = host_parts[1]

        if not host or _invalid_host_chars.match(host):
                return False

        if _hostname_re.match(host):
                return True

        return False
middlewares.py 文件源码 项目:scrapy-rotating-proxies 作者: TeamHG-Memex 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_proxy_slot(self, proxy):
        """
        Return downloader slot for a proxy.
        By default it doesn't take port in account, i.e. all proxies with
        the same hostname / ip address share the same slot.
        """
        # FIXME: an option to use website address as a part of slot as well?
        parsed = urlsplit(proxy)
        return parsed.hostname
wsgiprox.py 文件源码 项目:wsgiprox 作者: webrecorder 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def convert_env(self):
        """Resolve the proxied request URI and strip filtered request
        headers from the WSGI environ."""
        request_uri = self.environ['REQUEST_URI']
        split_uri = urlsplit(request_uri)
        hostname = split_uri.netloc.split(':')[0]

        self.resolve(request_uri, self.environ, hostname)

        # Drop proxy-specific headers; iterate over a copy of the keys
        # because the dict is mutated in the loop.
        for key in list(self.environ.keys()):
            if key in self.FILTER_REQ_HEADERS:
                self.environ.pop(key, '')
common.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def get_id_from_href(href):
    """Return the id or uuid portion of a url.

    Given: 'http://www.foo.com/bar/123?q=4'
    Returns: '123'

    Given: 'http://www.foo.com/bar/abc123?q=4'
    Returns: 'abc123'

    """
    parts = urlparse.urlsplit("%s" % href)
    return parts.path.split('/')[-1]
common.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _update_link_prefix(self, orig_url, prefix):
        """Re-root *orig_url* under *prefix* (its scheme, netloc and base
        path); a falsy prefix leaves the URL untouched."""
        if not prefix:
            return orig_url
        parts = list(urlparse.urlsplit(orig_url))
        pre = list(urlparse.urlsplit(prefix))
        parts[:2] = pre[:2]
        parts[2] = pre[2] + parts[2]
        rebuilt = urlparse.urlunsplit(parts)
        return rebuilt.rstrip('/')


问题


面经


文章

微信
公众号

扫码关注公众号