python类urlparse()的实例源码

adapters.py 文件源码 项目:trip 作者: littlecodersh 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _negotiate_socks(self, addr, proxy_addr):
        """Connect to ``addr`` through the SOCKS proxy described by ``proxy_addr``.

        :param addr: target address; unpacked into ``stream.connect(*addr)``.
        :param proxy_addr: ``(proxy_url, proxy_port)`` pair. The scheme of the
            URL selects the SOCKS variant and the URL's userinfo part
            supplies the proxy credentials.
        :returns: the result of ``SockIOStream.connect``.
        :raises ValueError: if the proxy URL scheme is not one of ``socks4``,
            ``socks4a``, ``socks5`` or ``socks5h``.
        """
        proxy_url, proxy_port = proxy_addr[0], proxy_addr[1]
        parsed = urlparse(proxy_url)

        # scheme -> (socks_version, rdns), exactly as handed to SockIOStream.
        socks_settings = {
            'socks5': (2, False),
            'socks5h': (2, True),
            'socks4': (1, False),
            'socks4a': (1, True),
        }
        if parsed.scheme not in socks_settings:
            # Report the proxy URL: that is the value whose scheme was
            # rejected, not the target address.
            raise ValueError(
                'Unable to determine SOCKS version from %s' % proxy_url)
        socks_version, rdns = socks_settings[parsed.scheme]

        # Credentials belong to the proxy, so they are read from the proxy
        # URL rather than from the target host.
        username, password = get_auth_from_url(proxy_url)
        stream = SockIOStream((
            socks_version, rdns, parsed.hostname, proxy_port, username, password))
        return stream.connect(*addr)
adapters.py 文件源码 项目:deb-python-requests-unixsocket 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_connection(self, url, proxies=None):
        """Return the connection pool for ``url``, creating it on first use.

        :param url: URL to connect to; also used verbatim as the pool key.
        :param proxies: optional mapping of scheme to proxy URL.
        :raises ValueError: if a proxy is configured for the URL's scheme.
        """
        scheme = urlparse(url.lower()).scheme
        if (proxies or {}).get(scheme):
            raise ValueError('%s does not support specifying proxies'
                             % self.__class__.__name__)

        # Look-up and creation both happen while holding the pools lock.
        with self.pools.lock:
            cached = self.pools.get(url)
            if not cached:
                cached = UnixHTTPConnectionPool(url, self.timeout)
                self.pools[url] = cached

        return cached
unixadapter.py 文件源码 项目:commissaire 作者: projectatomic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_connection(self, url, proxies=None):
        """Fetch (or lazily build) the pool that serves ``url``.

        :param url: URL to connect to; used unchanged as the pool key.
        :param proxies: optional mapping of scheme to proxy URL.
        :raises ValueError: when ``proxies`` has a truthy entry for the
            scheme of ``url``.
        """
        proxy_map = proxies if proxies else {}
        url_scheme = urlparse(url.lower()).scheme

        if proxy_map.get(url_scheme):
            owner = self.__class__.__name__
            raise ValueError('%s does not support specifying proxies'
                             % owner)

        with self.pools.lock:
            existing = self.pools.get(url)
            if existing:
                return existing

            created = UnixHTTPConnectionPool(url, self.timeout)
            self.pools[url] = created
            return created
kerberos_.py 文件源码 项目:deb-python-requests-kerberos 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def authenticate_user(self, response, **kwargs):
        """Resend the request with a generated ``Authorization`` header.

        :param response: the response whose request should be retried.
        :param kwargs: forwarded to ``response.connection.send``.
        :returns: the new response, with ``response`` appended to its
            history; or ``response`` itself when header generation raises
            ``KerberosExchangeError``.
        """
        hostname = urlparse(response.url).hostname

        try:
            header_value = self.generate_request_header(response, hostname)
        except KerberosExchangeError:
            # GSS failure: hand the existing response back untouched.
            return response

        log.debug("authenticate_user(): Authorization header: {0}".format(
            header_value))
        retry_request = response.request
        retry_request.headers['Authorization'] = header_value

        # Read the body and release the connection so that it can be reused
        # for the follow-up request.
        response.content
        response.raw.release_conn()

        retried = response.connection.send(retry_request, **kwargs)
        retried.history.append(response)

        log.debug("authenticate_user(): returning {0}".format(retried))
        return retried
