def _negotiate_socks(self, addr, proxy_addr):
    # Map the proxy URL's scheme onto a SOCKS version and a remote-DNS flag;
    # the 'h'/'a' suffixed schemes resolve hostnames on the proxy side.
    parsed = urlparse(proxy_addr[0])
    if parsed.scheme == 'socks5':
        socks_version, rdns = 2, False
    elif parsed.scheme == 'socks5h':
        socks_version, rdns = 2, True
    elif parsed.scheme == 'socks4':
        socks_version, rdns = 1, False
    elif parsed.scheme == 'socks4a':
        socks_version, rdns = 1, True
    else:
        raise ValueError(
            'Unable to determine SOCKS version from %s' % proxy_addr[0])
    # Credentials come from the proxy URL, not the target address.
    username, password = get_auth_from_url(proxy_addr[0])
    stream = SockIOStream((
        socks_version, rdns, parsed.hostname, proxy_addr[1], username, password))
    return stream.connect(*addr)
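The scheme dispatch above reduces to a small lookup table; a standalone sketch using only the standard library, with a hypothetical proxy URL:

from urllib.parse import urlparse

# Hypothetical demonstration of the scheme -> (version, remote-DNS) mapping.
SOCKS_SCHEMES = {
    'socks5': (2, False), 'socks5h': (2, True),
    'socks4': (1, False), 'socks4a': (1, True),
}

parsed = urlparse('socks5h://user:pass@127.0.0.1:1080')
version, rdns = SOCKS_SCHEMES[parsed.scheme]
print(version, rdns, parsed.hostname, parsed.port)  # 2 True 127.0.0.1 1080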
def get_connection(self, url, proxies=None):
    proxies = proxies or {}
    proxy = proxies.get(urlparse(url.lower()).scheme)
    if proxy:
        raise ValueError('%s does not support specifying proxies'
                         % self.__class__.__name__)
    with self.pools.lock:
        pool = self.pools.get(url)
        if pool:
            return pool
        pool = UnixHTTPConnectionPool(url, self.timeout)
        self.pools[url] = pool
        return pool
def authenticate_user(self, response, **kwargs):
    """Handles user authentication with gssapi/kerberos"""
    host = urlparse(response.url).hostname
    try:
        auth_header = self.generate_request_header(response, host)
    except KerberosExchangeError:
        # GSS failure; return the existing response
        return response
    log.debug("authenticate_user(): Authorization header: {0}".format(
        auth_header))
    response.request.headers['Authorization'] = auth_header
    # Consume the content so we can reuse the connection for the next
    # request.
    response.content
    response.raw.release_conn()
    _r = response.connection.send(response.request, **kwargs)
    _r.history.append(response)
    log.debug("authenticate_user(): returning {0}".format(_r))
    return _r
def __call__(self, request):
    if self.force_preemptive:
        # Add the Authorization header preemptively, before the server
        # responds with a 401 that the response handler would act on.
        host = urlparse(request.url).hostname
        auth_header = self.generate_request_header(None, host, is_preemptive=True)
        log.debug("HTTPKerberosAuth: Preemptive Authorization header: {0}".format(auth_header))
        request.headers['Authorization'] = auth_header
    request.register_hook('response', self.handle_response)
    try:
        self.pos = request.body.tell()
    except AttributeError:
        # When HTTPKerberosAuth is reused and the body of the previous
        # request was a file-like object, pos holds the file position of
        # the previous body. Ensure it's set to None.
        self.pos = None
    return request
def _build_message(self, request):
    """
    Builds a string representation of the message contained in the request so it can be
    digested for HMAC generation
    """
    url = urlparse(request.url)
    # The version 1 spec of the HmacSignature class calls for the message being signed
    # to be formatted as the following elements, each separated by a newline character:
    # * UserId (same value as used in Authorization header)
    # * HTTP Method (e.g. GET, POST)
    # * HTTP Host (e.g. server.example.org)
    # * Request path (e.g. /path/to/resource/)
    # * SORTED query string, keyed by natural UTF8 byte-ordering of names
    # * Request Body
    delimiter = '\n'
    msg = delimiter.join((
        self._USERNAME or '',
        request.method,
        url.netloc,
        url.path,
        self._sort_parameters(url.query),
        request.body or '',
    ))
    return msg
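For context, a minimal sketch of how such a newline-joined message might be digested; the key, user id, and field values below are hypothetical, and the real class may use a different digest:

import base64
import hashlib
import hmac

# Hypothetical key and message fields; the real signer takes these from its
# configuration and the outgoing request.
secret_key = b'example-secret'
message = '\n'.join(('alice', 'GET', 'server.example.org',
                     '/path/to/resource/', 'a=1&b=2', ''))
digest = hmac.new(secret_key, message.encode('utf-8'), hashlib.sha256).digest()
signature = base64.b64encode(digest).decode('ascii')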
def insert_spoofed_https_csrf_headers(headers, base_url):
    """
    Creates HTTP headers that help to work around Django's CSRF protection, which shouldn't
    apply outside of the browser context.
    :param headers: a dictionary into which headers will be inserted, if needed
    :param base_url: the base URL of the Django application being contacted
    """
    # If connecting to Django/DRF via HTTPS, spoof the 'Host' and 'Referer' headers that
    # Django uses to help prevent cross-site scripting attacks for secure browser
    # connections. This should be OK for a standalone Python REST API client, since the
    # origin of a cross-site scripting attack is malicious website code that executes in
    # a browser, but accesses another site's credentials via the browser or via user
    # prompts within the browser. Not applicable in this case for a standalone REST API
    # client.
    # References:
    # https://docs.djangoproject.com/en/dev/ref/csrf/#how-it-works
    # http://security.stackexchange.com/questions/96114/why-is-referer-checking-needed-for-django-to-prevent-csrf
    # http://mathieu.fenniak.net/is-your-web-api-susceptible-to-a-csrf-exploit/
    if urlparse(base_url).scheme == 'https':
        headers['Host'] = urlsplit(base_url).netloc
        headers['Referer'] = base_url  # LOL! Bad spelling is now standard :-)
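A quick illustration of the effect, calling the function above with a hypothetical HTTPS base URL:

from urllib.parse import urlparse, urlsplit

headers = {}
insert_spoofed_https_csrf_headers(headers, 'https://edd.example.org/api/')
print(headers)
# {'Host': 'edd.example.org', 'Referer': 'https://edd.example.org/api/'}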
def _process_query_dict(self, search_terms, entry_types, blast_program, blast_sequence,
                        search_web, sort_field, sort_ascending, page_number):
    query_dict = {}
    # TODO: re-instate this parameter if we can get ICE to support the same queries
    # in GET as in POST... should simplify client use
    query_url = None
    if not query_url:
        if search_terms:
            query_dict['queryString'] = search_terms
        if entry_types:
            if not set(entry_types).issubset(set(ICE_ENTRY_TYPES)):
                raise KeyError('Unsupported ICE entry type(s): %s'
                               % (set(entry_types) - set(ICE_ENTRY_TYPES)))
            query_dict['entryTypes'] = entry_types
        self._process_query_blast(query_dict, blast_program, blast_sequence)
        query_dict['webSearch'] = search_web  # Note: affects results even if false?
        self._process_query_parameters(query_dict, sort_field, sort_ascending, page_number)
    else:
        # un-parse the query URL so we're consistently following the same code path
        query_dict = parse_qs(urlparse(query_url).query)
    return query_dict
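The `.query` attribute (not `.params`, which holds the rarely used path parameters) is what `parse_qs` expects; a standalone sketch with a hypothetical search URL:

from urllib.parse import urlparse, parse_qs

parsed = urlparse('https://ice.example.org/search;rev=1?queryString=gfp&entryTypes=PLASMID')
print(parsed.params)           # 'rev=1'  (path parameters after the semicolon)
print(parse_qs(parsed.query))  # {'queryString': ['gfp'], 'entryTypes': ['PLASMID']}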
def get_connection(self, url, proxies=None):
    """Returns a urllib3 connection for the given URL. This should not be
    called from user code, and is only exposed for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
    """
    proxies = proxies or {}
    proxy = proxies.get(urlparse(url.lower()).scheme)
    if proxy:
        proxy_headers = self.proxy_headers(proxy)
        if proxy not in self.proxy_manager:
            self.proxy_manager[proxy] = proxy_from_url(
                proxy,
                proxy_headers=proxy_headers,
                num_pools=self._pool_connections,
                maxsize=self._pool_maxsize,
                block=self._pool_block
            )
        conn = self.proxy_manager[proxy].connection_from_url(url)
    else:
        # Only scheme should be lower case
        parsed = urlparse(url)
        url = parsed.geturl()
        conn = self.poolmanager.connection_from_url(url)
    self.connections.append(conn)
    return conn
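The proxy lookup keys on the lower-cased URL scheme; a standalone sketch with a hypothetical Requests-style proxies dict:

from urllib.parse import urlparse

proxies = {'http': 'http://10.0.0.1:3128', 'https': 'http://10.0.0.1:3129'}
url = 'HTTPS://Example.org/data'
proxy = proxies.get(urlparse(url.lower()).scheme)
print(proxy)  # http://10.0.0.1:3129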
def _create_stream(self, max_buffer_size, af, addr, source_ip=None,
                   source_port=None):
    # Always connect in plaintext; we'll convert to ssl if necessary
    # after one connection has completed.
    source_port_bind = source_port if isinstance(source_port, int) else 0
    source_ip_bind = source_ip
    socket_obj = socket.socket(af)
    set_close_exec(socket_obj.fileno())
    try:
        stream = IOStream(socket_obj,
                          io_loop=self.io_loop,
                          max_buffer_size=max_buffer_size)
        # connect proxy
        if source_port_bind or source_ip_bind:
            @gen.coroutine
            def _(addr):
                proxy_headers = get_proxy_headers(source_ip_bind)
                parsed = urlparse(source_ip_bind)
                scheme, host, port = parsed.scheme, parsed.hostname, source_port_bind
                if 'socks' in scheme:
                    r = yield self._negotiate_socks(addr, (source_ip_bind, source_port_bind))
                    raise gen.Return(r)
                elif scheme in ('http', 'https'):
                    r = yield stream.connect((host, port))
                    if scheme == 'https':
                        yield self._connect_tunnel(stream, addr, proxy_headers)
                    raise gen.Return(r)
                else:
                    raise AttributeError('Unknown scheme: %s' % scheme)
            return _(addr)
        else:
            return stream.connect(addr)
    except socket.error as e:
        fu = Future()
        fu.set_exception(e)
        return fu
def send(self, stream=False, timeout=None, verify=True,
         cert=None, proxies=None):
    request = self.request
    connect_timeout, self.read_timeout = parse_timeout(timeout)
    self.stream_body = stream
    # set connect timeout
    with stack_context.ExceptionStackContext(self._handle_exception):
        if connect_timeout:
            self._timeout = self.io_loop.call_later(
                connect_timeout,
                stack_context.wrap(functools.partial(
                    self._on_timeout, 'while connecting')))
        # set proxy related info
        proxy = select_proxy(request.url, proxies)
        self.headers = request.headers.copy()
        if proxy:
            proxy = prepend_scheme_if_needed(proxy, 'http')
            parsed = urlparse(proxy)
            # The whole proxy URL is kept as `host` so the scheme survives;
            # it is re-parsed in _create_stream when the connection is made.
            scheme, host, port = parsed.scheme, proxy, parsed.port
            port = port or (443 if scheme == 'https' else 80)
            self.start_line = RequestStartLine(request.method, request.url, '')
            self.headers.update(get_proxy_headers(proxy))
        else:
            host, port = None, None
            self.start_line = request.start_line
        self.tcp_client.connect(
            request.host, request.port,
            af=request.af,
            ssl_options=self._get_ssl_options(request, verify, cert),
            max_buffer_size=self.max_buffer_size,
            source_ip=host, source_port=port,
            callback=self._on_connect)
def get_string_to_sign(self, request, headers):
    sts = []
    for header in headers:
        if header == "(request-target)":
            path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
            sts.append("(request-target): {} {}".format(request.method.lower(), path_url))
        else:
            if header.lower() == "host":
                value = request.headers.get("host", urlparse(request.url).hostname)
            else:
                value = request.headers[header]
            sts.append("{k}: {v}".format(k=header.lower(), v=value))
    return "\n".join(sts).encode()
def get_uri_name(url):
    """Gets the file name from the end of the URL. Only really useful for PyBEL's testing,
    since it specifically checks whether the file comes from the weird OwnCloud resources
    distributed by Fraunhofer."""
    url_parsed = urlparse(url)
    if url.startswith(FRAUNHOFER_RESOURCES):
        return url_parsed.query.split('=')[-1]
    else:
        url_parts = url_parsed.path.split('/')
        return url_parts[-1]
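A behavior sketch with hypothetical URLs, showing the query-string case versus the ordinary path case:

from urllib.parse import urlparse

# Hypothetical OwnCloud-style link: the file name lives in the query string.
print(urlparse('https://owncloud.example.org/index.php?download&files=hgnc.belns')
      .query.split('=')[-1])  # hgnc.belns

# Ordinary link: the file name is the last path segment.
print(urlparse('https://example.org/resources/hgnc.belns')
      .path.split('/')[-1])   # hgnc.belns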
def is_url(s):
    """Checks if a string is a valid URL
    :param str s: An input string
    :return: Is the string a valid URL?
    :rtype: bool
    """
    return urlparse(s).scheme != ""
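Note that the check only requires a scheme, so any scheme-prefixed string passes; for example:

from urllib.parse import urlparse

print(urlparse('https://example.org').scheme != '')   # True
print(urlparse('not a url').scheme != '')             # False
print(urlparse('mailto:a@example.org').scheme != '')  # True -- any scheme counts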
def connect(self):
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.settimeout(self.timeout)
    socket_path = unquote(urlparse(self.unix_socket_url).netloc)
    sock.connect(socket_path)
    self.sock = sock
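The socket path travels percent-encoded in the URL's netloc; a standalone sketch with a hypothetical Docker socket path:

from urllib.parse import urlparse, unquote, quote

socket_url = 'http+unix://' + quote('/var/run/docker.sock', safe='')
print(socket_url)                            # http+unix://%2Fvar%2Frun%2Fdocker.sock
print(unquote(urlparse(socket_url).netloc))  # /var/run/docker.sock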
def get_path_to_file_from_url(cls, url):
    """standard file path
    :param str url: download URL
    """
    file_name = urlparse(url).path.split('/')[-1]
    return os.path.join(PYUNIPROT_DATA_DIR, file_name)
def _key_from_url(url):
    parsed = urlparse(url)
    return urlunparse((parsed.scheme.lower(),
                       parsed.netloc.lower(),
                       '', '', '', ''))
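This reduces a URL to a lower-cased scheme-and-host cache key; for example:

from urllib.parse import urlparse, urlunparse

parsed = urlparse('HTTP://Example.ORG/Some/Path?q=1#frag')
print(urlunparse((parsed.scheme.lower(), parsed.netloc.lower(),
                  '', '', '', '')))  # http://example.org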
def _build_request_path(url, proxy_info):
    uri = compat.urlparse(url)
    proxy_url = proxy_info.get('request_path')
    if proxy_url is not None:
        return proxy_url, uri
    request_path = _coerce_to_bytes(uri.path)
    if uri.query:
        request_path += b'?' + _coerce_to_bytes(uri.query)
    return request_path, uri
def preprocess_media_tags(element):
    if isinstance(element, html.HtmlElement):
        if element.tag in ['ol', 'ul']:
            # ignore any spaces between <ul> and <li>
            element.text = ''
        elif element.tag == 'li':
            # ignore spaces after </li>
            element.tail = ''
        elif element.tag == 'iframe':
            iframe_src = element.get('src')
            youtube = re.match(youtube_re, iframe_src)
            vimeo = re.match(vimeo_re, iframe_src)
            if youtube or vimeo:
                element.text = ''  # ignore any legacy text
                if youtube:
                    yt_id = urlparse(iframe_src).path.replace('/embed/', '')
                    element.set('src', '/embed/youtube?url=' +
                                quote_plus('https://www.youtube.com/watch?v=' + yt_id))
                elif vimeo:
                    element.set('src', '/embed/vimeo?url=' +
                                quote_plus('https://vimeo.com/' + vimeo.group(2)))
                if not len(element.xpath('./ancestor::figure')):
                    _wrap_figure(element)
            else:
                element.drop_tag()
        elif element.tag == 'blockquote' and element.get('class') == 'twitter-tweet':
            twitter_links = element.xpath('.//a[@href]')
            for tw_link in twitter_links:
                if twitter_re.match(tw_link.get('href')):
                    twitter_frame = html.HtmlElement()
                    twitter_frame.tag = 'iframe'
                    twitter_frame.set('src', '/embed/twitter?url=' +
                                      quote_plus(tw_link.get('href')))
                    element.addprevious(twitter_frame)
                    _wrap_figure(twitter_frame)
                    element.drop_tree()
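The YouTube branch recovers the video id from the embed URL's path; a standalone sketch with a hypothetical embed URL:

from urllib.parse import urlparse, quote_plus

iframe_src = 'https://www.youtube.com/embed/dQw4w9WgXcQ'
yt_id = urlparse(iframe_src).path.replace('/embed/', '')
print('/embed/youtube?url=' + quote_plus('https://www.youtube.com/watch?v=' + yt_id))
# /embed/youtube?url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3DdQw4w9WgXcQ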
def __init__(self, count, url, cls, session, params=None, etag=None,
             headers=None):
    GitHubCore.__init__(self, {}, session)
    #: Original number of items requested
    self.original = count
    #: Number of items left in the iterator
    self.count = count
    #: URL the class used to make its first GET
    self.url = url
    #: Last URL that was requested
    self.last_url = None
    self._api = self.url
    #: Class for constructing an item to return
    self.cls = cls
    #: Parameters of the query string
    self.params = params or {}
    self._remove_none(self.params)
    # We do not set this from the parameter sent. We want this to
    # represent the ETag header returned by GitHub no matter what.
    # If this is not None, then it won't be set from the response and
    # that's not what we want.
    #: The ETag Header value returned by GitHub
    self.etag = None
    #: Headers generated for the GET request
    self.headers = headers or {}
    #: The last response seen
    self.last_response = None
    #: Last status code received
    self.last_status = 0
    if etag:
        self.headers.update({'If-None-Match': etag})
    self.path = urlparse(self.url).path
def _api(self, uri):
    self._uri = urlparse(uri)
def get_path_to_file_from_url(cls, url):
    """standard file path
    :param str url: CTD download URL
    """
    file_name = urlparse(url).path.split('/')[-1]
    return os.path.join(cls.pyctd_data_dir, file_name)
def __init__(self, count, url, cls, session, params=None, etag=None,
             headers=None):
    models.GitHubCore.__init__(self, {}, session)
    #: Original number of items requested
    self.original = count
    #: Number of items left in the iterator
    self.count = count
    #: URL the class used to make its first GET
    self.url = url
    #: Last URL that was requested
    self.last_url = None
    self._api = self.url
    #: Class for constructing an item to return
    self.cls = cls
    #: Parameters of the query string
    self.params = params or {}
    self._remove_none(self.params)
    # We do not set this from the parameter sent. We want this to
    # represent the ETag header returned by GitHub no matter what.
    # If this is not None, then it won't be set from the response and
    # that's not what we want.
    #: The ETag Header value returned by GitHub
    self.etag = None
    #: Headers generated for the GET request
    self.headers = headers or {}
    #: The last response seen
    self.last_response = None
    #: Last status code received
    self.last_status = 0
    if etag:
        self.headers.update({'If-None-Match': etag})
    self.path = urlparse(self.url).path
def _api(self, uri):
    self._uri = urlparse(uri)
    self.url = uri
def authenticate_server(self, response):
    """
    Uses GSSAPI to authenticate the server.
    Returns True on success, False on failure.
    """
    log.debug("authenticate_server(): Authenticate header: {0}".format(
        _negotiate_value(response)))
    host = urlparse(response.url).hostname
    try:
        result = kerberos.authGSSClientStep(self.context[host],
                                            _negotiate_value(response))
    except kerberos.GSSError:
        log.exception("authenticate_server(): authGSSClientStep() failed:")
        return False
    if result < 1:
        log.error("authenticate_server(): authGSSClientStep() failed: "
                  "{0}".format(result))
        return False
    log.debug("authenticate_server(): returning {0}".format(response))
    return True