python类urlsplit()的实例源码

netutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def urlsplit(url, scheme='', allow_fragments=True):
    """Split a URL into five components, always isolating query and fragment.

    Wraps urlparse.urlsplit() and works around Python issue9374_: a
    ``#fragment`` or ``?query`` still embedded in the path after the
    standard split is moved into its own field.

    .. _issue9374: http://bugs.python.org/issue9374

    Takes the same parameters as urlparse.urlsplit.
    """
    parts = parse.urlsplit(url, scheme, allow_fragments)
    scheme, netloc, path, query, fragment = parts

    # The fragment is peeled off first; when fragments are allowed, a '?'
    # located after the '#' therefore stays inside the fragment.
    before_hash, hash_found, after_hash = path.partition('#')
    if allow_fragments and hash_found:
        path, fragment = before_hash, after_hash

    before_mark, mark_found, after_mark = path.partition('?')
    if mark_found:
        path, query = before_mark, after_mark

    return _ModifiedSplitResult(scheme, netloc, path, query, fragment)
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get_session(self, url):
        if self._connection_pool:
            magic_tuple = parse.urlsplit(url)
            scheme, netloc, path, query, frag = magic_tuple
            service_url = '%s://%s' % (scheme, netloc)
            if self._current_url != service_url:
                # Invalidate Session object in case the url is somehow changed
                if self._session:
                    self._session.close()
                self._current_url = service_url
                self._logger.debug(
                    "New session created for: (%s)" % service_url)
                self._session = requests.Session()
                self._session.mount(service_url,
                                    self._connection_pool.get(service_url))
            return self._session
        elif self._session:
            return self._session

    # @set_headers_param
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_session(self, url):
        if self._connection_pool:
            magic_tuple = parse.urlsplit(url)
            scheme, netloc, path, query, frag = magic_tuple
            service_url = '%s://%s' % (scheme, netloc)
            if self._current_url != service_url:
                # Invalidate Session object in case the url is somehow changed
                if self._session:
                    self._session.close()
                self._current_url = service_url
                self._logger.debug(
                    "New session created for: (%s)" % service_url)
                self._session = requests.Session()
                self._session.mount(service_url,
                                    self._connection_pool.get(service_url))
            return self._session
        elif self._session:
            return self._session
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_session(self, url):
        if self._connection_pool:
            magic_tuple = parse.urlsplit(url)
            scheme, netloc, path, query, frag = magic_tuple
            service_url = '%s://%s' % (scheme, netloc)
            if self._current_url != service_url:
                # Invalidate Session object in case the url is somehow changed
                if self._session:
                    self._session.close()
                self._current_url = service_url
                self._logger.debug(
                    "New session created for: (%s)" % service_url)
                self._session = requests.Session()
                self._session.mount(service_url,
                                    self._connection_pool.get(service_url))
            return self._session
        elif self._session:
            return self._session
test_gabbi_live.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def load_tests(loader, tests, pattern):
    """Provide a TestSuite to the discovery process.

    The server under test is read from the ``GNOCCHI_ENDPOINT``
    environment variable.  When it is unset nothing is returned, unless
    ``GABBI_LIVE`` is set, in which case the missing endpoint is an error.

    :raises RuntimeError: if ``GABBI_LIVE`` is set but
        ``GNOCCHI_ENDPOINT`` is not.
    """
    endpoint = os.getenv('GNOCCHI_ENDPOINT')
    if not endpoint:
        if os.getenv("GABBI_LIVE"):
            raise RuntimeError('"GNOCCHI_ENDPOINT" is not set')
        return None

    target = urlparse.urlsplit(endpoint)
    # The endpoint's path, minus trailing slashes, is used as the prefix.
    prefix = target.path.rstrip('/')

    # NOTE(chdent): gabbi requires a port be passed or it will default to
    # 8001, so fall back to the scheme's usual port when the endpoint does
    # not name one. Probably gabbi needs to change.
    # https://github.com/cdent/gabbi/issues/50
    explicit_port = target.port
    if explicit_port:
        port = explicit_port
    elif target.scheme == 'https':
        port = 443
    else:
        port = 80

    test_dir = os.path.join(os.path.dirname(__file__), TESTS_DIR)
    return driver.build_tests(test_dir, loader,
                              host=target.hostname,
                              port=port,
                              prefix=prefix)