kerberos_.py 文件源码 项目:deb-python-requests-kerberos 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __call__(self, request):
        """Prepare ``request`` for Kerberos authentication and return it.

        When ``self.force_preemptive`` is set, an ``Authorization`` header is
        generated and attached up front instead of waiting for a 401. The
        ``handle_response`` hook is always registered, and the current
        position of the request body is remembered in ``self.pos``.
        """
        if self.force_preemptive:
            # Attach the Authorization header before any 401 is received.
            hostname = urlparse(request.url).hostname
            header_value = self.generate_request_header(None, hostname, is_preemptive=True)
            log.debug("HTTPKerberosAuth: Preemptive Authorization header: {0}".format(header_value))
            request.headers['Authorization'] = header_value

        request.register_hook('response', self.handle_response)

        try:
            body_position = request.body.tell()
        except AttributeError:
            # The body has no tell(). This auth object may be reused after a
            # request whose body was file-like, so the stale position must
            # be reset to None.
            body_position = None
        self.pos = body_position

        return request
auth.py 文件源码 项目:edd 作者: JBEI 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _build_message(self, request):
        """
        Assemble the text that is digested to produce the HMAC for ``request``.

        Version 1 of the HmacSignature spec signs these elements, joined by
        newline characters, in this order:
          * UserId (same value as used in the Authorization header)
          * HTTP method (e.g. GET, POST)
          * HTTP host (e.g. server.example.org)
          * Request path (e.g. /path/to/resource/)
          * SORTED query string, keyed by natural UTF8 byte-ordering of names
          * Request body
        """
        parsed = urlparse(request.url)
        fields = (
            self._USERNAME or '',
            request.method,
            parsed.netloc,
            parsed.path,
            self._sort_parameters(parsed.query),
            request.body or '',
        )
        return '\n'.join(fields)
auth.py 文件源码 项目:edd 作者: JBEI 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def insert_spoofed_https_csrf_headers(headers, base_url):
    """
    Add the 'Host' and 'Referer' headers that Django's CSRF protection checks on HTTPS requests.

    Nothing is inserted unless the scheme of ``base_url`` is exactly 'https'.

    :param headers: a dictionary into which headers will be inserted, if needed
    :param base_url: the base URL of the Django application being contacted
    """
    # Django/DRF compares these headers on secure connections as part of its CSRF protection.
    # Supplying them ourselves is acceptable for a standalone Python REST API client: the attack
    # being defended against starts from malicious website code running inside a browser and
    # using another site's credentials via that browser, which does not apply here.
    # References:
    # https://docs.djangoproject.com/en/dev/ref/csrf/#how-it-works
    # http://security.stackexchange.com/questions/96114/why-is-referer-checking-needed-for-django-to-prevent-csrf
    # http://mathieu.fenniak.net/is-your-web-api-susceptible-to-a-csrf-exploit/
    parts = urlsplit(base_url)
    if parts.scheme != 'https':
        return
    headers['Host'] = parts.netloc
    headers['Referer'] = base_url  # the header name really is spelled 'Referer'
api.py 文件源码 项目:edd 作者: JBEI 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _process_query_dict(self, search_terms, entry_types, blast_program, blast_sequence,
                            search_web, sort_field, sort_ascending, page_number):
        """
        Build the dictionary of search parameters from the individual arguments.

        :param search_terms: stored under 'queryString' when truthy
        :param entry_types: stored under 'entryTypes' when truthy; every item must be a member of
            ICE_ENTRY_TYPES
        :param blast_program: forwarded to self._process_query_blast
        :param blast_sequence: forwarded to self._process_query_blast
        :param search_web: always stored under 'webSearch'
        :param sort_field: forwarded to self._process_query_parameters
        :param sort_ascending: forwarded to self._process_query_parameters
        :param page_number: forwarded to self._process_query_parameters
        :return: the populated query dictionary
        :raises KeyError: if entry_types contains a value that is not in ICE_ENTRY_TYPES
        """
        query_url = None  # TODO: re-instate this parameter if we can get ICE to support the same
        # queries in GET as in POST...should simplify client use
        if query_url:
            # un-parse the query URL so we consistently follow the same code path. The search
            # terms live in the URL's query string ('?a=b'), not in its rarely used ';params'
            # component.
            return parse_qs(urlparse(query_url).query)

        query_dict = {}
        if search_terms:
            query_dict['queryString'] = search_terms
        if entry_types:
            unsupported = set(entry_types) - set(ICE_ENTRY_TYPES)
            if unsupported:
                # name the offending values so the caller can see what was rejected
                raise KeyError('Unsupported ICE entry type(s): %s'
                               % ', '.join(sorted(str(item) for item in unsupported)))
            query_dict['entryTypes'] = entry_types
        self._process_query_blast(query_dict, blast_program, blast_sequence)
        query_dict['webSearch'] = search_web  # Note: affects results even if false?
        self._process_query_parameters(query_dict, sort_field, sort_ascending, page_number)
        return query_dict
