python类urljoin()的实例源码

api.py 文件源码 项目:python-percy 作者: teeberg 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def request(self, method, path, allowed_statuses=None, **kwargs):
        """Send an authenticated HTTP request and return the response.

        The URL is built from ``self.base_url`` and ``path``. The
        ``Authorization`` and ``User-Agent`` headers are written into the
        ``headers`` mapping found in ``kwargs`` (one is created when absent);
        all keyword arguments are then forwarded to ``requests.request``.

        :param method: HTTP method name handed to ``requests.request``.
        :param path: path joined onto ``self.base_url``.
        :param allowed_statuses: optional collection of status codes that are
            returned to the caller without running the error checks.
        :return: the response object produced by ``requests.request``.
        :raises AssertionError: when the status code is 300 or above, is not
            in ``allowed_statuses`` and ``self._check_response_error`` did not
            raise first.
        """
        from . import __version__

        url = urljoin(self.base_url, path, '/')
        headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = 'Token token="{}"'.format(self.access_token)
        headers['User-Agent'] = 'python-percy/{}'.format(__version__)
        try:
            response = requests.request(method, url, **kwargs)
            self._debug_response(response)
        except Exception as ex:
            # Log the failure for debugging, then let the caller see it.
            l.debug('%s %s -> Exception: %s: %s', method, url, ex.__class__.__name__, ex.args)
            raise

        if not allowed_statuses or response.status_code not in allowed_statuses:
            self._check_response_error(response)
            # Raised explicitly instead of through ``assert`` so the check
            # still runs when Python is started with -O (which strips asserts).
            if response.status_code >= 300:
                raise AssertionError((response.status_code, response.content))
        return response
test_dashboard_basic_ops.py 文件源码 项目:tempest-horizon 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def user_login(self, username, password):
        """Log ``username`` in to the dashboard through its login form.

        The dashboard page is fetched and fed to ``HorizonHTMLParser``; the
        parser's ``login``, ``region`` and ``csrf_token`` attributes are then
        used to build and submit the url-encoded login request.
        """
        dashboard_url = CONF.dashboard.dashboard_url
        page = self._get_opener().open(dashboard_url).read()

        # Parse the page to pick up the CSRF token and default region.
        page_parser = HorizonHTMLParser()
        page_parser.feed(page)

        # Resolve the login path against the dashboard URL so that a
        # dashboard served from a non-/ web root is handled as well.
        login_request = request.Request(
            parse.urljoin(dashboard_url, page_parser.login))
        login_request.add_header('Content-type',
                                 'application/x-www-form-urlencoded')
        login_request.add_header('Referer', dashboard_url)

        # The default domain name is always sent, whatever the auth version,
        # to cover the scenario of horizon running with keystone v3.
        form_fields = {
            'username': username,
            'password': password,
            'region': page_parser.region,
            'domain': CONF.auth.default_credentials_domain_name,
            'csrfmiddlewaretoken': page_parser.csrf_token,
        }
        encoded_form = parse.urlencode(form_fields)
        self._get_opener().open(login_request, encoded_form)
tempestmail.py 文件源码 项目:dci-ansible-agent 作者: redhat-cip 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_index(self):
        '''Return links to the jobs listed on the periodic job index page.

        The index URL is ``self.config.log_url`` joined with
        ``self.args.job``. An empty list is returned when ``get_html`` gives
        back ``None`` or a response whose ``ok`` is false; otherwise at most
        ``NLINKS`` links are returned.
        '''
        index_url = urljoin(self.config.log_url, self.args.job)
        page = get_html(index_url)
        if page is None or not page.ok:
            return []

        text = page.content.decode() if page.content else ''
        job_links = []
        for line in text.splitlines():
            found = HREF.search(line)
            if not found:
                continue
            target = found.group(1)
            # Only hrefs that look like job links are kept.
            if JOBRE.match(target):
                job_links.append("/".join((index_url, target)))

        # Cap the number of links handed back to the caller.
        return job_links[:NLINKS] if job_links else []