utils.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): Query parameters to merge into the url; a
            key that is already present in the url is replaced.

    Returns:
        (str): The url rebuilt with the merged query string; scheme, netloc,
            path and fragment are carried over from the original url.

    """
    parts = urlsplit(url)

    # Existing parameters first, then let the caller's values override them.
    merged_params = parse_qs(parts.query)
    merged_params.update(query_parameters)
    new_query = urlencode(merged_params, doseq=True)

    return urlunsplit(
        (parts.scheme, parts.netloc, parts.path, new_query, parts.fragment),
    )
common.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def remove_trailing_version_from_href(href):
    """Strip a trailing API version segment from the href's path.

    Given: 'http://www.masakari.com/ha/v1.1'
    Returns: 'http://www.masakari.com/ha'

    Given: 'http://www.masakari.com/v1.1'
    Returns: 'http://www.masakari.com'

    Raises ValueError when the last path segment is not a version.
    """
    split_href = urlparse.urlsplit(href)
    leading_parts = split_href.path.rsplit('/', 1)
    last_segment = leading_parts.pop()

    # NOTE: this should match vX.X or vX
    version_match = re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)',
                             last_segment)
    if not version_match:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    # Rebuild the href around the path that remains once the version
    # segment has been dropped.
    new_path = url_join(*leading_parts)
    return urlparse.urlunsplit(split_href._replace(path=new_path))
interceptor.py 文件源码 项目:deb-python-wsgi-intercept 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _init_from_url(self, url):
        port = None
        parsed_url = urlparse.urlsplit(url)
        if ':' in parsed_url.netloc:
            host, port = parsed_url.netloc.split(':')
        else:
            host = parsed_url.netloc
        if not port:
            if parsed_url.scheme == 'https':
                port = 443
            else:
                port = 80
        path = parsed_url.path
        if path == '/' or not path:
            self.script_name = ''
        else:
            self.script_name = path
        self.host = host
        self.port = int(port)
wsgiprox.py 文件源码 项目:wsgiprox 作者: webrecorder 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resolve(self, url, env, hostname):
    """Fill in the request-target entries of the WSGI ``env`` for ``url``.

    When ``hostname`` is one of ``self.proxy_apps``, REQUEST_URI becomes
    the path (plus query, if any) of ``url`` and ``hostname`` is recorded
    as both the matched and the effective proxy host.  Otherwise
    REQUEST_URI comes from ``self.prefix_resolver`` and the proxy host is
    ``self.proxy_host``.  PATH_INFO and QUERY_STRING are then derived
    from REQUEST_URI.  ``env`` is modified in place; nothing is returned.
    """
    if hostname in self.proxy_apps.keys():
        split_url = urlsplit(url)
        request_uri = split_url.path
        if split_url.query:
            request_uri = '?'.join((request_uri, split_url.query))

        env['REQUEST_URI'] = request_uri
        env['wsgiprox.matched_proxy_host'] = hostname
        env['wsgiprox.proxy_host'] = hostname
    else:
        env['REQUEST_URI'] = self.prefix_resolver(url, env)
        env['wsgiprox.proxy_host'] = self.proxy_host

    # Everything before the first '?' is the path; the rest (possibly
    # nothing) is the query string.
    path_info, _sep, query_string = env['REQUEST_URI'].partition('?')
    env['PATH_INFO'] = path_info
    env['QUERY_STRING'] = query_string
common.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def remove_trailing_version_from_href(href):
    """Strip a trailing API version segment from the href's path.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    Raises ValueError when the last path segment is not a version.
    """
    split_href = urlparse.urlsplit(href)
    leading_parts = split_href.path.rsplit('/', 1)
    last_segment = leading_parts.pop()

    # NOTE: this should match vX.X or vX
    version_match = re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)',
                             last_segment)
    if not version_match:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    # Rebuild the href around the path that remains once the version
    # segment has been dropped.
    new_path = url_join(*leading_parts)
    return urlparse.urlunsplit(split_href._replace(path=new_path))
plane.py 文件源码 项目:caom2tools 作者: opencadc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_plane_uri(cls, observation_uri, product_id):
    """
    Build a Plane URI from an observation URI and a product ID.

    The result uses the ObservationURI scheme and the path of the
    observation URI with "/<product_id>" appended.

    Arguments:
    observation_uri : the uri of the observation (an ObservationURI)
    product_id : ID of the product (a str)

    Returns an instance of cls created from the resulting URI string.
    """
    caom_util.type_check(observation_uri, ObservationURI,
                         "observation_uri",
                         override=False)
    # Label this check with the argument it actually validates; it was
    # previously labelled "observation_uri" (presumably the name reported
    # when the check fails -- confirm against caom_util.type_check).
    caom_util.type_check(product_id, str, "product_id",
                         override=False)
    caom_util.validate_path_component(cls, "product_id", product_id)

    path = urlsplit(observation_uri.uri).path
    uri = SplitResult(ObservationURI._SCHEME, "", path + "/" +
                      product_id, "", "").geturl()
    return cls(uri)

    # Properties
plane.py 文件源码 项目:caom2tools 作者: opencadc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def uri(self, value):
    """Set the plane URI and derive the observation URI and product ID.

    ``value`` must be a str using the ObservationURI scheme, must survive
    a urlsplit()/geturl() round trip unchanged, and its path must consist
    of exactly three '/'-separated parts:
    ``collection/observation_id/product_id``.

    Raises ValueError if any of those conditions is not met or the
    product ID part is empty.
    """
    caom_util.type_check(value, str, "uri", override=False)
    tmp = urlsplit(value)

    if tmp.scheme != ObservationURI._SCHEME:
        raise ValueError("{} doesn't have an allowed scheme".format(value))
    if tmp.geturl() != value:
        raise ValueError("Failed to parse uri correctly: {}".format(value))

    # Check the number of parts explicitly so a malformed path produces a
    # meaningful error rather than a bare tuple-unpacking failure.
    path_parts = tmp.path.split("/")
    if len(path_parts) != 3:
        raise ValueError(
            "Expected collection/observation_id/product_id in uri: {}"
            .format(value))
    collection, observation_id, product_id = path_parts

    # str.split() never yields None, so a missing product ID shows up as
    # an empty string.
    if not product_id:
        raise ValueError("Failed to get product ID from uri: {}"
                         .format(value))

    self._product_id = product_id
    self._observation_uri = \
        ObservationURI.get_observation_uri(collection, observation_id)
    self._uri = value
session.py 文件源码 项目:icrawler 作者: hellock 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _url_scheme(self, url):
        return urlsplit(url).scheme
mappings.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def stack_output(output):
    """Render a stack output value as a safe HTML snippet.

    Falsy values give an empty string.  A string that parses as an
    http/https URL with a network location becomes a link opening in a
    new tab.  Dicts and lists are dumped as indented JSON.  Everything
    that is not turned into a link is escaped and wrapped in ``<pre>``.
    """
    if not output:
        return u''

    if isinstance(output, six.string_types):
        parts = urlparse.urlsplit(output)
        is_web_link = parts.netloc and parts.scheme in ('http', 'https')
        if is_web_link:
            url = html.escape(output)
            safe_link = u'<a href="%s" target="_blank">%s</a>' % (url, url)
            return safestring.mark_safe(safe_link)
    elif isinstance(output, (dict, list)):
        output = json.dumps(output, indent=2)

    preformatted = u'<pre>%s</pre>' % html.escape(output)
    return safestring.mark_safe(preformatted)
common.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.meteos.com/v1.1/123'
    Returns: 'http://www.meteos.com/123'

    Given: 'http://www.meteos.com/v1.1'
    Returns: 'http://www.meteos.com'

    Only the first path segment is inspected.

    :raises ValueError: if the href's path does not start with a version
        segment, including when the path is empty.
    """
    parsed_url = parse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    # A path without any '/' (e.g. the empty path of 'http://host') splits
    # into a single element, so there is no first segment to inspect; let
    # it fall through to the "does not contain version" error below
    # instead of failing with an IndexError.
    if len(url_parts) > 1 and expression.match(url_parts[1]):
        del url_parts[1]

    new_path = '/'.join(url_parts)

    # An unchanged path means no version segment was removed.
    if new_path == parsed_url.path:
        msg = 'href %s does not contain version' % href
        LOG.debug(msg)
        raise ValueError(msg)

    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return parse.urlunsplit(parsed_url)