asyncadapter.py 文件源码 项目:plex-for-kodi-mod 作者: mrclemds 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_connection(self, url, proxies=None):
        """Return a urllib3 connection for ``url`` and record it.

        Not meant to be called from user code; it is exposed only for
        subclasses of :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param url: The URL to connect to.
        :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
        """
        scheme = urlparse(url.lower()).scheme
        proxy = (proxies or {}).get(scheme)

        if not proxy:
            # Round-tripping through urlparse lower-cases the scheme only.
            normalized = urlparse(url).geturl()
            conn = self.poolmanager.connection_from_url(normalized)
        else:
            proxy_headers = self.proxy_headers(proxy)

            # Build the manager for this proxy the first time it is seen.
            if proxy not in self.proxy_manager:
                self.proxy_manager[proxy] = proxy_from_url(
                    proxy,
                    proxy_headers=proxy_headers,
                    num_pools=self._pool_connections,
                    maxsize=self._pool_maxsize,
                    block=self._pool_block
                )

            manager = self.proxy_manager[proxy]
            conn = manager.connection_from_url(url)

        self.connections.append(conn)
        return conn
adapters.py 文件源码 项目:trip 作者: littlecodersh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _create_stream(self, max_buffer_size, af, addr, source_ip=None,
            source_port=None):
        """Create an ``IOStream`` on a fresh socket and start connecting it.

        In this implementation ``source_ip``/``source_port`` do not bind a
        local address: when either is truthy, ``source_ip`` is parsed as a
        proxy URL and ``source_port`` is used as the proxy port.

        :param max_buffer_size: passed through to ``IOStream``.
        :param af: address family handed to ``socket.socket``.
        :param addr: target address to connect to.
        :param source_ip: proxy URL, or a falsy value for a direct connection.
        :param source_port: proxy port; any non-int value is treated as 0.
        :returns: the result of ``stream.connect`` for a direct connection,
            the result of the inner coroutine when a proxy is used, or a
            ``Future`` carrying the exception if ``socket.error`` is raised
            while setting up.
        """
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        source_port_bind = source_port if isinstance(source_port, int) else 0
        source_ip_bind = source_ip

        socket_obj = socket.socket(af)
        set_close_exec(socket_obj.fileno())
        try:
            stream = IOStream(socket_obj,
                              io_loop=self.io_loop,
                              max_buffer_size=max_buffer_size)

            # connect proxy
            if source_port_bind or source_ip_bind:
                @gen.coroutine
                def _(addr):
                    # source_ip_bind carries the whole proxy URL here.
                    proxy_headers = get_proxy_headers(source_ip_bind)
                    parsed = urlparse(source_ip_bind)
                    scheme, host, port = parsed.scheme, parsed.hostname, source_port_bind
                    if 'socks' in scheme:
                        # SOCKS proxies are negotiated on a separate stream;
                        # the IOStream created above is not used on this path.
                        r = yield self._negotiate_socks(addr, (source_ip_bind, source_port_bind))
                        raise gen.Return(r)
                    elif scheme in ('http', 'https'):
                        # Connect to the proxy itself rather than to addr.
                        r = yield stream.connect((host, port))
                        if scheme == 'https':
                            # Only an 'https' proxy scheme triggers the tunnel step.
                            yield self._connect_tunnel(stream, addr, proxy_headers)
                        raise gen.Return(r)
                    else:
                        raise AttributeError('Unknown scheme: %s' % scheme)
                return _(addr)
            else:
                return stream.connect(addr)
        except socket.error as e:
            # Report the failure through a Future instead of raising it.
            fu = Future()
            fu.set_exception(e)
            return fu