tempestmail.py 文件源码 项目:dci-ansible-agent 作者: redhat-cip 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_index(self):
        '''Return links to the jobs listed on the periodic job index page.

        The index URL is ``self.config.log_url`` joined with
        ``self.args.job``. An empty list is returned when ``get_html`` gives
        back ``None`` or a response whose ``ok`` is false; otherwise at most
        ``NLINKS`` links are returned.
        '''
        index_url = urljoin(self.config.log_url, self.args.job)
        page = get_html(index_url)
        if page is None or not page.ok:
            return []

        text = page.content.decode() if page.content else ''
        job_links = []
        for line in text.splitlines():
            found = HREF.search(line)
            if not found:
                continue
            target = found.group(1)
            # Only hrefs that look like job links are kept.
            if JOBRE.match(target):
                job_links.append("/".join((index_url, target)))

        # Cap the number of links handed back to the caller.
        return job_links[:NLINKS] if job_links else []
tempestmail.py 文件源码 项目:dci-ansible-agent 作者: redhat-cip 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_index(self):
        '''Return links to the jobs listed on the periodic job index page.

        The index URL is ``self.config.log_url`` joined with
        ``self.args.job``. An empty list is returned when ``get_html`` gives
        back ``None`` or a response whose ``ok`` is false; otherwise at most
        ``NLINKS`` links are returned.
        '''
        index_url = urljoin(self.config.log_url, self.args.job)
        page = get_html(index_url)
        if page is None or not page.ok:
            return []

        text = page.content.decode() if page.content else ''
        job_links = []
        for line in text.splitlines():
            found = HREF.search(line)
            if not found:
                continue
            target = found.group(1)
            # Only hrefs that look like job links are kept.
            if JOBRE.match(target):
                job_links.append("/".join((index_url, target)))

        # Cap the number of links handed back to the caller.
        return job_links[:NLINKS] if job_links else []
http.py 文件源码 项目:cloak-server 作者: encryptme 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _call(method, path, api_version=None, **kwargs):
    # type: (str, str, str, **Any) -> requests.Response
    """Issue a GET or POST against the server API and return the response.

    ``path`` is resolved against ``/api/server/`` on ``base_url``. The
    ``X-Cloak-API-Version`` header is set from ``api_version`` or, failing
    that, from ``default_api_version`` when either is not ``None``.

    Raises ``NotImplementedError`` for any other method and
    ``ServerApiError`` when the status code is outside 200-399.
    """
    endpoint = urljoin(urljoin(base_url, '/api/server/'), path)

    request_headers = kwargs.setdefault('headers', {})
    version = api_version if api_version is not None else default_api_version
    if version is not None:
        request_headers['X-Cloak-API-Version'] = version

    if method == 'GET':
        sender = session.get
    elif method == 'POST':
        sender = session.post
    else:
        raise NotImplementedError()
    response = sender(endpoint, **kwargs)

    if response.status_code in xrange(200, 400):
        return response
    raise ServerApiError(response)
http.py 文件源码 项目:python-iotronicclient 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def log_curl_request(self, method, url, kwargs):
        """Log, at debug level, a curl command line mirroring the request.

        The command carries the method, every header from
        ``kwargs['headers']`` (run through ``self._process_header``), the TLS
        options taken from ``self.session``, the body (passed through
        ``strutils.mask_password``) when one is present, and the URL resolved
        against ``self.endpoint_trimmed``.
        """
        parts = ['curl -i -X %s' % method]
        parts.extend(
            '-H \'%s: %s\'' % self._process_header(name, content)
            for name, content in kwargs['headers'].items())

        verify = self.session.verify
        if not verify:
            parts.append('-k')
        elif isinstance(verify, six.string_types):
            parts.append('--cacert %s' % verify)

        client_cert = self.session.cert
        if client_cert:
            parts.extend(('--cert %s' % client_cert[0],
                          '--key %s' % client_cert[1]))

        if 'body' in kwargs:
            parts.append('-d \'%s\'' % strutils.mask_password(kwargs['body']))

        parts.append(urlparse.urljoin(self.endpoint_trimmed, url))
        LOG.debug(' '.join(parts))
