python类urlparse()的实例源码

repository_provider.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validate_config(self, organization, config, actor=None):
        """Resolve the repository named by ``config['url']`` and record its details.

        When ``config`` has no truthy ``'url'`` entry it is returned untouched.
        Otherwise the URL is split into an instance (its netloc) and a
        repository name (the text after the last ``_git/`` in the path), the
        repository is looked up through the client obtained for ``actor``,
        and ``config`` is updated in place with the ``instance``,
        ``project``, ``name``, ``external_id`` and ``url`` keys.

        :param organization: not used by this method.
        :param config: mutable mapping of repository settings.
        :param actor: forwarded to ``self.get_client``.
        :returns: the same ``config`` object that was passed in.
        """
        if not config.get('url'):
            return config

        client = self.get_client(actor)

        # parse out the repo name and the instance
        parsed = urlparse(config['url'])
        host = parsed.netloc
        repo_name = parsed.path.rsplit('_git/', 1)[-1]
        # fall back to the repository name when no project is configured
        project = config.get('project') or repo_name

        try:
            repo = client.get_repo(host, repo_name, project)
        except Exception as e:
            # error reporting is delegated, with the client's auth as identity
            self.raise_error(e, identity=client.auth)

        resolved = {
            'instance': host,
            'project': project,
            'name': repo['name'],
            'external_id': six.text_type(repo['id']),
            'url': repo['_links']['web']['href'],
        }
        config.update(resolved)
        return config
vdi_handler.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _connect_request(self, vdi_ref):
        """Open a connection to the host and request the VDI export.

        Creates a task for the export (stored on ``self.task_ref``), connects
        to the host named by ``self.host_url`` over HTTP or HTTPS and issues
        a GET for the export path.

        :param vdi_ref: VDI reference, passed to
            ``utils.get_vdi_export_path``.
        :returns: the response object returned by ``conn.getresponse()``.
        :raises ValueError: if ``self.host_url`` uses a scheme other than
            ``http`` or ``https``.
        :raises Exception: any failure is logged at debug level and re-raised.
        """
        # request connection to xapi url service for VDI export
        try:
            # create task for VDI export
            label = 'VDI_EXPORT_for_' + self.instance['name']
            desc = 'Exporting VDI for instance: %s' % self.instance['name']
            self.task_ref = self.session.task.create(label, desc)
            LOG.debug("task_ref is %s", self.task_ref)
            # connect to XS
            xs_url = urlparse.urlparse(self.host_url)
            if xs_url.scheme == 'http':
                conn = httplib.HTTPConnection(xs_url.netloc)
                LOG.debug("using http")
            elif xs_url.scheme == 'https':
                conn = httplib.HTTPSConnection(xs_url.netloc)
                LOG.debug("using https")
            else:
                # Without this branch ``conn`` stayed unbound and the failure
                # surfaced later as an unrelated UnboundLocalError.
                raise ValueError(
                    "unsupported scheme %r in host url" % xs_url.scheme)
            vdi_export_path = utils.get_vdi_export_path(
                self.session, self.task_ref, vdi_ref)
            conn.request('GET', vdi_export_path)
            conn_resp = conn.getresponse()
        except Exception:
            LOG.debug('request connect for vdi export failed')
            raise
        return conn_resp