adapters.py 文件源码 项目:trip 作者: littlecodersh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(self, stream=False, timeout=None, verify=True,
             cert=None, proxies=None):
        """Start connecting for ``self.request``, optionally through a proxy.

        :param stream: stored on ``self.stream_body``.
        :param timeout: split by ``parse_timeout`` into a connect timeout and
            a read timeout (the latter is stored on ``self.read_timeout``).
        :param verify: forwarded to ``self._get_ssl_options``.
        :param cert: forwarded to ``self._get_ssl_options``.
        :param proxies: proxy configuration given to ``select_proxy``.
        """
        request = self.request
        connect_timeout, self.read_timeout = parse_timeout(timeout)
        self.stream_body = stream

        # set connect timeout
        with stack_context.ExceptionStackContext(self._handle_exception):
            if connect_timeout:
                # Schedule self._on_timeout('while connecting') to run after
                # connect_timeout.
                self._timeout = self.io_loop.call_later(connect_timeout,
                    stack_context.wrap(functools.partial(
                        self._on_timeout, 'while connecting')))

            # set proxy related info
            proxy = select_proxy(request.url, proxies)
            self.headers = request.headers.copy()
            if proxy:
                proxy = prepend_scheme_if_needed(proxy, 'http')
                parsed = urlparse(proxy)
                # host is the complete proxy URL, not just its hostname; it
                # is passed below as source_ip.
                # NOTE(review): presumably tcp_client.connect forwards
                # source_ip/source_port to _create_stream, which parses
                # source_ip as a proxy URL - confirm.
                scheme, host, port = parsed.scheme, proxy, parsed.port
                # Default port when the proxy URL has none: 443 for https,
                # otherwise 80.
                port = port or (443 if scheme == 'https' else 80)
                # With a proxy the start line is built from the full
                # request URL.
                self.start_line = RequestStartLine(request.method, request.url, '')
                self.headers.update(get_proxy_headers(proxy))
            else:
                host, port = None, None
                self.start_line = request.start_line

            self.tcp_client.connect(
                request.host, request.port,
                af=request.af,
                ssl_options=self._get_ssl_options(request, verify, cert),
                max_buffer_size=self.max_buffer_size,
                source_ip=host, source_port=port,
                callback=self._on_connect)
__init__.py 文件源码 项目:requests-http-signature 作者: kislyuk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_string_to_sign(self, request, headers):
        """Build the signing string for ``request``.

        One ``name: value`` line is produced per entry in ``headers``, in the
        given order, with the name lower-cased. The pseudo-header
        ``(request-target)`` becomes the lower-cased method followed by the
        request's path URL. A missing ``host`` header falls back to the
        hostname of ``request.url``.

        :returns: the lines joined by newlines, encoded to bytes.
        """
        lines = []
        for name in headers:
            if name == "(request-target)":
                target = requests.models.RequestEncodingMixin.path_url.fget(request)
                lines.append("(request-target): {} {}".format(request.method.lower(), target))
                continue

            if name.lower() == "host":
                content = request.headers.get("host", urlparse(request.url).hostname)
            else:
                content = request.headers[name]
            lines.append("{k}: {v}".format(k=name.lower(), v=content))
        return "\n".join(lines).encode()
constants.py 文件源码 项目:pybel 作者: pybel 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_uri_name(url):
    """Return the file name at the end of a URL.

    URLs that start with ``FRAUNHOFER_RESOURCES`` (the owncloud resources
    distributed by Fraunhofer) keep the name after the last ``=`` of the
    query string; every other URL uses the last segment of its path. Mainly
    useful for PyBEL's own tests.
    """
    parsed = urlparse(url)

    if not url.startswith(FRAUNHOFER_RESOURCES):
        return parsed.path.rsplit('/', 1)[-1]

    return parsed.query.rsplit('=', 1)[-1]
utils.py 文件源码 项目:pybel 作者: pybel 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_url(s):
    """Tell whether a string parses with a non-empty URL scheme.

    :param str s: An input string
    :return: ``True`` when ``urlparse`` finds a scheme in ``s``
    :rtype: bool
    """
    scheme = urlparse(s).scheme
    return not scheme == ""