adapters.py 文件源码 项目:django-scim2 作者: 15five 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def resource_type_dict(cls, request=None):
        """
        Build the ResourceType metadata ``dict`` describing the user object.

        The ``meta.location`` entry is the reversed ``scim:resource-types``
        path resolved against the base SCIM location for ``request``.
        """
        resource_id = cls.resource_type
        resource_path = reverse('scim:resource-types',
                                kwargs={'uuid': resource_id})
        base_location = get_base_scim_location_getter()(request)
        meta = {
            'location': urljoin(base_location, resource_path),
            'resourceType': 'ResourceType'
        }
        return {
            'schemas': [constants.SchemaURI.RESOURCE_TYPE],
            'id': resource_id,
            'name': 'User',
            'endpoint': reverse('scim:users'),
            'description': 'User Account',
            'schema': constants.SchemaURI.USER,
            'meta': meta,
        }
adapters.py 文件源码 项目:django-scim2 作者: 15five 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def resource_type_dict(cls, request=None):
        """
        Build the ResourceType metadata ``dict`` describing the group object.

        The ``meta.location`` entry is the reversed ``scim:resource-types``
        path resolved against the base SCIM location for ``request``.
        """
        resource_id = cls.resource_type
        resource_path = reverse('scim:resource-types',
                                kwargs={'uuid': resource_id})
        base_location = get_base_scim_location_getter()(request)
        meta = {
            'location': urljoin(base_location, resource_path),
            'resourceType': 'ResourceType'
        }
        return {
            'schemas': [constants.SchemaURI.RESOURCE_TYPE],
            'id': resource_id,
            'name': 'Group',
            'endpoint': reverse('scim:groups'),
            'description': 'Group',
            'schema': constants.SchemaURI.GROUP,
            'meta': meta,
        }
sf_scrummaster.py 文件源码 项目:software-factory 作者: softwarefactory-project 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, url, api_key, project_group, board):
        """Connect to the storyboard API and load the project group's data.

        :param url: base URL; the client is pointed at ``storyboard_api``
            resolved against it.
        :param api_key: key passed to ``get_cookie`` to obtain the cookie.
        :param project_group: name of the project group to look up.
        :param board: title of the board to look up; a falsy value skips the
            lookup and leaves ``self.board_id`` as ``None``.
        :raises Exception: when the project group or the board is not found.
        """
        self.url = url
        self.cookie = get_cookie(url, api_key=api_key)
        self.client = SFStoryboard(urljoin(url, "storyboard_api"),
                                   self.cookie)
        # Translate the client's NotFound into a plain Exception with a
        # readable message.
        try:
            self.project_group = self.client.project_groups.find(
                name=project_group)
        except exceptions.NotFound:
            raise Exception('projects group not found')
        self.stories = self.client.stories.get_all(
            project_group_id=self.project_group.id)
        self.board_id = None
        if board:
            try:
                self.board_id = self.client.boards.find(title=board).id
            except exceptions.NotFound:
                raise Exception('board not found')
        # Index the non-archived lanes whose title is listed in LANES, by title.
        # Note: when no board was given this queries with board_id=None.
        self.board_lanes = {}
        for lane in self.client.worklists.get_all(board_id=self.board_id):
            if not lane.archived and lane.title in LANES:
                self.board_lanes[lane.title] = lane
depotcontroller.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __network_ping(self):
                """Return True when the depot answers a request for
                "versions/0" on its URL, False otherwise."""
                try:
                        probe_url = urljoin(self.get_depot_url(),
                            "versions/0")
                        # SSL peer verification is disabled: the only thing
                        # being checked is whether the depot is running.
                        insecure = ssl._create_unverified_context()
                        reply = urlopen(probe_url, context=insecure)
                        reply.close()
                except HTTPError as err:
                        # The server returns NOT_MODIFIED if the catalog is
                        # up to date, which still counts as running.
                        return err.code == http_client.NOT_MODIFIED
                except URLError:
                        return False
                else:
                        return True
t_pkg_depotd.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_bug_5366(self):
                """Publish a package with slashes in the name, and then verify
                that the depot manifest and info operations work regardless of
                the encoding."""
                depot_url = self.dc.get_depot_url()
                published = self.pkgsend_bulk(depot_url, self.system10)
                fmri = published[0]
                templates = ("info/0/{0}", "manifest/0/{0}")
                # First pass uses the name un-encoded, second pass encoded.
                for encode in (False, True):
                        name = quote(fmri) if encode else fmri
                        for template in templates:
                                urlopen(urljoin(depot_url,
                                    template.format(name)))
