python类urljoin()的实例源码

keystone.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_keystone_url(auth_url, auth_version):
    """Build the versioned URL at which keystone can be contacted.

    :param auth_url: the http or https keystone URL to start from (like
        'http://127.0.0.1:9898/').
    :param auth_version: the requested version as a string (like v2,
        v3.0, etc)
    :returns: the keystone url as a string, with 'v3' or 'v2.0' as its
        last path segment
    """
    if _is_apiv3(auth_url, auth_version):
        version_segment = 'v3'
    else:
        version_segment = 'v2.0'
    # With a trailing '/' urljoin() would append the version instead of
    # replacing the last path segment of the URL, so strip it first.
    stripped_url = auth_url.rstrip('/')
    return parse.urljoin(stripped_url, version_segment)
http.py 文件源码 项目:storj-python-sdk 作者: Storj 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _prepare_request(self, **kwargs):
    """Build a prepared HTTP request with authentication applied.

    Args:
        kwargs (dict): keyword arguments consumed by the authentication
            helper (``_add_ecdsa_signature()`` or ``_add_basic_auth()``)
            and by the :py:class:`requests.Request` class. The ``path``
            entry is required; it is replaced by an absolute ``url``.
    Returns:
        The result of ``requests.Request(**kwargs).prepare()``.
    Raises:
        AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
    """
    if 'headers' not in kwargs:
        kwargs['headers'] = {}

    # Choose the authentication scheme: a signing key wins, otherwise
    # basic auth is used when both e-mail and password are set.
    has_signing_key = isinstance(self.private_key, SigningKey)
    if has_signing_key:
        self._add_ecdsa_signature(kwargs)
    elif self.email and self.password:
        self._add_basic_auth(kwargs)

    # Swap the relative path for a URL joined onto the API base URL
    relative_path = kwargs.pop('path')
    assert relative_path.startswith('/')
    kwargs['url'] = urljoin(self.api_url, relative_path)

    prepared = requests.Request(**kwargs).prepare()
    return prepared
