def reduce_uri(self, uri, default_port=True):
    """Accept authority or URI and extract only the authority and path."""
    # note HTTP URLs do not have a userinfo component
    split = urlparse.urlsplit(uri)
    if split[1]:
        # A full URI: scheme, authority and path come from the split result.
        scheme, authority = split[0], split[1]
        path = split[2] if split[2] else '/'
    else:
        # Bare "host" or "host:port" string: the whole input is the authority.
        scheme, authority, path = None, uri, '/'
    host, port = splitport(authority)
    if default_port and port is None and scheme is not None:
        # Normalize to an explicit port for schemes with a well-known default.
        default = {"http": 80, "https": 443}.get(scheme)
        if default is not None:
            authority = "%s:%d" % (host, default)
    return authority, path
Python urlsplit() example source code
def getOtherRecipeLinks(self):
    """Return a list of other recipes found in the page: while single recipe
    pages do not have links, the various categories at
    http://www.williams-sonoma.com/recipe/ do.
    For example,
    http://www.williams-sonoma.com/search/results.html?activeTab=recipes&words=winter_weeknight_dinners
    has a collection of individual recipe links, and this method will find them.
    """
    links = []
    for anchor in self.tree.xpath('//ul[@class="recipe-list"]/li/a'):
        if 'href' not in anchor.keys():
            continue
        parts = urlsplit(anchor.get('href'))
        # Only anchors tagged as coming from the recipe search are recipes.
        if parts.query == 'cm_src=RECIPESEARCH':
            links.append(parts.scheme + '://' + parts.netloc + parts.path)
    return links
def run(self):
    """Worker thread entry point: scrape one listing page and queue recipes.

    Pulls a page index from self.qu, fetches that page, extracts recipe
    links matching `pat`, and pushes (url, filename) work items onto
    self.q for fetch_data() to download.
    NOTE: Python 2 code (print statements; `ul`/`up`/`bs`/`op` are module
    aliases presumably bound at import time — confirm in the full file).
    """
    ind=self.qu.get()
    url=self.url+str(ind)
    soup =bs.BeautifulSoup(''.join( ul.urlopen(url).readlines() ))
    bu = up.urlsplit(self.url)
    print 'started with the ' ,str(url).split('/')[-1],
    for i in soup.find_all(attrs = { "class" : "recipe-title"}):
        # each matching element wraps an anchor pointing at one recipe page
        sp = up.urlsplit(i.a.get('href'))
        path = sp.path
        print path
        if re.search(pat, path):
            # rebuild an absolute URL from the base scheme/host + recipe path
            path = bu.scheme+'://'+bu.netloc+path
            filename = str(path).split('/')[-2]
            filename = op.join(op.abspath(op.curdir),filename+'.py') # recipe will be stored in given location
            # filename = op.join(op.abspath(op.curdir),filename+'.html')
            # uncomment the above line if downloading the web page for the recipe
            print path
            self.q.put((path,filename))
    self.fetch_data()
    time.sleep(1)
    self.qu.task_done()
    self.q.join()
    print 'done with the ' ,str(url).split('/')[-1],