adapters.py 文件源码 项目:deb-python-requests-unixsocket 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self):
        """Open the UNIX domain socket named by ``self.unix_socket_url``.

        The socket path is the percent-decoded network-location part of the
        URL. On success the connected socket is stored on ``self.sock``.

        :raises OSError: if connecting fails; the half-opened socket is
            closed first so that its file descriptor is not leaked.
        """
        socket_path = unquote(urlparse(self.unix_socket_url).netloc)
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect(socket_path)
        except Exception:
            # Do not leave the descriptor open when the connection fails.
            sock.close()
            raise
        self.sock = sock
database.py 文件源码 项目:pyuniprot 作者: cebel 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_path_to_file_from_url(cls, url):
        """Map a download URL to its local file path.

        :param str url: download URL
        :return: path inside ``PYUNIPROT_DATA_DIR`` named after the last
            segment of the URL's path
        """
        url_path = urlparse(url).path
        base_name = url_path.rsplit('/', 1)[-1]
        return os.path.join(PYUNIPROT_DATA_DIR, base_name)
handler.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _key_from_url(url):
        """Reduce ``url`` to a lower-cased ``scheme://netloc`` key."""
        parts = urlparse(url)
        scheme = parts.scheme.lower()
        authority = parts.netloc.lower()
        # Path, params, query and fragment are deliberately blanked out.
        return urlunparse((scheme, authority, '', '', '', ''))
dump.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _build_request_path(url, proxy_info):
    """Return ``(request_path, parsed_url)`` for ``url``.

    A non-``None`` ``'request_path'`` entry in ``proxy_info`` is returned as
    the request path unchanged. Otherwise the path is the URL's path as
    bytes, followed by ``?`` and the query when the URL has one.
    """
    uri = compat.urlparse(url)
    override = proxy_info.get('request_path')
    if override is None:
        path_bytes = _coerce_to_bytes(uri.path)
        if uri.query:
            path_bytes += b'?' + _coerce_to_bytes(uri.query)
        return path_bytes, uri

    return override, uri
handler.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _key_from_url(url):
        """Build a key from the lower-cased scheme and netloc of ``url``.

        Everything after the network location is dropped.
        """
        parsed = urlparse(url)
        origin = (parsed.scheme.lower(), parsed.netloc.lower())
        return urlunparse(origin + ('',) * 4)
dump.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _build_request_path(url, proxy_info):
    """Work out the request path for ``url``.

    :param url: the request URL.
    :param proxy_info: mapping; a ``'request_path'`` value other than
        ``None`` takes precedence over the path derived from ``url``.
    :returns: ``(request_path, parsed_url)``.
    """
    parsed = compat.urlparse(url)

    forced_path = proxy_info.get('request_path')
    if forced_path is not None:
        return forced_path, parsed

    pieces = _coerce_to_bytes(parsed.path)
    if parsed.query:
        pieces += b'?' + _coerce_to_bytes(parsed.query)
    return pieces, parsed
html_to_telegraph.py 文件源码 项目:html-telegraph-poster 作者: mercuree 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def preprocess_media_tags(element):
    """Normalise list, iframe and embedded-tweet elements in place.

    * ``<ol>``/``<ul>``: the leading text is cleared.
    * ``<li>``: the tail text is cleared.
    * ``<iframe>``: YouTube and Vimeo sources are rewritten to
      ``/embed/...`` URLs and wrapped in a figure unless already inside
      one; any other iframe (including one without ``src``) has its tag
      dropped.
    * ``<blockquote class="twitter-tweet">``: replaced by an ``/embed/twitter``
      iframe built from the first link matching ``twitter_re``.

    :param element: node to process; anything that is not an
        ``html.HtmlElement`` is ignored.
    """
    if not isinstance(element, html.HtmlElement):
        return

    if element.tag in ['ol', 'ul']:
        # ignore any spaces between <ul> and <li>
        element.text = ''
    elif element.tag == 'li':
        # ignore spaces after </li>
        element.tail = ''
    elif element.tag == 'iframe':
        iframe_src = element.get('src')

        if iframe_src is None:
            # No src attribute: re.match would raise TypeError on None, so
            # handle it like any other unsupported iframe.
            youtube = vimeo = None
        else:
            youtube = re.match(youtube_re, iframe_src)
            vimeo = re.match(vimeo_re, iframe_src)

        if youtube or vimeo:
            element.text = ''  # ignore any legacy text
            if youtube:
                yt_id = urlparse(iframe_src).path.replace('/embed/', '')
                element.set('src', '/embed/youtube?url=' + quote_plus('https://www.youtube.com/watch?v=' + yt_id))
            elif vimeo:
                element.set('src', '/embed/vimeo?url=' + quote_plus('https://vimeo.com/' + vimeo.group(2)))

            if not len(element.xpath('./ancestor::figure')):
                _wrap_figure(element)
        else:
            element.drop_tag()

    elif element.tag == 'blockquote' and element.get('class') == 'twitter-tweet':
        for tw_link in element.xpath('.//a[@href]'):
            tweet_url = tw_link.get('href')
            if twitter_re.match(tweet_url):
                twitter_frame = html.HtmlElement()
                twitter_frame.tag = 'iframe'
                twitter_frame.set('src', '/embed/twitter?url=' + quote_plus(tweet_url))
                element.addprevious(twitter_frame)
                _wrap_figure(twitter_frame)
                element.drop_tree()
                # The blockquote has just been removed from the tree, so a
                # later matching link could no longer place its frame in
                # the document. Stop after the first match.
                break