_common.py 文件源码 项目:pytablereader 作者: thombashi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_temp_file_path_from_url(temp_dir_path, url):
    """Build a path under ``temp_dir_path`` named after the last segment of ``url``.

    :param temp_dir_path: directory part of the resulting path.
    :param url: URL whose path component supplies the file name.
    :returns: ``temp_dir_path`` joined (POSIX style) with the file name.
    :raises InvalidFilePathError: if ``url`` cannot be parsed, if its path or
        derived file name is a null string, or if ``temp_dir_path`` cannot be
        joined.
    """
    try:
        parsed_path = urlparse(url).path
    except AttributeError:
        raise InvalidFilePathError("url must be a string")

    if typepy.is_null_string(parsed_path):
        raise InvalidFilePathError("invalid URL path: {}".format(parsed_path))

    trimmed_path = parsed_path.rstrip("/")
    file_name = os.path.basename(trimmed_path)
    if typepy.is_null_string(file_name):
        # NOTE(review): this sanitises the already-null name, so it looks
        # like a no-op; kept as-is to preserve behaviour -- confirm intent.
        file_name = pathvalidate.replace_symbol(
            file_name, replacement_text="_")

    if typepy.is_null_string(file_name):
        raise InvalidFilePathError("invalid URL: {}".format(url))

    try:
        joined = posixpath.join(temp_dir_path, file_name)
    except (TypeError, AttributeError):
        raise InvalidFilePathError("temp_dir_path must be a string")
    return joined
downloader.py 文件源码 项目:icrawler 作者: hellock 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_filename(self, task, default_ext):
        """Set the path where the image will be saved.

        The default strategy is to use an increasing 6-digit number as
        the filename. You can override this method if you want to set custom
        naming rules. The file extension is kept if it can be obtained from
        the url, otherwise ``default_ext`` is used as extension.

        Args:
            task (dict): The task dict got from ``task_queue``.
            default_ext (str): Extension used when the last segment of the
                url path has no (non-empty) extension.

        Output:
            Filename with extension.
        """
        url_path = urlparse(task['file_url'])[2]
        # Only the final path segment may carry the extension; a dot in a
        # directory name (e.g. ``/v1.2/photo``) must not leak ``2/photo``
        # into the file name.
        last_segment = url_path.rsplit('/', 1)[-1]
        _stem, dot, extension = last_segment.rpartition('.')
        if not dot or not extension:
            extension = default_ext
        file_idx = self.fetched_num + self.file_idx_offset
        return '{:06d}.{}'.format(file_idx, extension)
oauth2.py 文件源码 项目:flask-api-skeleton 作者: ianunruh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def do_GET(self):
        """Answer a GET with a plain-text notice and record the query parameters.

        Sends a 200 response, then stores the parsed query string of
        ``self.path`` on ``self.server.callback_result``: a parameter seen
        once maps to its single value, a parameter seen several times maps
        to the list of its values.
        """
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        def _collapse(values):
            # no values -> None, one value -> that value, many -> the list
            if not values:
                return None
            if len(values) == 1:
                return values[0]
            return values

        params = parse_qs(urlparse(self.path).query)
        self.server.callback_result = {
            key: _collapse(values) for key, values in params.items()
        }

        self.wfile.write(b'You can close this window now')
