Python urlparse() 函数的实例源码

image_service.py 文件源码 项目:bareon-ironic 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_href(self, image_href):
    """Check that an rsync image reference can be fetched.

    The path component of ``image_href`` (leading slashes stripped) is
    handed to ``rsync --stats --dry-run`` with "." as the destination.

    :param image_href: rsync resource reference to validate.
    :returns: tuple ``(path, stdout, stderr)`` where ``stdout`` and
        ``stderr`` are what ``utils.execute`` returned for the dry run.
    :raises: exception.InvalidParameterValue if the reference has no path.
    :raises: exception.ImageRefValidationFailed if executing rsync raises
        ``ProcessExecutionError`` or ``OSError``.
    """
    resource_path = urlparse.urlparse(image_href).path.lstrip('/')
    if not resource_path:
        message = _("No path specified in rsync resource reference: %s. "
                    "Reference must be like rsync:host::module/path")
        raise exception.InvalidParameterValue(message % str(image_href))

    rsync_args = ('rsync', '--stats', '--dry-run', resource_path, ".")
    try:
        stdout, stderr = utils.execute(
            *rsync_args,
            check_exit_code=[0],
            log_errors=processutils.LOG_ALL_ERRORS)
    except (processutils.ProcessExecutionError, OSError) as ex:
        details = {'url': str(image_href), 'exc': str(ex)}
        raise exception.ImageRefValidationFailed(
            _("Cannot fetch %(url)s resource. %(exc)s") % details)

    return resource_path, stdout, stderr
image_service.py 文件源码 项目:bareon-ironic 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_glance_image_uuid_name(task, url):
    """Resolve a glance image reference to an ``(uuid, name)`` tuple.

    Accepted reference forms:
        glance:name
        glance://name
        glance:uuid
        glance://uuid
        name
        uuid

    :param task: object whose ``context`` is handed to the glance client
        factory.
    :param url: image reference in one of the forms above.
    :returns: tuple of the ``id`` and ``name`` entries of the image info
        returned by the glance client.
    :raises: exception.InvalidImageRef if ``url`` carries a scheme other
        than ``glance``.
    """
    parsed = urlparse.urlparse(url)
    scheme = parsed.scheme
    if scheme:
        if scheme != 'glance':
            raise exception.InvalidImageRef(
                "Only glance images are supported.")

    # "glance:x" puts x in the path, "glance://x" puts it in the netloc.
    image_ref = parsed.path if parsed.path else parsed.netloc
    client = _get_glanceclient(task.context)
    info = client.show(image_ref)
    return info['id'], info['name']
resources.py 文件源码 项目:bareon-ironic 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def url_download_raw_secured(context, node, url):
    """Download the raw contents of ``url`` into memory and return them.

    Only URLs whose scheme (after the storage prefix has been applied) is
    ``glance`` or ``swift`` are accepted.

    :param context: passed through to ``image_service.get_image_service``.
    :param node: passed to ``append_storage_prefix`` together with the URL.
    :param url: resource URL; a falsy value short-circuits to ``None``.
    :returns: the downloaded contents, or ``None`` when ``url`` is falsy.
    :raises: bareon_exception.UnsafeUrlError if the scheme is not allowed.
    """
    if not url:
        return None

    full_url = append_storage_prefix(node, url.rstrip('/'))

    allowed_schemes = ('glance', 'swift')
    if parse.urlparse(full_url).scheme not in allowed_schemes:
        hint = ("Use one of the following storages "
                "for this resource: %s" % str(allowed_schemes))
        raise bareon_exception.UnsafeUrlError(url=full_url, details=hint)

    storage = image_service.get_image_service(full_url, context=context)
    buf = StringIO.StringIO()
    try:
        storage.download(full_url, buf)
        return buf.getvalue()
    finally:
        # Release the in-memory buffer whether or not the download worked.
        buf.close()