t_pkg_depotd.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_bug_15482(self):
                """Test to make sure BUI search doesn't trigger a traceback."""

                # The repository configuration is updated while the depot is
                # stopped so the changes won't be overwritten on exit.
                self.__update_repo_config()

                # Bring the depot up.
                self.dc.start()

                # Publish some packages to search against.
                durl = self.dc.get_depot_url()
                self.pkgsend_bulk(durl, self.quux10, refresh_index=True)

                search_pages = (
                    "en/search.shtml?action=Search&token=*",
                    "en/advanced_search.shtml?action=Search&token=*",
                    "en/advanced_search.shtml?token=*&show=a&rpp=50&"
                    "action=Advanced+Search",
                )
                for page in search_pages:
                        urlopen(urljoin(durl, page)).read()
pkg5unittest.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _network_ping(self):
                """Return True when a request for "versions/0" on ``self.url``
                succeeds or is answered with FORBIDDEN, False otherwise."""
                try:
                        # The versions URL is pinged, rather than the
                        # default /, so the BUI code isn't initialized yet.
                        probe_url = urljoin(self.url, "versions/0")
                        # SSL peer verification is disabled: the only thing
                        # being checked is whether the depot is running.
                        insecure = ssl._create_unverified_context()
                        urlopen(probe_url, context=insecure)
                except HTTPError as err:
                        return err.code == http_client.FORBIDDEN
                except URLError:
                        return False
                else:
                        return True
bittorrent.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _lookup_torrent_url_fn():
        """Load a "fetcher" func to get the right torrent URL.

        The returned function maps an image id to "<image_id>.torrent"
        resolved against ``CONF.xenserver.torrent_base_url``. A
        ``RuntimeError`` is raised when that option is not set; a warning is
        logged when it is set but contains no slash.
        """
        if not CONF.xenserver.torrent_base_url:
            raise RuntimeError(_('Cannot create default bittorrent URL'
                                 ' without xenserver.torrent_base_url'
                                 ' configuration option set.'))

        if '/' not in CONF.xenserver.torrent_base_url:
            LOG.warning(_LW('Value specified in conf file for'
                         ' xenserver.torrent_base_url does not contain a'
                         ' slash character, therefore it will not be used'
                         ' as part of the torrent URL. Specify a valid'
                         ' base URL as defined by RFC 1808 (see step 6).'))

        def _default_torrent_url_fn(image_id):
            torrent_name = "%s.torrent" % image_id
            return urlparse.urljoin(CONF.xenserver.torrent_base_url,
                                    torrent_name)

        return _default_torrent_url_fn
base_requests.py 文件源码 项目:django-anymail 作者: anymail 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_request_params(self, api_url):
        """Build the requests.request keyword arguments that send the payload to the ESP.

        When ``self.get_api_endpoint()`` returns a value it is resolved
        against ``api_url``; otherwise ``api_url`` is used unchanged.

        :param api_url: the base api_url for the backend
        :return: dict
        """
        endpoint = self.get_api_endpoint()
        target_url = api_url if endpoint is None else urljoin(api_url, endpoint)

        # There is deliberately no 'json' entry: serialization is done by
        # serialize_data() so that error messages can carry extra context.
        return {
            'method': self.method,
            'url': target_url,
            'params': self.params,
            'data': self.serialize_data(),
            'headers': self.headers,
            'files': self.files,
            'auth': self.auth,
        }