utils.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sort_url_by_query_keys(url):
    """Return ``url`` with its query parameters ordered by key.

       For example, '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' becomes
       '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. A deterministic query
       order keeps unit tests from failing on ordering alone.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)

    # keep blank values so that nothing is dropped from the query
    pairs = urlparse.parse_qsl(parsed.query, True)
    pairs.sort(key=lambda pair: pair[0])
    ordered_query = urlparse.urlencode(pairs, True)

    # every component except the query is carried over unchanged
    return urlparse.urlunparse(parsed._replace(query=ordered_query))
utils.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def strip_version(endpoint):
    """Strip the version from ``endpoint`` when its path is a version string.

    :param endpoint: endpoint URL as a string; trailing slashes are ignored.
    :returns: a ``(endpoint, version)`` tuple. When the URL path is exactly
        a version such as ``v1`` or ``v2.0``, ``endpoint`` is reduced to
        ``scheme://netloc`` and ``version`` is that number as a float;
        otherwise ``endpoint`` is returned with trailing slashes removed
        and ``version`` is ``None``.
    :raises ValueError: if ``endpoint`` is not a string.
    """
    # NOTE(flaper87): This shouldn't be necessary if
    # we make endpoint the first argument. However, we
    # can't do that just yet because we need to keep
    # backwards compatibility.
    if not isinstance(endpoint, six.string_types):
        raise ValueError("Expected endpoint")

    version = None
    # Get rid of trailing '/' if present
    endpoint = endpoint.rstrip('/')
    url_parts = parse.urlparse(endpoint)
    (scheme, netloc, path, __, __, __) = url_parts
    path = path.lstrip('/')
    # regex to match 'v1' or 'v2.0' etc. It is a raw string (no invalid
    # escape sequences) and anchored at the end, so a path such as
    # 'v1/images' is left alone instead of crashing in float().
    if re.match(r'v\d+\.?\d*$', path):
        version = float(path[1:])
        endpoint = scheme + '://' + netloc
    return endpoint, version
base.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate_link(self, link, bookmark=False):
        """Report whether ``link`` can be fetched successfully.

        The scheme and network location are dropped from ``link`` and the
        remainder is requested through ``self.get_json``.

        :param link: URL to check.
        :param bookmark: when true, a path starting with ``PATH_PREFIX``
            is rejected without issuing a request.
        :returns: ``True`` if the request succeeds, ``False`` if it raises
            or the bookmark rule rejects the link.
        """
        parsed = urlparse.urlparse(link)

        # bookmark link should not have the version in the URL
        if bookmark and parsed.path.startswith(PATH_PREFIX):
            return False

        # rebuild the URL with empty scheme and net location
        relative = urlparse.urlunparse(('', '') + tuple(parsed[2:]))
        try:
            self.get_json(relative, path_prefix='')
        except Exception:
            return False
        return True
utils.py 文件源码 项目:python-kingbirdclient 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.

    :param contents_or_file_name: a file name, a URL, or literal contents.
    :returns: the decoded (UTF-8) contents read from the file or URL, or
        ``contents_or_file_name`` itself when nothing could be read.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            # plain file names are turned into an absolute file: URL
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        response = request.urlopen(definition_url)
        try:
            return response.read().decode('utf8')
        finally:
            # always release the underlying handle, even if read/decode fails
            response.close()
    except Exception:
        # deliberate best effort: anything unreadable is treated as contents
        return contents_or_file_name
requirements.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate this requirement's fields.

        Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
        ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
        absent).

        :raises InvalidRequirement: if the string does not match the
            ``REQUIREMENT`` grammar, or if a URL is given that lacks a
            scheme or a network location.
        """
        try:
            parsed = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # quote the eight characters starting at the failure position
            snippet = requirement_string[e.loc:e.loc + 8]
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    snippet))

        self.name = parsed.name

        url = parsed.url
        if url:
            pieces = urlparse.urlparse(url)
            # a usable URL needs both a scheme and a network location
            if not pieces.scheme or not pieces.netloc:
                raise InvalidRequirement("Invalid URL given")
            self.url = url
        else:
            self.url = None

        self.extras = set(parsed.extras.asList()) if parsed.extras else set()
        self.specifier = SpecifierSet(parsed.specifier)
        self.marker = parsed.marker or None
websocket.py 文件源码 项目:dataplicity-lomond 作者: wildfoundry 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, url, protocols=None, agent=None):
        """Record the connection settings derived from ``url``.

        :param url: websocket URL; its scheme, host, port, path and query
            are stored on the instance.
        :param protocols: optional list of protocols; defaults to ``[]``.
        :param agent: optional user agent; defaults to
            ``constants.USER_AGENT``.
        :raises ValueError: if the port part of the URL is not all digits.
        """
        self.url = url
        self.protocols = protocols or []
        self.agent = agent or constants.USER_AGENT

        self._headers = []

        parsed = urlparse(url)
        self.scheme = parsed.scheme

        host, _sep, port_text = parsed.netloc.partition(':')
        if not port_text:
            # no explicit port: 443 for wss, 80 for everything else
            port_text = '443' if self.scheme == 'wss' else '80'
        if not port_text.isdigit():
            raise ValueError('illegal port value')
        port = int(port_text)

        self.host = host
        self.port = port
        self._host_port = "{}:{}".format(host, port)

        # the resource is the path (at least '/') plus the query, if any
        resource = parsed.path or '/'
        if parsed.query:
            resource = "{}?{}".format(resource, parsed.query)
        self.resource = resource

        self.state = self.State()