followall.py 文件源码 项目:Scrapy-BenchCLI 作者: Parth-Vader 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, **kw):
    """Set up the spider's start URL, allowed domain and counters.

    All keyword arguments are forwarded to the parent initializer.
    """
    super(FollowAllSpider, self).__init__(**kw)

    start_url = 'http://localhost/books.toscrape.com/index.html'
    # The literal above already carries a scheme, so this guard is a no-op
    # for the current value; it is kept for when the URL is changed.
    if not start_url.startswith(('http://', 'https://')):
        start_url = 'http://%s/' % start_url
    self.url = start_url

    hostname = urlparse(start_url).hostname
    self.allowed_domains = [re.sub(r'^www\.', '', hostname)]
    self.link_extractor = LinkExtractor()
    self.cookies_seen = set()
    self.previtem = self.items = 0
    self.timesec = datetime.datetime.utcnow()
requirements.py 文件源码 项目:PornGuys 作者: followloda 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and populate the requirement fields.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

    :param requirement_string: the textual requirement to parse.
    :raises InvalidRequirement: if the string fails to parse, or if it
        carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Report a short slice of the input starting at the failure point.
        snippet = requirement_string[e.loc:e.loc + 8]
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(snippet))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_parts = urlparse.urlparse(parsed.url)
        # A usable URL needs both a scheme and a netloc.
        if not (url_parts.scheme and url_parts.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras.asList() if parsed.extras else []
    self.extras = set(extras)
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
tpda.py 文件源码 项目:dnsknife 作者: Gandi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def params(arg):
    """
    Parse `arg` and return a dict describing the TPDA arguments.

    Resolution order:
        - if `arg` is a string, it is parsed as a URL and its query
          string supplies the (name, value) pairs
        - if `arg` has an items() method, the items are used
          (works for multidicts, ..)
        - if `arg` is iterable, it is used as
          ((name, value), (name, value), ..)

    Anything else falls through and yields None.

    Raises exceptions.IncompleteURI when a string `arg` has no query part.
    """
    if isinstance(arg, str):
        query = parse.urlparse(arg).query
        if not query:
            raise exceptions.IncompleteURI
        pairs = parse.parse_qsl(query)
    elif hasattr(arg, 'items'):
        pairs = arg.items()
    elif hasattr(arg, '__iter__'):
        pairs = arg
    else:
        return None
    return Params(pairs).params()
media.py 文件源码 项目:momo 作者: gusibi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def media_for(hash_key, scheme=None, style=None):
    """Build the media URL for ``hash_key``.

    :param hash_key: a media key or full URL, optionally followed by
        ``!<style>``; a falsy value yields ``None``.
    :param scheme: accepted but not used by this function.
    :param style: optional style name appended to the result as
        ``!<style>``.
    :returns: ``hash_key`` itself (with ``http://`` rewritten to
        ``https://`` for ``wx.qlogo.cn``) when it points at a host outside
        ``Config.QINIU_DOMAINS``; otherwise a URL under
        ``Config.QINIU_HOST``.
    """
    if not hash_key:
        return None

    # Anything after the first '!' is a style suffix, not part of the URL.
    parsed = urlparse(hash_key.split('!')[0])
    domain = parsed.hostname
    if domain and domain not in Config.QINIU_DOMAINS:
        if domain == 'wx.qlogo.cn':
            return hash_key.replace('http://', 'https://')
        return hash_key

    object_key = parsed.path
    if object_key.startswith('/'):
        object_key = object_key[1:]

    media_url = '%s/%s' % (Config.QINIU_HOST, object_key)
    if media_url.endswith('.amr'):
        # Swap the .amr extension for .mp3.
        media_url = '%s.mp3' % media_url[:-4]
    # media_url always contains at least "/", so only style needs testing.
    if style:
        media_url = '%s!%s' % (media_url, style)
    return media_url
deployment_utils.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_temp_url(swift_client, name, timeout, container=None):
    """Create an empty swift object and return a temp URL for PUT-ing to it.

    :param swift_client: swift client used for all container, account and
        object calls.
    :param name: used to build the container name when ``container`` is
        not given.
    :param timeout: multiplied by 60 before being passed on as the temp
        URL lifetime (presumably minutes -- confirm with callers).
    :param container: optional container name; when falsy a
        ``<name>-<uuid4>`` container name is generated.
    :returns: the temp URL prefixed with the scheme and netloc of
        ``swift_client.url``.
    """
    if not container:
        container = '%(name)s-%(uuid)s' % {
            'name': name, 'uuid': uuid.uuid4()}
    object_name = str(uuid.uuid4())

    swift_client.put_container(container)

    # Make sure the account carries a temp-url key, creating one if needed.
    key_header = 'x-account-meta-temp-url-key'
    if key_header not in swift_client.head_account():
        new_key = six.text_type(uuid.uuid4())[:32]
        swift_client.post_account({key_header: new_key})
    key = swift_client.head_account()[key_header]

    # The last segment of the client URL is used as the project part.
    project_path = swift_client.url.split('/')[-1]
    object_path = '/v1/%s/%s/%s' % (project_path, container, object_name)
    tempurl = swiftclient_utils.generate_temp_url(
        object_path, timeout * 60, key, 'PUT')

    endpoint = urlparse.urlparse(swift_client.url)
    put_url = '%s://%s%s' % (endpoint.scheme, endpoint.netloc, tempurl)

    swift_client.put_object(container, object_name, '')
    return put_url
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _pagination(self, collection, path, **params):
    """Yield successive result pages for ``collection``.

    Each page is fetched with ``self.get(path, params=params)``. After a
    page is yielded, its ``<collection>_links`` entry is searched for a
    link whose ``rel`` is ``next`` (or ``previous`` when the
    ``page_reverse`` parameter is truthy); the query string of that
    link's ``href`` becomes the parameters of the following request.
    Iteration stops when no such link exists or an expected key is
    missing.
    """
    wanted_rel = 'previous' if params.get('page_reverse', False) else 'next'
    has_more = True
    while has_more:
        page = self.get(path, params=params)
        yield page
        has_more = False
        try:
            for link in page['%s_links' % collection]:
                if link['rel'] != wanted_rel:
                    continue
                params = urlparse.parse_qs(
                    urlparse.urlparse(link['href']).query)
                has_more = True
                break
        except KeyError:
            # No links section (or a malformed link): nothing to follow.
            break
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _pagination(self, collection, path, **params):
    """Yield successive result pages for ``collection``.

    Each page is fetched with ``self.get(path, params=params)``. After a
    page is yielded, its ``<collection>_links`` entry is searched for a
    link whose ``rel`` is ``next`` (or ``previous`` when the
    ``page_reverse`` parameter is truthy); the query string of that
    link's ``href`` becomes the parameters of the following request.
    Iteration stops when no such link exists or an expected key is
    missing.
    """
    wanted_rel = 'previous' if params.get('page_reverse', False) else 'next'
    has_more = True
    while has_more:
        page = self.get(path, params=params)
        yield page
        has_more = False
        try:
            for link in page['%s_links' % collection]:
                if link['rel'] != wanted_rel:
                    continue
                params = urlparse.parse_qs(
                    urlparse.urlparse(link['href']).query)
                has_more = True
                break
        except KeyError:
            # No links section (or a malformed link): nothing to follow.
            break
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _pagination(self, collection, path, **params):
    """Yield successive result pages for ``collection``.

    Each page is fetched with ``self.get(path, params=params)``. After a
    page is yielded, its ``<collection>_links`` entry is searched for a
    link whose ``rel`` is ``next`` (or ``previous`` when the
    ``page_reverse`` parameter is truthy); the query string of that
    link's ``href`` becomes the parameters of the following request.
    Iteration stops when no such link exists or an expected key is
    missing.
    """
    wanted_rel = 'previous' if params.get('page_reverse', False) else 'next'
    has_more = True
    while has_more:
        page = self.get(path, params=params)
        yield page
        has_more = False
        try:
            for link in page['%s_links' % collection]:
                if link['rel'] != wanted_rel:
                    continue
                params = urlparse.parse_qs(
                    urlparse.urlparse(link['href']).query)
                has_more = True
                break
        except KeyError:
            # No links section (or a malformed link): nothing to follow.
            break
deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_node_ip_heartbeat(task):
    """Return the host part of the node's agent callback URL.

    The URL is read from the ``agent_url`` entry of
    ``task.node.driver_internal_info``.

    :param task: object exposing ``node.driver_internal_info``.
    :returns: the host name or IP address from the URL, or an empty
        string when ``agent_url`` is missing or has no host part.
    """
    callback_url = task.node.driver_internal_info.get('agent_url', '')
    # Use ``hostname`` rather than ``netloc.split(':')[0]``: splitting on
    # ':' truncates bracketed IPv6 literals ("[2001:db8::1]:9999" gave
    # "[2001") and keeps any "user@" prefix. ``hostname`` is None when
    # the URL has no host, so fall back to '' as before.
    return urlparse.urlparse(callback_url).hostname or ''
deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _parse_root_device_hints(node):
    """Convert the node's ``root_device`` property into a dict of hints.

    String values prefixed with ``'== '`` are converted to ``int``,
    values prefixed with ``'s== '`` are URL-unquoted, and non-string
    values are passed through unchanged. Any other string value is
    treated as an unsupported advanced hint.

    :param node: object exposing ``properties`` and ``uuid``.
    :returns: dict of root device hints; empty when the node has none.
    :raises: exception.InvalidParameterValue if the hints fail to parse
        or contain advanced (operator-based) hints.
    """
    raw_hints = node.properties.get('root_device')
    if not raw_hints:
        return {}

    try:
        parsed_hints = irlib_utils.parse_root_device_hints(raw_hints)
    except ValueError as e:
        raise exception.InvalidParameterValue(
            _('Failed to validate the root device hints for node %(node)s. '
              'Error: %(error)s') % {'node': node.uuid, 'error': e})

    supported = {}
    unsupported = {}
    for hint, value in parsed_hints.items():
        if not isinstance(value, six.string_types):
            supported[hint] = value
        elif value.startswith('== '):
            supported[hint] = int(value[3:])
        elif value.startswith('s== '):
            supported[hint] = urlparse.unquote(value[4:])
        else:
            unsupported[hint] = value

    if unsupported:
        raise exception.InvalidParameterValue(
            _('Ansible-deploy does not support advanced root device hints '
              'based on oslo.utils operators. '
              'Present advanced hints for node %(node)s are %(hints)s.') % {
                  'node': node.uuid, 'hints': unsupported})
    return supported
deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _prepare_variables(task):
    """Assemble the variables dict describing the node's deployment.

    The result always has an ``image`` entry (built from the
    ``image_*`` keys of the node's instance_info, plus ``mem_req``), and
    optionally ``configdrive`` and ``root_device_hints`` entries.

    :param task: object exposing ``node`` with ``instance_info`` and
        ``uuid``.
    :returns: dict of variables.
    """
    node = task.node
    instance_info = node.instance_info

    prefix = 'image_'
    image = {key[len(prefix):]: value
             for key, value in instance_info.items()
             if key.startswith(prefix)}
    image['mem_req'] = _calculate_memory_req(task)

    checksum = image.get('checksum')
    # NOTE(pas-ha) checksum can be in <algo>:<checksum> format
    # as supported by various Ansible modules, mostly good for
    # standalone Ironic case when instance_info is populated manually.
    # With no <algo> we take that instance_info is populated from Glance,
    # where API reports checksum as MD5 always.
    if checksum and ':' not in checksum:
        image['checksum'] = 'md5:%s' % checksum
    _add_ssl_image_options(image)

    variables = {'image': image}

    configdrive = instance_info.get('configdrive')
    if configdrive:
        is_remote = urlparse.urlparse(configdrive).scheme in ('http',
                                                              'https')
        if is_remote:
            drive = {'type': 'url', 'location': configdrive}
        else:
            # Not an HTTP(S) URL: write the value out to a local file.
            drive_path = _get_configdrive_path(node.uuid)
            with open(drive_path, 'w') as f:
                f.write(configdrive)
            drive = {'type': 'file', 'location': drive_path}
        variables['configdrive'] = drive

    hints = _parse_root_device_hints(node)
    if hints:
        variables['root_device_hints'] = hints

    return variables
plugin.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_addon_host_ident():
    """Return the host identifier derived from ``system.url-prefix``.

    When the configured prefix has no hostname, or the hostname is
    ``localhost`` / ``127.0.0.1``, ``app.dev.getsentry.com`` is returned
    instead.
    """
    local_idents = ('localhost', '127.0.0.1', None, '')
    hostname = urlparse(options.get('system.url-prefix')).hostname
    return 'app.dev.getsentry.com' if hostname in local_idents else hostname
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def base_url(url):
    """Return ``<scheme>://<netloc>`` for ``url``, dropping everything else."""
    scheme, netloc = urlparse(url)[:2]
    return '%s://%s' % (scheme, netloc)
exceptions.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def from_request(cls, request):
    """Build an instance of ``cls`` naming the host of ``request.url``.

    :param request: object with a ``url`` attribute.
    :returns: ``cls`` called with an "Unable to reach host" message that
        includes the URL's network location.
    """
    message = 'Unable to reach host: {}'.format(urlparse(request.url).netloc)
    return cls(message)
httmockbackend.py 文件源码 项目:placebo 作者: huseyinyilmaz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_decorator(placebo):
    """Build an httmock decorator that answers with ``placebo``'s response.

    The mock matches on the scheme, netloc, path and query of the
    placebo URL and on the placebo method; empty components are left out
    of the match criteria.

    :param placebo: object providing ``_get_url``, ``_get_method``,
        ``_get_status``, ``_get_body`` and ``_get_headers``.
    :returns: the result of ``httmock.with_httmock`` for the mock handler.
    """
    target = placebo._get_url()
    if isinstance(target, six.string_types):
        target = parse.urlparse(target)

    # TODO should we remove empty parts of url?
    candidates = [
        ('scheme', target.scheme),
        ('netloc', target.netloc),
        ('path', target.path),
        ('method', placebo._get_method()),
        ('query', target.query),
    ]
    match_kwargs = {key: value for key, value in candidates if value}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        parsed_url = parse.urlparse(url.geturl())
        # If the body is empty httmock gives None,
        # but we want it to always be a string.
        body = request.body or ''
        headers = request.headers
        status = placebo._get_status(parsed_url, headers, body)
        content = placebo._get_body(parsed_url, headers, body)
        response_headers = placebo._get_headers(parsed_url, headers, body)
        return {'status_code': status,
                'content': content,
                'headers': response_headers}

    return httmock.with_httmock(mock_response)
request.py 文件源码 项目:placebo 作者: huseyinyilmaz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, url, headers, body):
    """Store the request URL (raw and parsed), headers and body.

    :param url: either a URL string or an already parsed
        ``ParseResult`` / ``SplitResult``. For any other type neither
        ``url`` nor ``parsed_url`` is set.
    :param headers: stored as ``self.headers``.
    :param body: stored as ``self.body``.
    """
    parsed_types = (parse.ParseResult, parse.SplitResult)
    if isinstance(url, six.string_types):
        self.url = url
        self.parsed_url = parse.urlparse(url)
    elif isinstance(url, parsed_types):
        self.url = url.geturl()
        self.parsed_url = url

    self.headers = headers
    self.body = body