unixadapter.py 文件源码 项目:commissaire 作者: projectatomic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect ``self.sock`` to the UNIX socket behind ``self.unix_socket_url``.

        The percent-decoded network-location part of the URL is used as the
        filesystem path of the socket, and ``self.timeout`` is applied
        before connecting.

        :raises OSError: if the connection attempt fails. The socket is
            closed before the error propagates, so no file descriptor is
            leaked, and ``self.sock`` is left untouched.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            socket_path = unquote(urlparse(self.unix_socket_url).netloc)
            sock.connect(socket_path)
        except Exception:
            # Release the descriptor on any failure, then re-raise unchanged.
            sock.close()
            raise
        self.sock = sock
structs.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, count, url, cls, session, params=None, etag=None,
                 headers=None):
        """Set up the iterator state for a GET against ``url``.

        :param count: number of items requested; stored as both the original
            and the remaining count.
        :param url: URL used for the first GET.
        :param cls: class used to construct each returned item.
        :param session: passed to ``GitHubCore.__init__``.
        :param params: query-string parameters; defaults to an empty dict.
        :param etag: if truthy, sent as the ``If-None-Match`` header. It is
            not stored on ``self.etag``.
        :param headers: headers for the GET request; defaults to an empty
            dict.
        """
        GitHubCore.__init__(self, {}, session)
        #: Original number of items requested
        self.original = count
        #: Number of items left in the iterator
        self.count = count
        #: URL the class used to make its first GET
        self.url = url
        #: Last URL that was requested
        self.last_url = None
        # NOTE(review): _api may be a property whose setter parses the URL -
        # confirm against the class definition.
        self._api = self.url
        #: Class for constructing an item to return
        self.cls = cls
        #: Parameters of the query string
        self.params = params or {}
        # NOTE(review): presumably strips None-valued entries in place -
        # verify in _remove_none.
        self._remove_none(self.params)
        # We do not set this from the parameter sent. We want this to
        # represent the ETag header returned by GitHub no matter what.
        # If this is not None, then it won't be set from the response and
        # that's not what we want.
        #: The ETag Header value returned by GitHub
        self.etag = None
        #: Headers generated for the GET request
        self.headers = headers or {}
        #: The last response seen
        self.last_response = None
        #: Last status code received
        self.last_status = 0

        if etag:
            # Make the request conditional on the caller-supplied ETag.
            self.headers.update({'If-None-Match': etag})

        # Only the path component of the URL is kept here.
        self.path = urlparse(self.url).path
models.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _api(self, uri):
        """Parse ``uri`` and keep the result on ``self._uri``."""
        parsed_uri = urlparse(uri)
        self._uri = parsed_uri
handler.py 文件源码 项目:OctoFusion 作者: tapnair 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _key_from_url(url):
        """Return ``scheme://netloc`` of ``url`` with both parts lower-cased."""
        parsed = urlparse(url)
        components = [parsed.scheme.lower(), parsed.netloc.lower()]
        # Pad the remaining four URL components with empty strings.
        components.extend([''] * 4)
        return urlunparse(components)