test_decorators.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_redirects_without_credentials(self):
        """Check that an ``oauth_required`` view answers with a redirect.

        The redirect must target ``/oauth2/oauth2authorize/`` and carry the
        requested path as ``return_url``.
        """
        request = self.factory.get('/test')
        request.session = self.session

        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse('test')  # pragma: NO COVER

        response = test_view(request)
        redirect_cls = http.HttpResponseRedirect
        self.assertIsInstance(response, redirect_cls)

        location = parse.urlparse(response['Location'])
        self.assertEqual(location.path, '/oauth2/oauth2authorize/')
        self.assertIn('return_url=%2Ftest', location.query)

        self.assertEqual(response.status_code, redirect_cls.status_code)
requirements.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate this requirement's fields.

        Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
        ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
        absent).

        :raises InvalidRequirement: if the string does not match the
            ``REQUIREMENT`` grammar, or if a URL is given that lacks a
            scheme or a network location.
        """
        try:
            parsed = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # quote the eight characters starting at the failure position
            snippet = requirement_string[e.loc:e.loc + 8]
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    snippet))

        self.name = parsed.name

        url = parsed.url
        if url:
            pieces = urlparse.urlparse(url)
            # a usable URL needs both a scheme and a network location
            if not pieces.scheme or not pieces.netloc:
                raise InvalidRequirement("Invalid URL given")
            self.url = url
        else:
            self.url = None

        self.extras = set(parsed.extras.asList()) if parsed.extras else set()
        self.specifier = SpecifierSet(parsed.specifier)
        self.marker = parsed.marker or None
processors.py 文件源码 项目:portia2code 作者: scrapinghub 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def extract_image_url(text):
    """Return an image URL derived from ``text``.

    ``text`` is first passed through ``_strip_url``; if nothing is left,
    ``None`` is returned. If ``_CSS_IMAGERE`` matches, its first group
    replaces the text. The URL's path is then narrowed to the part matched
    by ``_IMAGE_PATH_RE`` or, when the URL has a query string, by
    ``_GENERIC_PATH_RE``. If no pattern matches (or the rebuilt URL is
    empty) the text itself is returned.
    """
    text = _strip_url(text)
    if not text:
        return None

    # check if the text is style content
    css_match = _CSS_IMAGERE.search(text)
    if css_match:
        text = css_match.groups()[0]

    parsed = urlparse(text)
    path_match = _IMAGE_PATH_RE.search(parsed.path)
    if not path_match and parsed.query:
        # the generic pattern is only tried for URLs with a query string
        path_match = _GENERIC_PATH_RE.search(parsed.path)

    if path_match:
        imgurl = urlunparse(parsed._replace(path=path_match.group()))
        if imgurl:
            return imgurl
    return text
rw_handles.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _fix_esx_url(url, host, port):
        """Fix netloc in the case of an ESX host.

        In the case of an ESX host, the netloc is set to '*' in the URL
        returned in HttpNfcLeaseInfo. It should be replaced with host name
        or IP address.

        :param url: URL to fix.
        :param host: host name or IP address to substitute.
        :param port: integer port appended to the host.
        :returns: ``url`` unchanged unless its netloc is ``'*'``; otherwise
            the URL rebuilt with ``host:port`` (IPv6 hosts in brackets).
        """
        parsed = urlparse.urlparse(url)
        if parsed.netloc != '*':
            return url

        # IPv6 literals must be bracketed inside a netloc
        template = '[%s]:%d' if netutils.is_valid_ipv6(host) else "%s:%d"
        fixed = parsed._replace(netloc=template % (host, port))
        return urlparse.urlunparse(fixed)
requirements.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate this requirement's fields.

        Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
        ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
        absent).

        :raises InvalidRequirement: if the string does not match the
            ``REQUIREMENT`` grammar, or if a URL is given that lacks a
            scheme or a network location.
        """
        try:
            parsed = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # quote the eight characters starting at the failure position
            snippet = requirement_string[e.loc:e.loc + 8]
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    snippet))

        self.name = parsed.name

        url = parsed.url
        if url:
            pieces = urlparse.urlparse(url)
            # a usable URL needs both a scheme and a network location
            if not pieces.scheme or not pieces.netloc:
                raise InvalidRequirement("Invalid URL given")
            self.url = url
        else:
            self.url = None

        self.extras = set(parsed.extras.asList()) if parsed.extras else set()
        self.specifier = SpecifierSet(parsed.specifier)
        self.marker = parsed.marker or None