utils.py 文件源码 项目:python-kingbirdclient 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.

    :param contents_or_file_name: a file name, a URL/URI, or literal
        contents.
    :returns: the utf8-decoded contents read from the file or URL, or
        ``contents_or_file_name`` unchanged when it cannot be read.
    """
    # Imported locally so the block does not rely on a module-level import.
    from contextlib import closing

    try:
        if parse.urlparse(contents_or_file_name).scheme:
            # Already a URL/URI: open it as given.
            definition_url = contents_or_file_name
        else:
            # Plain file name: convert it to an absolute file: URL.
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        # closing() guarantees the handle is released even if read() or
        # decode() fails, instead of leaving it to garbage collection.
        with closing(request.urlopen(definition_url)) as response:
            return response.read().decode('utf8')
    except Exception:
        # Deliberate best effort: anything unreadable is treated as the
        # literal contents.
        return contents_or_file_name
uploads.py 文件源码 项目:conda-concourse-ci 作者: conda 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_upload_channels(upload_config_dir, subdir, channels=None):
    """Collect one channel per upload configuration found in a directory.

    The thought here was to provide whatever channel you have set as an
    output also to be an input. This was killed in favor of setting channels
    in condarc in the docker image.

    Each configuration contributes its ``user`` when it has a ``token``, an
    ``http://`` URL built from ``server`` and ``destination_path`` (formatted
    with ``subdir``) when it has a ``server``, and its ``channel`` otherwise.
    When a non-empty ``channels`` list is passed in, entries are appended to
    that same list, which is then returned.
    """
    configurations = load_yaml_config_dir(upload_config_dir)
    collected = channels if channels else []

    def _channel_for(config):
        # Map a single upload configuration to its channel string.
        if 'token' in config:
            return config['user']
        if 'server' in config:
            server_url = 'http://' + config['server']
            destination = config['destination_path'].format(subdir=subdir)
            return parse.urljoin(server_url, destination)
        return config['channel']

    for entry in configurations:
        collected.append(_channel_for(entry))
    return collected
test_pbm.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_get_pbm_wsdl_location(self):
    """Check the WSDL location returned for several VC versions."""
    self.assertIsNone(pbm.get_pbm_wsdl_location(None))

    wsdl_root = os.path.join(
        os.path.abspath(os.path.dirname(pbm.__file__)), 'wsdl')

    def expected_wsdl(version):
        # file: URL of the WSDL shipped for the given version directory
        wsdl_path = os.path.join(wsdl_root, version, 'pbmService.wsdl')
        return urlparse.urljoin('file:', urllib.pathname2url(wsdl_path))

    with mock.patch('os.path.exists') as path_exists:
        # While the file "exists", the version directory in the result is
        # made of the first two components of the requested version.
        path_exists.return_value = True
        cases = (('5', '5'), ('5.5', '5.5'), ('5.5.1', '5.5'))
        for requested, resolved in cases:
            actual = pbm.get_pbm_wsdl_location(requested)
            self.assertEqual(expected_wsdl(resolved), actual)

        # A missing file yields no location
        path_exists.return_value = False
        self.assertIsNone(pbm.get_pbm_wsdl_location('5.5'))
pbm.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_pbm_wsdl_location(vc_version):
    """Locate the PBM WSDL file matching a VC version.

    Only the first two dot-separated components of the version select the
    WSDL directory; any further components are ignored.

    :param vc_version: a dot-separated version string. For example, "1.2".
    :return: the pbm wsdl file location as a ``file:`` URL, or None when no
             version is given or the WSDL file does not exist.
    """
    if not vc_version:
        return
    # "5.5.1" -> "5.5", "5" -> "5"
    major_minor = '.'.join(vc_version.split('.')[:2])
    module_dir = os.path.abspath(os.path.dirname(__file__))
    wsdl_path = os.path.join(module_dir, 'wsdl', major_minor,
                             'pbmService.wsdl')
    if os.path.exists(wsdl_path):
        wsdl_url = urlparse.urljoin('file:', urllib.pathname2url(wsdl_path))
        LOG.debug("Using PBM WSDL location: %s.", wsdl_url)
        return wsdl_url
    LOG.warning(_LW("PBM WSDL file %s not found."), wsdl_path)
    return
syncsession.py 文件源码 项目:morango 作者: learningequality 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _request(self, endpoint, method="GET", lookup=None, data={}, params={}, userargs=None, password=None):
    """
    Send a request to a morango endpoint and return the response.

    :param endpoint: constant representing which morango endpoint we are querying
    :param method: HTTP verb/method for request
    :param lookup: the pk value for the specific object we are querying
    :param data: dict passed to ``requests`` as the ``json`` body
    :param params: dict to be sent as part of URL's query string
    :param userargs: Authorization credentials; a dict is flattened into a
        ``key=val&key=val`` string
    :param password: second element of the auth tuple, used only when
        ``userargs`` is set
    :return: ``Response`` object from request
    """
    # the auth layer expects user arguments as a query string
    if isinstance(userargs, dict):
        pairs = ["{}={}".format(key, val) for (key, val) in iteritems(userargs)]
        userargs = "&".join(pairs)

    # endpoint URL, optionally extended with "<lookup>/"
    if lookup:
        lookup = lookup + '/'
    endpoint_url = urljoin(self.base_url, endpoint)
    url = urljoin(endpoint_url, lookup)

    if userargs:
        auth = (userargs, password)
    else:
        auth = None

    resp = requests.request(method, url, json=data, params=params, auth=auth)
    resp.raise_for_status()
    return resp
template_utils.py 文件源码 项目:python-zunclient 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_file_contents(from_data, files, base_url=None,
                      ignore_if=None):
    """Resolve the file references of a dict and load their contents.

    Only dict input is processed. Every value that is not skipped is joined
    onto ``base_url`` (with a trailing '/' ensured) to form a URL. If that
    URL is not yet a key of ``files`` its content is read and stored there;
    content recognised as a template is stored as the JSON dump of the
    parsed template instead. The value in ``from_data`` is then replaced by
    the URL.

    :param from_data: mapping whose values are file references; updated in
        place
    :param files: mapping of URL to content; updated in place
    :param base_url: optional URL the references are resolved against
    :param ignore_if: optional callable ``(key, value)``; a truthy result
        skips that entry
    """
    if not isinstance(from_data, dict):
        return None

    for name, reference in from_data.items():
        if ignore_if and ignore_if(name, reference):
            continue

        if base_url and not base_url.endswith('/'):
            base_url += '/'

        absolute_url = parse.urljoin(base_url, reference)
        if absolute_url not in files:
            content = utils.read_url_content(absolute_url)
            if is_template(content):
                nested = get_template_contents(
                    template_url=absolute_url, files=files)[1]
                content = jsonutils.dumps(nested)
            files[absolute_url] = content
        # replace the data value with the normalised absolute URL
        from_data[name] = absolute_url
test_monzo_api.py 文件源码 项目:pymonzo 作者: pawelad 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_class_get_oauth_token_method(self, mocker, mocked_monzo):
    """Test class `_get_oauth_token` method"""
    fetch_token_mock = mocker.MagicMock()
    session_cls_mock = mocker.patch('pymonzo.monzo_api.OAuth2Session')
    session_cls_mock.return_value.fetch_token = fetch_token_mock

    result = mocked_monzo._get_oauth_token()

    # The method hands back whatever `fetch_token` returned
    assert result == fetch_token_mock.return_value

    # The session is built from the client id and the configured redirect URI
    expected_session_kwargs = dict(
        client_id=mocked_monzo._client_id,
        redirect_uri=config.PYMONZO_REDIRECT_URI,
    )
    session_cls_mock.assert_called_once_with(**expected_session_kwargs)

    # The token is requested from the API's `/oauth2/token` URL
    expected_fetch_kwargs = dict(
        token_url=urljoin(mocked_monzo.api_url, '/oauth2/token'),
        code=mocked_monzo._auth_code,
        client_secret=mocked_monzo._client_secret,
    )
    fetch_token_mock.assert_called_once_with(**expected_fetch_kwargs)
monzo_api.py 文件源码 项目:pymonzo 作者: pawelad 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_oauth_token(self):
    """
    Exchange the stored auth code for a Monzo access token, using the
    OAuth2 `authorization code` grant type.

    Official docs:
        https://monzo.com/docs/#acquire-an-access-token

    :returns: OAuth 2 access token
    :rtype: dict
    """
    token_endpoint = urljoin(self.api_url, '/oauth2/token')

    session = OAuth2Session(
        client_id=self._client_id,
        redirect_uri=config.PYMONZO_REDIRECT_URI,
    )

    return session.fetch_token(
        token_url=token_endpoint,
        code=self._auth_code,
        client_secret=self._client_secret,
    )
webloader.py 文件源码 项目:webkivy 作者: miohtama 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def crawl(self, url, base_url):
    """Crawl an .html page and fetch every linked URL we think is part of the application.

    Downloads are parallelized using threads.
    """
    resp = requests.get(url)

    # Relative links are resolved against the URL reached after redirects
    page_url = resp.url

    document = lxml.html.fromstring(resp.content)
    hrefs = [anchor.attrib.get("href", "") for anchor in document.cssselect("a")]
    absolute_links = [urljoin(page_url, href) for href in hrefs]
    app_links = [link for link in absolute_links if is_likely_app_part(link, base_url)]

    # Fetch all links in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        pending = {}
        for link in app_links:
            pending[pool.submit(self.fetch_file, link, base_url)] = link
        for finished in concurrent.futures.as_completed(pending):
            # Re-raise in the main thread anything a worker raised
            finished.result()
redfish.py 文件源码 项目:deb-python-proliantutils 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_sushy_system(self, system_id):
        """Get the sushy system for system_id

        :param system_id: The identity of the System resource
        :returns: the Sushy system instance
        :raises: IloError
        """
        system_url = parse.urljoin(self._sushy.get_system_collection_path(),
                                   system_id)
        try:
            return self._sushy.get_system(system_url)
        except sushy.exceptions.SushyError as e:
            msg = (self._('The Redfish System "%(system)s" was not found. '
                          'Error %(error)s') %
                   {'system': system_id, 'error': str(e)})
            LOG.debug(msg)
            raise exception.IloError(msg)
redfish.py 文件源码 项目:deb-python-proliantutils 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_sushy_manager(self, manager_id):
        """Get the sushy Manager for manager_id

        :param manager_id: The identity of the Manager resource
        :returns: the Sushy Manager instance
        :raises: IloError
        """
        manager_url = parse.urljoin(self._sushy.get_manager_collection_path(),
                                    manager_id)
        try:
            return self._sushy.get_manager(manager_url)
        except sushy.exceptions.SushyError as e:
            msg = (self._('The Redfish Manager "%(manager)s" was not found. '
                          'Error %(error)s') %
                   {'manager': manager_id, 'error': str(e)})
            LOG.debug(msg)
            raise exception.IloError(msg)
client.py 文件源码 项目:python-distilclient 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def collect_usage(self):
    """POST to the "collect_usage" endpoint and return its JSON reply.

    :returns: the decoded JSON body when the status code is 200; None when
        a ConnectionError was caught (the error is printed).
    :raises AttributeError: when the status code is not 200.
    """
    request_url = urlparse.urljoin(self.endpoint, "collect_usage")
    request_headers = {"Content-Type": "application/json",
                       "X-Auth-Token": self.auth_token}

    try:
        response = requests.post(request_url, headers=request_headers,
                                 verify=not self.insecure)
        if response.status_code == 200:
            return response.json()
        raise AttributeError("Usage cycle failed: %s  code: %s" %
                             (response.text, response.status_code))
    except ConnectionError as e:
        print(e)
client.py 文件源码 项目:python-distilclient 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def last_collected(self):
    """GET the "last_collected" endpoint and return its JSON reply.

    :returns: the decoded JSON body when the status code is 200; None when
        a ConnectionError was caught (the error is printed).
    :raises AttributeError: when the status code is not 200.
    """
    request_url = urlparse.urljoin(self.endpoint, "last_collected")
    request_headers = {"Content-Type": "application/json",
                       "X-Auth-Token": self.auth_token}

    try:
        response = requests.get(request_url, headers=request_headers,
                                verify=not self.insecure)
        if response.status_code == 200:
            return response.json()
        raise AttributeError("Get last collected failed: %s code: %s" %
                             (response.text, response.status_code))
    except ConnectionError as e:
        print(e)
client.py 文件源码 项目:python-distilclient 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _query_usage(self, tenant, start, end, endpoint):
    """GET ``endpoint`` with tenant/start/end as query parameters.

    :param tenant: sent as the "tenant" query parameter
    :param start: sent as the "start" query parameter
    :param end: sent as the "end" query parameter
    :param endpoint: path joined onto ``self.endpoint``
    :returns: the decoded JSON body when the status code is 200; None when
        a ConnectionError was caught (the error is printed).
    :raises AttributeError: when the status code is not 200.
    """
    request_url = urlparse.urljoin(self.endpoint, endpoint)
    request_headers = {"X-Auth-Token": self.auth_token}
    query = {"tenant": tenant, "start": start, "end": end}

    try:
        response = requests.get(request_url, headers=request_headers,
                                params=query,
                                verify=not self.insecure)
        if response.status_code == 200:
            return response.json()
        raise AttributeError("Get usage failed: %s code: %s" %
                             (response.text, response.status_code))
    except ConnectionError as e:
        print(e)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testGet300WithLocation(self):
    """300 responses carrying a Location: header are followed automatically."""
    uri = urllib_parse.urljoin(base, "300/with-location-header.asis")

    # The same request is issued twice with identical expectations; the
    # second pass confirms that the intermediate 300 is not cached.
    for _ in range(2):
        response, content = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testGet301(self):
    """301 redirects are followed automatically and the 301 is cached."""
    uri = urllib_parse.urljoin(base, "301/onestep.asis")
    destination = urllib_parse.urljoin(base, "302/final-destination.txt")

    # First request: the redirect is followed and comes from the network
    first, first_body = self.http.request(uri, "GET")
    self.assertEqual(first.status, 200)
    self.assertIn('content-location', first)
    self.assertEqual(first['content-location'], destination)
    self.assertEqual(first_body, b"This is the final destination.\n")
    self.assertEqual(first.previous.status, 301)
    self.assertEqual(first.previous.fromcache, False)

    # Second request: the 301 itself is now served from the cache
    second, second_body = self.http.request(uri, "GET")
    self.assertEqual(second.status, 200)
    self.assertEqual(second['content-location'], destination)
    self.assertEqual(second_body, b"This is the final destination.\n")
    self.assertEqual(second.previous.status, 301)
    self.assertEqual(second.previous.fromcache, True)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testGet302RedirectionLimit(self):
    """A lowered redirection limit is enforced.

    With ``force_exception_to_status_code`` off, exceeding the limit must
    raise ``httplib2.RedirectLimit``; with it on, the same request must
    come back as a 500 response describing the failure.
    """
    uri = urllib_parse.urljoin(base, "302/twostep.asis")

    # assertRaises replaces the former try / self.fail() / except Exception
    # construct: the AssertionError raised by self.fail() is itself an
    # Exception, so a missing RedirectLimit used to be reported as
    # "Threw wrong kind of exception".
    self.http.force_exception_to_status_code = False
    with self.assertRaises(httplib2.RedirectLimit):
        self.http.request(uri, "GET", redirections=1)

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testGetIgnoreEtag(self):
    """ETags can be forcibly ignored through ``ignore_etag``."""
    uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")

    def request_with_max_age_zero():
        # A fresh headers dict is built for every call
        return self.http.request(uri, "GET", headers={
            'accept-encoding': 'identity', 'cache-control': 'max-age=0'})

    response, content = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")

    # By default the request carries If-None-Match
    response, content = request_with_max_age_zero()
    reflected = self.reflector(content)
    self.assertIn('HTTP_IF_NONE_MATCH', reflected)

    # With ignore_etag set the header is no longer sent
    self.http.ignore_etag = True
    response, content = request_with_max_age_zero()
    reflected = self.reflector(content)
    self.assertEqual(response.fromcache, False)
    self.assertNotIn('HTTP_IF_NONE_MATCH', reflected)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testGet307(self):
    """307 redirects are followed, but the 307 itself is not cached."""
    uri = urllib_parse.urljoin(base, "307/onestep.asis")

    # First request
    first, first_body = self.http.request(uri, "GET")
    self.assertEqual(first.status, 200)
    self.assertEqual(first_body, b"This is the final destination.\n")
    self.assertEqual(first.previous.status, 307)
    self.assertEqual(first.previous.fromcache, False)

    # Second request: the final response is cached, the 307 is not
    second, second_body = self.http.request(uri, "GET")
    self.assertEqual(second.status, 200)
    self.assertEqual(second.fromcache, True)
    self.assertEqual(second_body, b"This is the final destination.\n")
    self.assertEqual(second.previous.status, 307)
    self.assertEqual(second.previous.fromcache, False)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testGetGZipFailure(self):
    """A failing gzip decode is reported properly.

    With ``force_exception_to_status_code`` off the request must raise
    ``httplib2.FailedToDecompressContent``; with it on, the same request
    must come back as a 500 response describing the failure.
    """
    uri = urllib_parse.urljoin(base, "gzip/failed-compression.asis")

    # assertRaises replaces the former try / self.fail() / except Exception
    # construct: the AssertionError raised by self.fail() is itself an
    # Exception, so a missing FailedToDecompressContent used to be reported
    # as "Threw wrong kind of exception".
    self.http.force_exception_to_status_code = False
    with self.assertRaises(httplib2.FailedToDecompressContent):
        self.http.request(uri, "GET")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testGetDeflateFailure(self):
    """A failing deflate decode is reported properly.

    With ``force_exception_to_status_code`` off the request must raise
    ``httplib2.FailedToDecompressContent``; with it on, the same request
    must come back as a 500 response describing the failure.
    """
    uri = urllib_parse.urljoin(base, "deflate/failed-compression.asis")

    # assertRaises replaces the former try / self.fail() / except Exception
    # construct: the AssertionError raised by self.fail() is itself an
    # Exception, so a missing FailedToDecompressContent used to be reported
    # as "Threw wrong kind of exception".
    self.http.force_exception_to_status_code = False
    with self.assertRaises(httplib2.FailedToDecompressContent):
        self.http.request(uri, "GET")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testGetCacheControlNoCache(self):
    """Cache-Control: no-cache on a request bypasses the cache."""
    uri = urllib_parse.urljoin(base, "304/test_etag.txt")

    def get(extra_headers=()):
        # Fresh headers dict per call; extras are added after the base entry
        headers = {'accept-encoding': 'identity'}
        headers.update(extra_headers)
        return self.http.request(uri, "GET", headers=headers)

    response, content = get()
    self.assertNotEqual(response['etag'], "")

    # A plain repeat of the request is served from the cache
    response, content = get()
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # The no-cache request is not
    response, content = get({'Cache-Control': 'no-cache'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testGetCacheControlPragmaNoCache(self):
    """Pragma: no-cache on a request bypasses the cache."""
    uri = urllib_parse.urljoin(base, "304/test_etag.txt")

    def get(extra_headers=()):
        # Fresh headers dict per call; extras are added after the base entry
        headers = {'accept-encoding': 'identity'}
        headers.update(extra_headers)
        return self.http.request(uri, "GET", headers=headers)

    response, content = get()
    self.assertNotEqual(response['etag'], "")

    # A plain repeat of the request is served from the cache
    response, content = get()
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # The no-cache request is not
    response, content = get({'Pragma': 'no-cache'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testBasicAuthTwoDifferentCredentials(self):
    """Basic Authentication with multiple sets of credentials."""
    file_uri = urllib_parse.urljoin(base, "basic2/file.txt")
    dir_uri = urllib_parse.urljoin(base, "basic2/")

    # Before the credentials are added: file first, then its directory
    for uri in (file_uri, dir_uri):
        response, content = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

    self.http.add_credentials('fred', 'barney')

    # After the credentials are added: directory first, then the file
    for uri in (dir_uri, file_uri):
        response, content = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testDigestAuthNextNonceAndNC(self):
    """If the server sets nextnonce, the nonce count goes back to 1."""
    uri = urllib_parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials('joe', 'password')

    def fetch_authentication_info():
        # Uncached GET; returns the parsed authentication-info header
        response, content = self.http.request(
            uri, "GET", headers={"cache-control": "no-cache"})
        parsed = httplib2._parse_www_authenticate(
            response, 'authentication-info')
        self.assertEqual(response.status, 200)
        return parsed

    first_info = fetch_authentication_info()
    second_info = fetch_authentication_info()

    if 'nextnonce' in first_info:
        self.assertEqual(second_info['nc'], 1)
httplib2_test.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testReflector(self):
    """The reflector output contains the User-Agent header of the request."""
    reflector_uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")
    response, content = self.http.request(reflector_uri, "GET")
    reflected = self.reflector(content)
    self.assertIn('HTTP_USER_AGENT', reflected)

    # NOTE: disabled because this isn't relevant to the shim.
    # def testConnectionClose(self):
    #     uri = "http://www.google.com/"
    #     (response, content) = self.http.request(uri, "GET")
    #     for c in self.http.connections.values():
    #         self.assertNotEqual(None, c.sock)
    #     (response, content) = self.http.request(
    #         uri, "GET", headers={"connection": "close"})
    #     for c in self.http.connections.values():
    #         self.assertEqual(None, c.sock)
__init__.py 文件源码 项目:bottlecap 作者: foxx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_full_url(self, routename, **kwargs):
    """
    Build the full URL of a named route by joining the result of
    get_url() onto this object's base URL.

    For example:
    https://example.com/hello?world=1

    XXX: Needs UT
    """
    route_url = self.app.get_url(routename, **kwargs)
    full_url = urljoin(self.base_url, route_url)
    return full_url


############################################################
# CBVs (class based views)
############################################################
base.py 文件源码 项目:PyBloqs 作者: manahl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def show(self, fmt="html", header_block=None, footer_block=None):
    """
    Publish the block into the temporary HTML directory and open it in a browser.

    :param fmt: The format of the saved block. Supports the same output as `Block.save`
    :param header_block: Passed through to `publish`.
    :param footer_block: Passed through to `publish`.
    :return: The location that was opened: the published file path, or, when
             `public_dir` is configured, a URL built from it.
    """
    file_name = str_base(hash(self._id)) + "." + fmt
    target = os.path.expanduser(os.path.join(user_config["tmp_html_dir"], file_name))
    file_path = self.publish(target, header_block=header_block, footer_block=footer_block)

    try:
        url_base = user_config["public_dir"]
    except KeyError:
        # No public directory configured: open the local file
        path = os.path.expanduser(file_path)
    else:
        # Open the file through the configured public location
        public_part = os.path.expanduser(user_config["tmp_html_dir"] + "/" + file_name)
        path = urljoin(url_base, public_part)

    webbrowser.open_new_tab(path)

    return path


问题


面经


文章

微信
公众号

扫码关注公众号