dump.py 文件源码 项目:OctoFusion 作者: tapnair 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _build_request_path(url, proxy_info):
    """Compute the request path and parsed form of ``url``.

    When ``proxy_info`` carries a ``'request_path'`` that is not ``None``,
    that value is returned as-is. Otherwise the path is the URL path as
    bytes, with ``?`` and the query appended if the URL has one.

    :returns: tuple of ``(request_path, parsed_url)``.
    """
    uri = compat.urlparse(url)
    proxy_path = proxy_info.get('request_path')

    if proxy_path is None:
        segments = [_coerce_to_bytes(uri.path)]
        if uri.query:
            segments.append(b'?' + _coerce_to_bytes(uri.query))
        request_path = segments[0]
        for extra in segments[1:]:
            request_path += extra
    else:
        request_path = proxy_path

    return request_path, uri
database.py 文件源码 项目:pyctd 作者: cebel 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_path_to_file_from_url(cls, url):
        """Return where the file behind ``url`` is stored locally.

        :param str url: CTD download URL
        :return: ``cls.pyctd_data_dir`` joined with the last segment of the
            URL's path
        """
        remote_path = urlparse(url).path
        file_name = remote_path.rsplit('/', 1)[-1]
        return os.path.join(cls.pyctd_data_dir, file_name)
structs.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, count, url, cls, session, params=None, etag=None,
                 headers=None):
        """Set up the iterator state for a GET against ``url``.

        :param count: number of items requested; stored as both the original
            and the remaining count.
        :param url: URL used for the first GET.
        :param cls: class used to construct each returned item.
        :param session: passed to ``models.GitHubCore.__init__``.
        :param params: query-string parameters; defaults to an empty dict.
        :param etag: if truthy, sent as the ``If-None-Match`` header. It is
            not stored on ``self.etag``.
        :param headers: headers for the GET request; defaults to an empty
            dict.
        """
        models.GitHubCore.__init__(self, {}, session)
        #: Original number of items requested
        self.original = count
        #: Number of items left in the iterator
        self.count = count
        #: URL the class used to make its first GET
        self.url = url
        #: Last URL that was requested
        self.last_url = None
        # NOTE(review): _api may be a property whose setter parses the URL -
        # confirm against the class definition.
        self._api = self.url
        #: Class for constructing an item to return
        self.cls = cls
        #: Parameters of the query string
        self.params = params or {}
        # NOTE(review): presumably strips None-valued entries in place -
        # verify in _remove_none.
        self._remove_none(self.params)
        # We do not set this from the parameter sent. We want this to
        # represent the ETag header returned by GitHub no matter what.
        # If this is not None, then it won't be set from the response and
        # that's not what we want.
        #: The ETag Header value returned by GitHub
        self.etag = None
        #: Headers generated for the GET request
        self.headers = headers or {}
        #: The last response seen
        self.last_response = None
        #: Last status code received
        self.last_status = 0

        if etag:
            # Make the request conditional on the caller-supplied ETag.
            self.headers.update({'If-None-Match': etag})

        # Only the path component of the URL is kept here.
        self.path = urlparse(self.url).path
models.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _api(self, uri):
        """Remember ``uri`` both parsed (``self._uri``) and raw (``self.url``)."""
        parsed_uri = urlparse(uri)
        self._uri = parsed_uri
        self.url = uri
kerberos_.py 文件源码 项目:deb-python-requests-kerberos 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def authenticate_server(self, response):
        """
        Run one GSSAPI client step to authenticate the server.

        The step uses the context stored for the response's host and the
        negotiate value taken from the response.

        Returns True on success, False on failure.
        """
        log.debug("authenticate_server(): Authenticate header: {0}".format(
            _negotiate_value(response)))

        hostname = urlparse(response.url).hostname

        try:
            step_result = kerberos.authGSSClientStep(self.context[hostname],
                                                     _negotiate_value(response))
        except kerberos.GSSError:
            log.exception("authenticate_server(): authGSSClientStep() failed:")
            return False

        # Any result of 1 or more counts as success.
        if step_result >= 1:
            log.debug("authenticate_server(): returning {0}".format(response))
            return True

        log.error("authenticate_server(): authGSSClientStep() failed: "
                  "{0}".format(step_result))
        return False


问题


面经


文章

微信
公众号

扫码关注公众号