auth.py 文件源码 项目:myautotest 作者: auuppp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _decorate_request(self, filters, method, url, headers=None, body=None,
                          auth_data=None):
        """Build an authenticated copy of a request.

        The original ``headers`` are not modified; a deep copy receives the
        ``X-Auth-Token`` header. ``url`` is used as the complete request
        URL, with runs of slashes in its path collapsed to one.

        :param filters: not used by this method.
        :param method: not used by this method.
        :param url: request URL.
        :param headers: optional dict of request headers.
        :param body: request body, returned unchanged.
        :param auth_data: optional ``(token, <ignored>)`` pair; defaults to
            ``self.auth_data``.
        :returns: ``(url, headers, body)`` with ``url`` converted to ``str``.
        """
        if auth_data is None:
            auth_data = self.auth_data
        token, _ = auth_data
        # build authenticated request
        # returns new request, it does not touch the original values
        _headers = copy.deepcopy(headers) if headers is not None else {}
        _headers['X-Auth-Token'] = str(token)
        # The base URL used to be taken from ``url`` itself and then joined
        # with ``url`` again, which duplicated the URL inside its own
        # path/query. ``url`` is the full URL, so only normalise it.
        _url = url
        if url:
            # remove multiple contiguous slashes from the path
            parts = list(urlparse.urlparse(url))
            parts[2] = re.sub("/{2,}", "/", parts[2])
            _url = urlparse.urlunparse(parts)
        # no change to method or body
        return str(_url), _headers, body
execution.py 文件源码 项目:http-prompt 作者: eliangcs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def visit_ls(self, node, children):
        """List the nodes under the current URL path and write their names.

        The path of ``self.context_override.url`` is split on ``/`` and
        passed to ``self.context.root.ls``. When the output is a tty the
        names are colorized and laid out with ``colformat``; otherwise the
        plain names are used. Lines are joined with newlines and written
        to ``self.output`` if there are any.

        :param node: the parse node being visited; returned unchanged.
        :param children: not used by this method.
        :returns: ``node``.
        """
        path = urlparse(self.context_override.url).path
        path = filter(None, path.split('/'))
        entries = self.context.root.ls(*path)
        if self.output.isatty():
            # The loop variable must not be called ``node``: that shadowed
            # the parameter and made the method return the last listed
            # entry instead of the parse node.
            names = [
                self._colorize(
                    entry.name,
                    String if entry.data.get('type') == 'dir' else Name)
                for entry in entries
            ]
            lines = list(colformat(names))
        else:
            lines = [entry.name for entry in entries]
        if lines:
            self.output.write('\n'.join(lines))
        return node
coverage2sql_fixtures.py 文件源码 项目:coverage2sql 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _drop_db(self):
        """Drop and recreate the database named in ``self.url``.

        The user, optional password, host and database name are taken from
        the URL and turned into a ``mysql`` command line that is handed to
        ``execute_cmd``.
        """
        parsed = urlparse.urlparse(self.url)
        db_name = parsed.path.strip('/')

        # netloc is expected to look like "user[:password]@host"
        netloc_parts = parsed.netloc.split('@')
        host = netloc_parts[1]
        credentials = netloc_parts[0].split(':')
        user = credentials[0]

        # only pass -p when a non-blank password is present
        password_flag = ""
        if len(credentials) > 1 and credentials[1].strip():
            password_flag = "-p\"%s\"" % credentials[1]

        sql = ("drop database if exists %(database)s; create "
               "database %(database)s;") % {'database': db_name}
        cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
               "-e \"%(sql)s\"") % {'user': user, 'password': password_flag,
                                    'host': host, 'sql': sql}
        execute_cmd(cmd)