test_local_server.py 文件源码 项目:htsget 作者: jeromekelleher 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_GET(self):
        """Answer a GET with the ticket, a test instance's data, or an error.

        A request for ``self.ticket_path`` gets a JSON ticket listing every
        test instance's URL (resolved against ``SERVER_URL``) and headers.
        A request for a test instance's URL gets either that instance's
        error code or its data; when the instance is flagged ``truncate``
        the last byte of the data is withheld although Content-Length
        advertises the full length. Any other path gets a 404.
        """
        instances = self.server.test_instances
        by_url = {instance.url: instance for instance in instances}

        if self.path == self.ticket_path:
            self.send_response(200)
            self.end_headers()
            ticket_urls = []
            for test_instance in instances:
                ticket_urls.append({
                    "url": urljoin(SERVER_URL, test_instance.url),
                    "headers": test_instance.headers
                })
            body = json.dumps({"htsget": {"urls": ticket_urls}})
            self.wfile.write(body.encode())
            return

        target = by_url.get(self.path)
        if target is None:
            self.send_error(404)
            return
        if target.error_code is not None:
            self.send_error(target.error_code)
            return

        self.send_response(200)
        self.send_header("Content-Length", len(target.data))
        self.end_headers()
        if target.truncate:
            self.wfile.write(target.data[:-1])
            self.wfile.flush()
        else:
            self.wfile.write(target.data)
keystone.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_endpoint_url(request, endpoint_type, catalog=None):
    """Return the identity endpoint URL joined with the active API version.

    The endpoint comes from ``base.url_for`` when the user has a service
    catalog; otherwise from the session's ``region_endpoint``, falling back
    to ``settings.OPENSTACK_KEYSTONE_URL``. ``catalog`` is not used here.
    """
    if getattr(request.user, "service_catalog", None):
        endpoint = base.url_for(request,
                                service_type='identity',
                                endpoint_type=endpoint_type)
    else:
        fallback_url = getattr(settings, 'OPENSTACK_KEYSTONE_URL')
        endpoint = request.session.get('region_endpoint', fallback_url)

    # TODO(gabriel): When the Service Catalog no longer contains API versions
    # in the endpoints this can be removed.
    version_segment = 'v%s' % VERSIONS.active
    return urlparse.urljoin(endpoint.rstrip('/'), version_segment)
context.py 文件源码 项目:ovirtcmd 作者: fbacchella 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def resolve_service_href(self, href):
        """Return the API service addressed by ``href``.

        ``href`` is resolved against ``self.api.url``, the API root is then
        removed from the result and the remaining path is handed to
        ``self.api.service``.

        :param href: link to a service, absolute or relative to the API URL.
        :return: whatever ``self.api.service`` returns for the service path.
        """
        absolute_href = urljoin(self.api.url, href)

        # Remove the root of the ovirt location.
        relative = absolute_href.replace(self.api.url, "")
        # Remove the "/" that separated the root from the path. Only a
        # leading slash is dropped: removing the first slash found anywhere
        # would corrupt the path when the API URL itself ends with "/"
        # (e.g. "vms/123" would become "vms123").
        if relative.startswith("/"):
            relative = relative[1:]
        # The str ensures that service_path is a str, not a unicode in python 2
        service_path = str(relative)

        return self.api.service(service_path)
base.py 文件源码 项目:threatstack-python-client 作者: MyPureCloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def http_request(self, method, path, data=None, params=None):
        """ Wraps HTTP calls to Threat Stack API

        The request is sent to ``path`` resolved against ``self.BASE_URL``
        with ``self.api_key`` as the ``Authorization`` header and, when
        ``self.org_id`` is set, the organization id under
        ``self.org_id_header``.

        :param method: HTTP method name.
        :param path: path joined onto ``self.BASE_URL``.
        :param data: optional request body.
        :param params: optional query parameters.
        :return: the result of ``self.handle_response`` for the response.
        :raises errors.APIRateLimitError: when the API answers with 429.
        """
        url = urljoin(self.BASE_URL, path)
        headers = {"Authorization": self.api_key}
        if self.org_id:
            headers[self.org_id_header] = self.org_id

        req = Request(
            method,
            url,
            headers=headers,
            data=data,
            params=params
        )
        prepped = req.prepare()
        # The session is used as a context manager so that it is closed
        # (and its pooled connections released) on every exit path instead
        # of being left open after each call.
        with Session() as s:
            resp = s.send(prepped, timeout=self.timeout)
            if resp.status_code == 429:
                raise errors.APIRateLimitError("Threat Stack API rate limit exceeded")
            return self.handle_response(resp)


问题


面经


文章

微信
公众号

扫码关注公众号