def get_version_from_url(url):
    """Return the version component of *url*.

    The version is taken to be the first path segment, after ignoring at
    most one leading '/'. An empty string is returned when the path is
    empty (or consists only of that single '/').
    """
    path = urlparse.urlsplit(url).path
    if path.startswith('/'):
        path = path[1:]
    # Everything up to the next '/' (or the whole remainder) is the version.
    return path.split('/', 1)[0]
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.
    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'
    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'
    """
    parsed = urlparse.urlsplit(href)
    pieces = parsed.path.rsplit('/', 1)
    candidate = pieces.pop()
    # NOTE: this should match vX.X or vX
    version_re = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if version_re.match(candidate) is None:
        raise ValueError('URL %s does not contain version' % href)
    rebuilt = list(parsed)
    rebuilt[2] = url_join(*pieces)
    return urlparse.urlunsplit(rebuilt)
def click(self, st):
    """Return a path which is the URL where a browser would presumably take
    you if you clicked on a link with an HREF as given.
    """
    scheme, netloc, path, query, fragment = urlparse.urlsplit(st)
    # Missing components fall back to this URL's components.
    scheme = scheme or self.scheme
    netloc = netloc or self.netloc
    if not path:
        path = self.path
        # Inherit the query only when the link had no path of its own.
        if not query:
            query = self.query
    elif not path.startswith('/'):
        # Relative path: replace the last segment of the current path.
        segments = self.pathList()
        segments[-1] = path
        path = '/'.join(segments)
    return URLPath(scheme, netloc, path, query, fragment)
def reduce_uri(self, uri, default_port=True):
    """Accept authority or URI and extract only the authority and path."""
    # note HTTP URLs do not have a userinfo component
    scheme, netloc, upath = urlparse.urlsplit(uri)[:3]
    if netloc:
        # Full URI: the authority lives in the netloc component.
        authority = netloc
        path = upath or '/'
    else:
        # Plain "host" or "host:port" string.
        scheme = None
        authority = uri
        path = '/'
    host, port = splitport(authority)
    if default_port and port is None and scheme is not None:
        # Attach the scheme's well-known port when none was given explicitly.
        known = {"http": 80, "https": 443}.get(scheme)
        if known is not None:
            authority = "%s:%d" % (host, known)
    return authority, path
def _checkFrom(self, pyobj):
    '''WS-Address From,
    XXX currently not checking the hostname, not forwarding messages.
    pyobj -- From server returned.
    '''
    if pyobj is None: return
    value = pyobj._Address
    if value != self._addressTo:
        # Addresses differ textually; still accept them when they differ only
        # in the host part and both hosts resolve to this machine.
        scheme,netloc,path,query,fragment = urlparse.urlsplit(value)
        schemeF,netlocF,pathF,queryF,fragmentF = urlparse.urlsplit(self._addressTo)
        if scheme==schemeF and path==pathF and query==queryF and fragment==fragmentF:
            # Append '80' so index [1] yields the explicit port or the default.
            netloc = netloc.split(':') + ['80']
            netlocF = netlocF.split(':') + ['80']
            # Same port, and the expected host resolves to localhost or to the
            # same IP as the returned host -> treat as equivalent.
            if netloc[1]==netlocF[1] and (socket.gethostbyname(netlocF[0]) in
                ('127.0.0.1', socket.gethostbyname(netloc[0]))):
                return
        raise WSActionException('wrong WS-Address From(%s), expecting %s'%(value,self._addressTo))
def change_locale(request):
    """
    Redirect to a given url while changing the locale in the path
    The url and the locale code need to be specified in the
    request parameters.
    """
    # Destination: explicit 'next' parameter, else the referrer's path,
    # else the site root.
    next = request.REQUEST.get('next', None)
    if not next:
        referrer = request.META.get('HTTP_REFERER', None)
        if referrer:
            next = urlsplit(referrer)[2]  # path component only
    if not next:
        next = '/'
    # Strip any existing locale prefix from the path.
    _, path = utils.strip_path(next)
    if request.method == 'POST':
        locale = request.POST.get('locale', None)
        if locale and check_for_language(locale):
            if localeurl_settings.USE_SESSION:
                # Persist the choice so later requests keep the locale.
                request.session['django_language'] = locale
            # Re-prefix the path with the newly selected locale.
            path = utils.locale_path(path, locale)
    response = http.HttpResponseRedirect(path)
    return response
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        # 'class' is the only option understood; it names the serial class.
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            class_name = values[0]
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (parts.netloc + parts.path, getattr(serial, class_name))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "socket":
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option != 'logging':
                raise ValueError('unknown option: %r' % (option,))
            logging.basicConfig()  # XXX is that good to call it here?
            self.logger = logging.getLogger('pySerial.socket')
            self.logger.setLevel(LOGGER_LEVELS[values[0]])
            self.logger.debug('enabled logging')
        # get host and port
        host, port = parts.hostname, parts.port
        if not 0 <= port < 65536:
            raise ValueError("port not in range 0...65535")
    except ValueError as e:
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
    return (host, port)
# - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    split_url = urlparse.urlsplit(url)
    if split_url.scheme != "loop":
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (split_url.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(split_url.query, True).items():
            if option != 'logging':
                raise ValueError('unknown option: %r' % (option,))
            logging.basicConfig()  # XXX is that good to call it here?
            self.logger = logging.getLogger('pySerial.loop')
            self.logger.setLevel(LOGGER_LEVELS[values[0]])
            self.logger.debug('enabled logging')
    except ValueError as e:
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)
# - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    fmt_class = FormatHexdump
    use_color = False
    sink = sys.stderr
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                sink = open(values[0], 'w')
            elif option == 'color':
                use_color = True
            elif option == 'raw':
                fmt_class = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = fmt_class(sink, use_color)
    return parts.netloc + parts.path
def get_base_page_info(self, page_data):
    """Find the reverse-ip info for the base page"""
    domain = urlparse.urlsplit(page_data['final_url']).hostname
    # Reverse-DNS (PTR) lookup for the final URL's IP; best-effort only.
    try:
        import socket
        addr = socket.gethostbyname(domain)
        host = str(socket.gethostbyaddr(addr)[0])
        page_data['base_page_ip_ptr'] = host
    except Exception:
        pass
    # keep moving up the domain until we can get a NS record
    # NOTE(review): the loop condition tests 'base_page_dns_soa' but the body
    # stores 'base_page_dns_ns', so the early-exit never triggers and the loop
    # always walks up to the TLD — looks like a stale key; verify intent.
    while domain is not None and 'base_page_dns_soa' not in page_data:
        try:
            import dns.resolver
            dns_servers = dns.resolver.query(domain, "NS")
            dns_server = str(dns_servers[0].target).strip('. ')
            page_data['base_page_dns_ns'] = dns_server
        except Exception:
            pass
        # Drop the leftmost label and retry on the parent domain.
        pos = domain.find('.')
        if pos > 0:
            domain = domain[pos + 1:]
        else:
            domain = None
def can_view_parent_source (self, url_data):
    """Determine if parent URL source can be retrieved."""
    if not url_data.valid:
        return False
    parent = url_data.parent_url
    if not parent:
        return False
    # Directory contents are dynamically generated, so it makes
    # no sense in viewing/editing them.
    if parent.startswith(u"file:"):
        file_path = urlparse.urlsplit(parent).path
        return not os.path.isdir(get_os_filename(file_path))
    if parent.startswith((u"ftp:", u"ftps:")):
        ftp_path = urlparse.urlsplit(parent).path
        return bool(ftp_path) and not ftp_path.endswith(u'/')
    # Only HTTP left
    return parent.startswith((u"http:", u"https:"))
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    split_url = urlparse.urlsplit(url)
    if split_url.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (split_url.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(split_url.query, True).items():
            if option == 'class':
                # override the default serial class by name
                class_name = values[0]
                continue
            raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([split_url.netloc, split_url.path]), getattr(serial, class_name))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    split_url = urlparse.urlsplit(url)
    if split_url.scheme != "socket":
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (split_url.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(split_url.query, True).items():
            if option == 'logging':
                logging.basicConfig()  # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.socket')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
        # get host and port
        host = split_url.hostname
        port = split_url.port
        if not 0 <= port < 65536:
            raise ValueError("port not in range 0...65535")
    except ValueError as e:
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
    return (host, port)
# - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    parsed = urlparse.urlsplit(url)
    if parsed.scheme != "loop":
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parsed.scheme,))
    try:
        # process options now, directly altering self
        options = urlparse.parse_qs(parsed.query, True)
        for option, values in options.items():
            if option == 'logging':
                logging.basicConfig()  # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)
# - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    split_url = urlparse.urlsplit(url)
    if split_url.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (split_url.scheme,))
    # process options now, directly altering self
    chosen_formatter = FormatHexdump
    colorize = False
    destination = sys.stderr
    try:
        for option, values in urlparse.parse_qs(split_url.query, True).items():
            if option == 'file':
                destination = open(values[0], 'w')
            elif option == 'color':
                colorize = True
            elif option == 'raw':
                chosen_formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = chosen_formatter(destination, colorize)
    return ''.join([split_url.netloc, split_url.path])
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    # NOTE: Python 2 only (urllib2/urlparse modules).
    import urllib2
    import urlparse
    import os
    # default filename: last path segment of the URL
    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)
    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename
    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    # NOTE: Python 2 only (urllib2/urlparse modules).
    import urllib2
    import urlparse
    import os
    # default filename: last path segment of the URL
    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)
    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename
    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    # NOTE: Python 2 only (urllib2/urlparse modules).
    import urllib2
    import urlparse
    import os
    # default filename: last path segment of the URL
    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)
    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename
    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    # NOTE: Python 2 only (urllib2/urlparse modules).
    import urllib2
    import urlparse
    import os
    # default filename: last path segment of the URL
    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)
    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename
    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    # NOTE: Python 2 only (urllib2/urlparse modules).
    import urllib2
    import urlparse
    import os
    # default filename: last path segment of the URL
    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)
    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename
    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
def create_http_request(self, method, url, headers, body, timeout, **kwargs):
    """Issue a plain HTTP(S) request for *url* and return the response object.

    Python 2 httplib code. *headers* is mutated in place (Host and
    Content-Length are filled in when absent).
    """
    scheme, netloc, path, query, _ = urlparse.urlsplit(url)
    # An explicit port exists only when the last ':' comes after the last
    # ']' — the ']' comparison keeps bracketed IPv6 hosts intact.
    if netloc.rfind(':') <= netloc.rfind(']'):
        # no port number
        host = netloc
        port = 443 if scheme == 'https' else 80
    else:
        host, _, port = netloc.rpartition(':')
        port = int(port)
    if query:
        path += '?' + query
    if 'Host' not in headers:
        headers['Host'] = host
    if body and 'Content-Length' not in headers:
        headers['Content-Length'] = str(len(body))
    # The connection is built from the full netloc, so the parsed host/port
    # above only feed the header defaults.
    ConnectionType = httplib.HTTPSConnection if scheme == 'https' else httplib.HTTPConnection
    connection = ConnectionType(netloc, timeout=timeout)
    connection.request(method, path, body=body, headers=headers)
    response = connection.getresponse()
    return response
def filter(self, handler):
    """Serve files/directories from the working directory for the request.

    Returns a ('mock', {...}) response tuple for a file or a generated
    directory listing; returns None implicitly when nothing matches.
    NOTE: Python 2 code (urllib.unquote_plus, StandardError).
    """
    path = urlparse.urlsplit(handler.path).path
    if path.startswith('/'):
        # Map the URL path onto the current working directory ('.' for '/').
        path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
    if os.path.isdir(path):
        index_file = os.path.join(path, self.index_file)
        if not os.path.isfile(index_file):
            # No index file present: render a generated directory listing.
            content = self.format_index_html(path).encode('UTF-8')
            headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
        else:
            path = index_file
    if os.path.isfile(path):
        content_type = 'application/octet-stream'
        try:
            import mimetypes
            content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
            # certificates get their dedicated MIME type regardless of the map
            if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                content_type = 'application/x-x509-ca-cert'
        except StandardError as e:
            logging.error('import mimetypes failed: %r', e)
        with open(path, 'rb') as fp:
            content = fp.read()
        headers = {'Connection': 'close', 'Content-Type': content_type}
        return 'mock', {'status': 200, 'headers': headers, 'body': content}
def reduce_uri(self, uri, default_port=True):
    """Accept authority or URI and extract only the authority and path."""
    # note HTTP URLs do not have a userinfo component
    pieces = urlparse.urlsplit(uri)
    has_netloc = bool(pieces[1])
    # Full URIs carry the authority in the netloc; bare strings ARE the authority.
    scheme = pieces[0] if has_netloc else None
    authority = pieces[1] if has_netloc else uri
    path = (pieces[2] or '/') if has_netloc else '/'
    host, port = splitport(authority)
    if default_port and port is None and scheme is not None:
        # Make the scheme's default port explicit in the authority.
        well_known = {"http": 80, "https": 443}.get(scheme)
        if well_known is not None:
            authority = "%s:%d" % (host, well_known)
    return authority, path
def _CalculateRequestSize(self, req):
    """Calculates the request size.
    Args:
      req: A tuple of (uri, method name, request body, header map)
    Returns:
      the size of the request, in bytes.
    """
    uri, method, body, headers = req
    url_path = urlparse.urlsplit(uri)[2]
    # request line + headers + optional body
    size = len('%s %s HTTP/1.1\n' % (method, url_path))
    size += self._CalculateHeaderSize(headers)
    if body:
        size += len(body)
    return size
def _parse_relative_url(relative_url):
"""Parses a relative URL and splits it into its path and query string.
Args:
relative_url: The relative URL, starting with a '/'.
Returns:
Tuple (path, query) where:
path: The path in the relative URL.
query: The query string in the URL without the '?' character.
Raises:
_RelativeUrlError if the relative_url is invalid for whatever reason.
"""
if not relative_url:
raise _RelativeUrlError('Relative URL is empty')
(scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
if scheme or netloc:
raise _RelativeUrlError('Relative URL may not have a scheme or location')
if fragment:
raise _RelativeUrlError('Relative URL may not specify a fragment')
if not path or path[0] != '/':
raise _RelativeUrlError('Relative URL path must start with "/"')
return path, query