httpclient.py 文件源码 项目:python-zunclient 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_connection_params(endpoint, **kwargs):
        """Work out the connection class and arguments for ``endpoint``.

        :param endpoint: endpoint URL; only ``http`` and ``https`` schemes
            are supported.
        :param kwargs: optional ``timeout`` (default 600 seconds) and, for
            https, ``ca_file``, ``cert_file``, ``key_file`` and
            ``insecure``.
        :returns: ``(connection_class, args, kwargs)`` where ``args`` is
            ``(hostname, port, path)``.
        :raises exceptions.EndpointException: for any other scheme.
        """
        parts = urlparse.urlparse(endpoint)

        # trim API version and trailing slash from endpoint
        path = parts.path.rstrip('/')
        # str.rstrip() strips a *set of characters*, not a suffix, so it
        # could also eat legitimate trailing characters of the path;
        # remove the version only when it really is the suffix.
        if API_VERSION and path.endswith(API_VERSION):
            path = path[:-len(API_VERSION)]

        _args = (parts.hostname, parts.port, path)
        _kwargs = {'timeout': (float(kwargs.get('timeout'))
                               if kwargs.get('timeout') else 600)}

        if parts.scheme == 'https':
            _class = VerifiedHTTPSConnection
            _kwargs['ca_file'] = kwargs.get('ca_file', None)
            _kwargs['cert_file'] = kwargs.get('cert_file', None)
            _kwargs['key_file'] = kwargs.get('key_file', None)
            _kwargs['insecure'] = kwargs.get('insecure', False)
        elif parts.scheme == 'http':
            _class = six.moves.http_client.HTTPConnection
        else:
            msg = 'Unsupported scheme: %s' % parts.scheme
            raise exceptions.EndpointException(msg)

        return (_class, _args, _kwargs)
test_decorators.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_redirects_without_credentials(self):
        """Check that an ``oauth_required`` view answers with a redirect.

        The redirect must target ``/oauth2/oauth2authorize/`` and carry the
        requested path as ``return_url``.
        """
        request = self.factory.get('/test')
        request.session = self.session

        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse('test')  # pragma: NO COVER

        response = test_view(request)
        redirect_cls = http.HttpResponseRedirect
        self.assertIsInstance(response, redirect_cls)

        location = parse.urlparse(response['Location'])
        self.assertEqual(location.path, '/oauth2/oauth2authorize/')
        self.assertIn('return_url=%2Ftest', location.query)

        self.assertEqual(response.status_code, redirect_cls.status_code)
service_tests.py 文件源码 项目:partycrasher 作者: naturalness 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_absolute_url(self):
        """
        Does the server return absolute URIs?

        Relies on the root response carrying an ``href`` entry and a
        ``buckets`` mapping with a ``1.0`` key.
        """
        response = requests.get(self.root_url)
        payload = response.json()

        root_href = payload.get('href')
        assert root_href is not None

        parsed_root = urlparse(root_href)
        # Test in decreasing order of likely correctness:
        # Path => Domain => Scheme
        assert parsed_root.path == '/'
        # Compare against the origin's network location, minus the scheme.
        assert parsed_root.netloc == urlparse(self.origin).netloc
        assert parsed_root.scheme == 'http'

        bucket_href = payload.get('buckets').get('1.0')
        assert urlparse(bucket_href).path.endswith('buckets/1.0')