common.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _update_link_prefix(self, orig_url, prefix):
        if not prefix:
            return orig_url
        url_parts = list(parse.urlsplit(orig_url))
        prefix_parts = list(parse.urlsplit(prefix))
        url_parts[0:2] = prefix_parts[0:2]
        return parse.urlunsplit(url_parts)
validators.py 文件源码 项目:callisto-core 作者: project-callisto 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_url_parts(url):
    """Pass ``url`` through ``_clean_url`` and return its ``urlsplit`` result."""
    return urlsplit(_clean_url(url))
jinja_utils.py 文件源码 项目:fuel-ccp 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_host(path):
    """Return the netloc component of ``path`` ('' when it has none)."""
    netloc = urlparse.urlsplit(path)[1]
    return netloc
tornado_fetcher.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def can_fetch(self, user_agent, url):
    """Tell whether ``user_agent`` may fetch ``url`` according to robots.txt.

    Parsed robots.txt files are cached per netloc in
    ``self.robots_txt_cache`` and re-fetched once they are older than
    ``self.robot_txt_age`` seconds.  If fetching robots.txt fails with an
    HTTP error, the error is logged and an empty rule set is used.

    This is a generator: it yields the pending fetch and delivers its
    result by raising ``gen.Return`` (presumably it is wrapped with
    tornado's ``gen.coroutine`` -- the decorator is not visible here).
    """
    domain = urlsplit(url).netloc

    # Reuse the cached parser only while it is still fresh.
    robot_txt = None
    if domain in self.robots_txt_cache:
        cached = self.robots_txt_cache[domain]
        if time.time() - cached.mtime() <= self.robot_txt_age:
            robot_txt = cached

    if robot_txt is None:
        robot_txt = RobotFileParser()
        try:
            response = yield gen.maybe_future(self.http_client.fetch(
                urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
            content = response.body or b''
        except tornado.httpclient.HTTPError as e:
            logger.error('load robots.txt from %s error: %r', domain, e)
            content = b''

        # Decode only real byte strings: the previous fallback value was a
        # text string, which has no .decode() on Python 3.  With 'ignore'
        # the decode itself cannot raise UnicodeDecodeError.
        if isinstance(content, bytes):
            content = content.decode('utf8', 'ignore')

        robot_txt.parse(content.splitlines())
        self.robots_txt_cache[domain] = robot_txt

    raise gen.Return(robot_txt.can_fetch(user_agent, url))
__init__.py 文件源码 项目:frontera-google-docker 作者: casertap 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _get_domain_bucket(self, url):
        parsed = urlparse.urlsplit(url)
        hostname, _, _ = parsed.netloc.partition(':')
        return self.domain_cache.setdefault(hostname, {})
utils.py 文件源码 项目:scrapy-cdr 作者: TeamHG-Memex 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_path(url):
    """Return ``url`` without scheme and netloc.

    The result is the path (``'/'`` when the url has none) followed by
    the query and fragment, if present.
    """
    parts = urlsplit(url)
    relative = parts._replace(scheme='', netloc='', path=parts.path or '/')
    return urlunsplit(relative)
es_upload.py 文件源码 项目:scrapy-cdr 作者: TeamHG-Memex 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _reverse_domain_storage(item, media_root):
    for obj in item.get('objects', []):
        stored_url = obj['obj_stored_url']
        assert '/' not in stored_url
        domain = urlsplit(obj['obj_original_url']).netloc
        if ':' in domain:
            domain, _ = domain.split(':', 1)
        parents = [p for p in reversed(domain.split('.')) if p]
        os.makedirs(os.path.join(media_root, *parents), exist_ok=True)
        stored_url_noext, _ = os.path.splitext(stored_url)
        new_stored_url = os.path.sep.join(parents + [stored_url_noext])
        dest = os.path.join(media_root, new_stored_url)
        if not os.path.exists(dest):
            shutil.copy(os.path.join(media_root, stored_url), dest)
        obj['obj_stored_url'] = new_stored_url
downloader.py 文件源码 项目:aws-vapor 作者: ohtomi 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def take_action(self, args):
    """Download a recipe from a remote URL into the contrib directory.

    The local file name is the last '/'-separated segment of the URL's
    path; the contrib location is read from the 'defaults' section of
    the config file.

    Args:
        args (:obj:`dict`): Parsed command line arguments.
            "url" is an URL where a recipe will be downloaded from.
    """
    file_url = args.url
    url_path = parse.urlsplit(file_url).path
    filename = url_path.rsplit('/', 1)[-1]
    contrib = utils.get_property_from_config_file('defaults', 'contrib')
    self._download_recipe(file_url, filename, contrib)
__init__.py 文件源码 项目:cti-taxii-client 作者: oasis-open 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def canonicalize_url(api_root_url):
    """Return ``api_root_url`` re-serialized by urlsplit, ending with '/'."""
    normalized = urlparse.urlsplit(api_root_url).geturl()
    if normalized.endswith("/"):
        return normalized
    return normalized + "/"
opener.py 文件源码 项目:targets-python 作者: targets-fs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def open(self, target_uri, **kwargs):
    """Open target uri.

    The uri is split into its components (``self.default_opener`` is used
    as the scheme when the uri has none), the opener registered for the
    scheme is looked up, and that opener builds the target from the
    components and its own conformed version of the query.

    :param target_uri: Uri to open
    :type target_uri: string
    :param kwargs: passed through to the opener's ``get_target``

    :returns: Target object, with ``opener_path`` set to ``target_uri``

    """
    parts = urlsplit(target_uri, scheme=self.default_opener)

    opener = self.get_opener(parts.scheme)
    conformed_query = opener.conform_query(parts.query)

    opened = opener.get_target(
        parts.scheme,
        parts.path,
        parts.fragment,
        parts.username,
        parts.password,
        parts.hostname,
        parts.port,
        conformed_query,
        **kwargs
    )
    # Remember the uri the target was opened from.
    opened.opener_path = target_uri

    return opened
fw.py 文件源码 项目:reahl 作者: reahl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, url_string):
        split_url = urllib_parse.urlsplit(url_string)
        self.scheme = split_url.scheme     #:
        self.username = split_url.username #:
        self.password = split_url.password #:
        self.hostname = split_url.hostname #:
        self.port = split_url.port         #:
        self.path = split_url.path         #:
        self.query = split_url.query       #:
        self.fragment = split_url.fragment #:
__init__.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_driver(conf):
    """Return the configured driver.

    The scheme of ``conf.indexer.url`` is passed to a ``DriverManager``
    for the 'gnocchi.indexer' namespace, and the driver it provides is
    called with ``conf``.
    """
    scheme = parse.urlsplit(conf.indexer.url).scheme
    manager = driver.DriverManager('gnocchi.indexer', scheme)
    driver_factory = manager.driver
    return driver_factory(conf)
simple_wsgi.py 文件源码 项目:deb-python-gabbi 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _fully_qualify(environ, url):
        """Turn a URL path into a fully qualified URL."""
        split_url = urlparse.urlsplit(url)
        server_name = environ.get('SERVER_NAME')
        server_port = str(environ.get('SERVER_PORT'))
        server_scheme = environ.get('wsgi.url_scheme')
        if server_port not in ['80', '443']:
            netloc = '%s:%s' % (server_name, server_port)
        else:
            netloc = server_name

        return urlparse.urlunsplit((server_scheme, netloc, split_url.path,
                                    split_url.query, split_url.fragment))
case.py 文件源码 项目:deb-python-gabbi 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _parse_url(self, url):
        """Create a url from test data.

        If provided with a full URL, just return that. If SSL is requested
        set the scheme appropriately.

        Scheme and netloc are saved for later use in comparisons.
        """
        query_params = self.test_data['query_parameters']
        ssl = self.test_data['ssl']

        parsed_url = urlparse.urlsplit(url)
        if not parsed_url.scheme:
            full_url = utils.create_url(url, self.host, port=self.port,
                                        prefix=self.prefix, ssl=ssl)
            # parse again to set updated netloc and scheme
            parsed_url = urlparse.urlsplit(full_url)

        self.scheme = parsed_url.scheme
        self.netloc = parsed_url.netloc

        if query_params:
            query_string = self._update_query_params(parsed_url.query,
                                                     query_params)
        else:
            query_string = parsed_url.query

        return urlparse.urlunsplit((parsed_url.scheme, parsed_url.netloc,
                                    parsed_url.path, query_string, ''))
utils.py 文件源码 项目:deb-python-gabbi 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Given pieces of a path-based url, return a fully qualified url.

    Path and query come from ``base_url``; its fragment is dropped.  The
    scheme is 'https' when ``ssl`` is set, otherwise 'http'.
    """
    scheme = 'https' if ssl else 'http'

    # A host containing ':' at this point is assumed to be some form of
    # IPv6 address (any port should already have been stripped off), so
    # it is wrapped in brackets unless it already is.
    already_bracketed = host.startswith('[') and host.endswith(']')
    if ':' in host and not already_bracketed:
        host = '[%s]' % host

    # Only spell out the port when it is not the standard one.
    if port and not _port_follows_standard(port, ssl):
        netloc = '%s:%s' % (host, port)
    else:
        netloc = host

    parsed_url = urlparse.urlsplit(base_url)
    path = parsed_url.path

    # Guard against a prefix of None or a url that already has the
    # prefix. Without the startswith check, the tests in prefix.yaml
    # fail. This is a pragmatic fix applied to any URL in a test request
    # that has no scheme; it does not distinguish URLs written in a gabbi
    # test file from those generated by the server. Ideally URLs returned
    # from the server would be neither mutated nor checked, but that
    # would require more complex data handling than exists now; this
    # covers the most common cases and will do until someone reports a
    # bug.
    if prefix and not path.startswith(prefix):
        path = '%s/%s' % (prefix.rstrip('/'), path.lstrip('/'))

    return urlparse.urlunsplit((scheme, netloc, path, parsed_url.query, ''))


问题


面经


文章

微信
公众号

扫码关注公众号