httmockbackend.py 文件源码 项目:placebo 作者: huseyinyilmaz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_decorator(placebo):
    """Build an httmock decorator that answers with ``placebo``'s response.

    The mock matches on the scheme, netloc, path and query of the
    placebo URL and on the placebo method; empty components are left out
    of the match criteria.

    :param placebo: object providing ``_get_url``, ``_get_method``,
        ``_get_status``, ``_get_body`` and ``_get_headers``.
    :returns: the result of ``httmock.with_httmock`` for the mock handler.
    """
    target = placebo._get_url()
    if isinstance(target, six.string_types):
        target = parse.urlparse(target)

    # TODO should we remove empty parts of url?
    candidates = [
        ('scheme', target.scheme),
        ('netloc', target.netloc),
        ('path', target.path),
        ('method', placebo._get_method()),
        ('query', target.query),
    ]
    match_kwargs = {key: value for key, value in candidates if value}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        parsed_url = parse.urlparse(url.geturl())
        # If the body is empty httmock gives None,
        # but we want it to always be a string.
        body = request.body or ''
        headers = request.headers
        status = placebo._get_status(parsed_url, headers, body)
        content = placebo._get_body(parsed_url, headers, body)
        response_headers = placebo._get_headers(parsed_url, headers, body)
        return {'status_code': status,
                'content': content,
                'headers': response_headers}

    return httmock.with_httmock(mock_response)


问题


面经


文章

微信
公众号

扫码关注公众号