session.py 文件源码 项目:rets 作者: refindlyllc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_capability(self, name, uri):
        """
        Add a capability of the RETS board

        A ``uri`` without a host is treated as relative and is made
        absolute using the scheme, host and (if present) port of the stored
        ``Login`` capability.

        :param name: The name of the capability
        :param uri: The capability URI given by the RETS board
        :return: None
        :raises ValueError: if ``uri`` is relative and no ``Login`` URL is
            stored.
        """

        parse_results = urlparse(uri)
        if parse_results.hostname is None:
            # relative URL given, so build this into an absolute URL
            login_url = self.capabilities.get('Login')
            if not login_url:
                logger.error("There is no login URL stored, so additional capabilities cannot be added.")
                raise ValueError("Cannot automatically determine absolute path for {0!s} given.".format(uri))

            parts = urlparse(login_url)
            # keep an explicit port from the login URL; dropping it would
            # point the capability at the wrong server port
            port = ':{0}'.format(parts.port) if parts.port else ''
            uri = parts.scheme + '://' + parts.hostname + port + '/' + uri.lstrip('/')

        self.capabilities[name] = uri
requirements.py 文件源码 项目:Tencent_Cartoon_Download 作者: Fretice 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate this requirement's fields.

        Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
        ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
        absent).

        :raises InvalidRequirement: if the string does not match the
            ``REQUIREMENT`` grammar, or if a URL is given that lacks a
            scheme or a network location.
        """
        try:
            parsed = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # quote the eight characters starting at the failure position
            snippet = requirement_string[e.loc:e.loc + 8]
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    snippet))

        self.name = parsed.name

        url = parsed.url
        if url:
            pieces = urlparse.urlparse(url)
            # a usable URL needs both a scheme and a network location
            if not pieces.scheme or not pieces.netloc:
                raise InvalidRequirement("Invalid URL given")
            self.url = url
        else:
            self.url = None

        self.extras = set(parsed.extras.asList()) if parsed.extras else set()
        self.specifier = SpecifierSet(parsed.specifier)
        self.marker = parsed.marker or None
chunked_request.py 文件源码 项目:lddmm-ot 作者: jeanfeydy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_proxy_config(self):
        """
        Work out whether requests should go through a proxy.

        The proxy is read from the ``https_proxy`` environment variable
        when ``self._ssl_enabled`` is true and from ``http_proxy``
        otherwise. No proxy is used when ``self._server`` occurs in the
        ``no_proxy`` environment variable.

        :returns: ``(proxy_server, proxy_port)``; both are ``None`` when no
            proxy applies.
        """
        env_name = "https_proxy" if self._ssl_enabled else "http_proxy"
        proxy = os.environ.get(env_name)

        no_proxy = os.environ.get("no_proxy")
        # plain substring test of the server name against no_proxy
        bypass = no_proxy and self._server in no_proxy

        if not proxy or bypass:
            return None, None

        parsed = urlparse(proxy)
        return parsed.hostname, parsed.port
example_utils.py 文件源码 项目:elm 作者: ContinuumIO 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, base_url='https://hydro1.gesdisc.eosdis.nasa.gov/data/NLDAS/', **layout_kwargs):
        """Display a selector for the links found at ``base_url``.

        :param base_url: URL whose links populate the first selector.
        :param layout_kwargs: keyword arguments for the label ``Layout``;
            ``min_width`` defaults to ``'30%'``.
        """
        self.base_url = base_url
        self.selected_url = None

        layout_kwargs.setdefault('min_width', '30%')
        self.label_layout = Layout(**layout_kwargs)

        top_links = self.get_links(base_url, href_filter=self.dir_and_not_data)
        selector = widgets.Select(options=top_links, description='')

        # re-enter on_value_change whenever the selection changes
        callback = partial(self.on_value_change, url=self.base_url)
        selector.observe(callback, names='value')

        path_label = widgets.Label(urlparse(self.base_url).path,
                                   layout=self.label_layout)
        row = widgets.HBox([path_label, selector])
        self.elts = [row, path_label, selector]
        display(row)
example_utils.py 文件源码 项目:elm 作者: ContinuumIO 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def on_value_change(self, change, url):
        """React to a new selection by descending into it or selecting a file.

        :param change: change dict from the widget; ``change['new']`` is
            the newly selected URL (or ``None``).
        :param url: not used by this method.
        :returns: the result of ``self.select_url`` when a ``.grb`` file is
            chosen, otherwise ``None``.
        """
        target = change['new']
        if target is None:  # 'Select...' chosen
            return
        if target.endswith('.grb'):  # File reached
            return self.select_url(target)

        # replace the currently displayed widgets
        for widget in self.elts:
            widget.close()

        if target == self.base_url:
            link_filter = self.dir_and_not_data
        else:
            link_filter = self.dir_or_grib
        links = self.get_links(target, href_filter=link_filter)
        if not links:
            return

        selector = widgets.Select(options=links, description='')
        callback = partial(self.on_value_change, url=target)
        selector.observe(callback, names='value')

        path_label = widgets.Label(urlparse(target).path,
                                   layout=self.label_layout)
        row = widgets.HBox([path_label, selector])
        self.elts = [row, path_label, selector]
        display(row)
requirements.py 文件源码 项目:coolrelation 作者: mrtial 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate this requirement's fields.

        Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
        ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
        absent).

        :raises InvalidRequirement: if the string does not match the
            ``REQUIREMENT`` grammar, or if a URL is given that lacks a
            scheme or a network location.
        """
        try:
            parsed = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # quote the eight characters starting at the failure position
            snippet = requirement_string[e.loc:e.loc + 8]
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    snippet))

        self.name = parsed.name

        url = parsed.url
        if url:
            pieces = urlparse.urlparse(url)
            # a usable URL needs both a scheme and a network location
            if not pieces.scheme or not pieces.netloc:
                raise InvalidRequirement("Invalid URL given")
            self.url = url
        else:
            self.url = None

        self.extras = set(parsed.extras.asList()) if parsed.extras else set()
        self.specifier = SpecifierSet(parsed.specifier)
        self.marker = parsed.marker or None
requirements.py 文件源码 项目:python-group-proj 作者: Sharcee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate this requirement's fields.

        Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
        ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
        absent).

        :raises InvalidRequirement: if the string does not match the
            ``REQUIREMENT`` grammar, or if a URL is given that lacks a
            scheme or a network location.
        """
        try:
            parsed = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # quote the eight characters starting at the failure position
            snippet = requirement_string[e.loc:e.loc + 8]
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    snippet))

        self.name = parsed.name

        url = parsed.url
        if url:
            pieces = urlparse.urlparse(url)
            # a usable URL needs both a scheme and a network location
            if not pieces.scheme or not pieces.netloc:
                raise InvalidRequirement("Invalid URL given")
            self.url = url
        else:
            self.url = None

        self.extras = set(parsed.extras.asList()) if parsed.extras else set()
        self.specifier = SpecifierSet(parsed.specifier)
        self.marker = parsed.marker or None
image_service.py 文件源码 项目:bareon-ironic 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def validate_href(self, image_href):
        """Check that ``image_href`` names an object the client can reach.

        The path of ``image_href`` (without leading slashes) is split at
        its first ``/`` into a container and an object name, and the
        object's headers are requested through ``self.client.head_object``.

        :param image_href: resource reference to validate.
        :returns: ``(container, object_name, headers)``.
        :raises exception.ImageRefValidationFailed: if the reference has no
            path or the head request raises
            ``exception.SwiftOperationError``.
        """
        resource_path = urlparse.urlparse(image_href).path.lstrip('/')
        if not resource_path:
            raise exception.ImageRefValidationFailed(
                _("No path specified in swift resource reference: %s. "
                  "Reference must be like swift:container/path")
                % str(image_href))

        # everything after the first '/' is the object name
        container, _sep, obj_name = resource_path.partition('/')
        try:
            headers = self.client.head_object(container, obj_name)
        except exception.SwiftOperationError as e:
            raise exception.ImageRefValidationFailed(
                _("Cannot fetch %(url)s resource. %(exc)s") %
                {'url': str(image_href), 'exc': str(e)})

        return (container, obj_name, headers)


问题


面经


文章

微信
公众号

扫码关注公众号