def reduce_uri(self, uri, default_port=True):
    """Reduce a URI or a bare authority to an ``(authority, path)`` pair.

    ``uri`` is split with ``urlparse.urlsplit``.  When the split yields a
    network location, that location and the path (``'/'`` when empty) are
    used.  Otherwise the whole input is treated as ``host`` or
    ``host:port`` and the path is ``'/'``.

    If ``default_port`` is true, the scheme is known and the authority
    carries no explicit port, the port for ``http`` (80) or ``https``
    (443) is appended to the authority.
    """
    # note HTTP URLs do not have a userinfo component
    split = urlparse.urlsplit(uri)
    netloc = split[1]
    if netloc:
        # Full URI.
        scheme, authority, path = split[0], netloc, split[2] or '/'
    else:
        # Bare host or host:port.
        scheme, authority, path = None, uri, '/'

    host, port = splitport(authority)
    wants_default = default_port and port is None and scheme is not None
    if not wants_default:
        return authority, path

    known_port = {"http": 80, "https": 443}.get(scheme)
    if known_port is None:
        # Scheme without a registered default port: leave untouched.
        return authority, path
    return "%s:%d" % (host, known_port), path
# Example source code using Python's urlsplit()
def _CalculateRequestSize(self, req):
"""Calculates the request size.
May be overriden to support different types of requests.
Args:
req: A urllib2.Request.
Returns:
the size of the request, in bytes.
"""
(unused_scheme,
unused_host_port, url_path,
unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
size = len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
size += self._CalculateHeaderSize(req.headers)
size += self._CalculateHeaderSize(req.unredirected_hdrs)
data = req.get_data()
if data:
size += len(data)
return size
def _parse_relative_url(relative_url):
"""Parses a relative URL and splits it into its path and query string.
Args:
relative_url: The relative URL, starting with a '/'.
Returns:
Tuple (path, query) where:
path: The path in the relative URL.
query: The query string in the URL without the '?' character.
Raises:
_RelativeUrlError if the relative_url is invalid for whatever reason.
"""
if not relative_url:
raise _RelativeUrlError('Relative URL is empty')
(scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
if scheme or netloc:
raise _RelativeUrlError('Relative URL may not have a scheme or location')
if fragment:
raise _RelativeUrlError('Relative URL may not specify a fragment')
if not path or path[0] != '/':
raise _RelativeUrlError('Relative URL path must start with "/"')
return path, query
def join_url(base_url, path):
    """Combine a base url with an additional path (and query).

    Both arguments are split without fragment handling.  The path parts
    are merged by ``_join_paths`` and the query parts by
    ``_join_queries``; scheme, network location and fragment are taken
    from ``base_url``.

    E.g.: See unit tests.

    :param base_url: Base url.
    :param path: Path.
    :return: Joined url.
    """
    base = urlparse.urlsplit(base_url, allow_fragments=False)
    extra = urlparse.urlsplit(path, allow_fragments=False)

    merged_path = _join_paths(base.path, extra.path)
    merged_query = _join_queries(base.query, extra.query)

    # Only path and query change; everything else comes from the base url.
    return urlparse.urlunsplit(
        base._replace(path=merged_path, query=merged_query))
def check_registry_status(url=DEFAULT_IMAGES_URL, _v2=False):
    """
    Ping a registry to check whether it is available.

    The path of ``url`` is replaced with ``/v1/_ping`` (or ``/v2/`` when
    ``_v2`` is set).  If the v1 ping answers 404 and the response carries
    the ``registry/2.0`` API version header, the check is repeated
    against the v2 endpoint.  A 401 response counts as "available".

    :params url: registry url
    :raises RegistryError: if registry is not available (presumably
        produced by ``raise_registry_error`` -- confirm against its
        definition)
    """
    ping_path = '/v2/' if _v2 else '/v1/_ping'
    url = urlsplit(url)._replace(path=ping_path).geturl()
    with raise_registry_error(url):
        response = requests.get(url, timeout=PING_REQUEST_TIMEOUT,
                                verify=False)
        is_v2_registry = (
            not _v2 and
            response.status_code == 404 and
            response.headers.get(API_VERSION_HEADER) == 'registry/2.0')
        if is_v2_registry:
            check_registry_status(url, _v2=True)
        elif response.status_code == 401:
            # user is not authorized, but registry is available
            return
        else:
            response.raise_for_status()
def url_join(*parts, **kwargs):
    """
    Split every part as a URL and join the pieces into a single URL.

    The scheme and network location come from the last part that defines
    them (scheme falls back to the ``scheme`` keyword argument, default
    ``'http'``).  Paths are joined with a slash, starting from the last
    part whose path is absolute.  Query and fragment come from the last
    part only.

    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        # ``sequence`` runs from the last part to the first; stop collecting
        # as soon as an absolute path is found.
        collected = []
        for segment in sequence:
            collected.append(segment)
            if segment.startswith('/'):
                break
        collected.reverse()
        return '/'.join(collected)

    split_parts = [urlsplit(part) for part in reversed(parts)]
    schemes, netlocs, paths, queries, fragments = zip(*split_parts)

    fallback_scheme = kwargs.get('scheme', 'http')
    scheme = next((s for s in schemes if s), fallback_scheme)
    netloc = next((n for n in netlocs if n), '')
    return urlunsplit(
        (scheme, netloc, concat_paths(paths), queries[0], fragments[0]))
def url_to_path(self, url):
    """Create file system path for this URL

    The file name is built from the network location, the path and the
    query string of ``url``.  An empty path becomes ``/index.html`` and a
    path ending in ``/`` gets ``index.html`` appended.  Characters outside
    the allowed set are replaced by ``_`` and every ``/``-separated
    segment is cut to 255 characters.  The result is joined onto
    ``self.cache_dir``.
    """
    components = urlparse.urlsplit(url)
    # when empty path set to /index.html
    path = components.path
    if not path:
        path = '/index.html'
    elif path.endswith('/'):
        path += 'index.html'
    filename = components.netloc + path + components.query
    # replace invalid characters (raw string: "\-" is not a valid escape
    # sequence in a normal string literal)
    filename = re.sub(r'[^/0-9a-zA-Z\-.,;_ ]', '_', filename)
    # restrict maximum number of characters
    filename = '/'.join(segment[:255] for segment in filename.split('/'))
    return os.path.join(self.cache_dir, filename)
def resource_dictize(res, context):
    """Turn a resource object into a dict with a usable ``url`` entry.

    The resource is dictized with ``d.table_dictize`` and its ``extras``
    (if any) are merged into the top level of the result.  Unless the
    context has ``for_edit`` set:

    * for uploads (``url_type == 'upload'``) the url is replaced by the
      qualified download url built with ``h.url_for``;
    * otherwise a non-empty url without a scheme gets ``http://``
      prepended.
    """
    # The value is not used below, but the lookup still raises if the
    # context has no 'model' entry.
    model = context['model']

    resource = d.table_dictize(res, context)
    extra_fields = resource.pop("extras", None)
    if extra_fields:
        resource.update(extra_fields)

    url = resource['url']
    ## for_edit is only called at the times when the dataset is to be edited
    ## in the frontend. Without for_edit the whole qualified url is returned.
    if resource.get('url_type') == 'upload' and not context.get('for_edit'):
        download_name = munge.munge_filename(url)
        resource['url'] = h.url_for(controller='package',
                                    action='resource_download',
                                    id=resource['package_id'],
                                    resource_id=res.id,
                                    filename=download_name,
                                    qualified=True)
        return resource

    # some urls do not have the protocol this adds http:// to these
    if (resource['url']
            and not urlparse.urlsplit(url).scheme
            and not context.get('for_edit')):
        resource['url'] = u'http://' + url.lstrip('/')
    return resource
def do_GET(self):
    """Handle the OAuth callback request.

    Reads ``oauth_token`` and ``oauth_verifier`` from the query string of
    the requested path, stores both on ``self.server`` as text and
    answers with a 200 response carrying ``html.auth_okay_html``.
    """
    # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e
    query_string = urllib_parse.urlsplit(self.path).query
    params = urllib_parse.parse_qs(query_string)
    token = params['oauth_token'][0]
    verifier = params['oauth_verifier'][0]

    # On Python 2 the parsed values are decoded from UTF-8 before storing.
    server = self.server
    server.oauth_token = token.decode('utf-8') if six.PY2 else token
    server.oauth_verifier = verifier.decode('utf-8') if six.PY2 else verifier

    assert (isinstance(self.server.oauth_token, six.string_types))
    assert (isinstance(self.server.oauth_verifier, six.string_types))

    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()
    self.wfile.write(html.auth_okay_html)
def from_url(self, url):
    """Validate a ``loop://`` URL and apply its query-string options.

    The only recognised option is ``logging=<level>``: it installs the
    ``pySerial.loop`` logger on ``self.logger`` and sets its level from
    ``LOGGER_LEVELS``.  Nothing is returned.

    Raises SerialException when the scheme is not ``loop`` or when a
    ValueError (e.g. an unknown option) occurs while processing the
    options.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        # Literal braces are doubled so that str.format() does not treat
        # "{debug|info|warning|error}" as a replacement field (which made
        # the original raise KeyError instead of SerialException).
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={{debug|info|warning|error}}]": not starting '
            'with loop:// ({!r})'.format(parts.scheme))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: {!r}'.format(option))
    except ValueError as e:
        # Same brace escaping as above; "{}" is the only real field.
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={{debug|info|warning|error}}]": {}'.format(e))
# - - - - - - - - - - - - - - - - - - - - - - - -
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _get_env_info(self, script_url):
    """Derive the base folder URL and base folder path for the script.

    The folder of the script is obtained from the ``system_info`` module;
    when that yields nothing the method returns without setting anything.
    Otherwise the trailing components shared by the URL folder and the
    script folder are stripped from both, and the remainders are stored
    in ``self.base_folder_url`` and ``self.base_folder_path``.
    """
    script_folder = ModuleExec(
        'system_info', ['-info', 'script_folder']
    ).load_result_or_run('script_folder')
    if not script_folder:
        return

    split_url = urlparse.urlsplit(script_url)
    url_folder = os.path.split(split_url.path)[0]

    url_pieces = url_folder.split(os.sep)
    fs_pieces = script_folder.split(os.sep)

    # Count how many components match when walking both lists backwards.
    shared = 0
    for url_piece, fs_piece in zip(reversed(url_pieces), reversed(fs_pieces)):
        if url_piece != fs_piece:
            break
        shared += 1
    if shared:
        # Guarded: a slice of [-0:] would wipe the whole list.
        del url_pieces[-shared:]
        del fs_pieces[-shared:]

    base_url_folder = os.sep.join(url_pieces)
    self.base_folder_url = urlparse.urlunsplit(
        split_url[:2] + (base_url_folder, ) + split_url[3:])
    self.base_folder_path = os.sep.join(fs_pieces)
def gethtml(url):
    """Fetch ``url`` with a session built from the saved cookies.

    Cookies are unpickled from the file ``cookies`` in the working
    directory and the ``c_visitor`` cookie is removed.  Depending on the
    module-level ``forceusa`` / ``localizecookies`` settings, either a
    ``c_locale`` cookie for ``lang`` or a ``sess_id`` cookie fetched from
    crunblocker is added.  If ``url`` lacks a scheme or network location
    a message is printed and the process exits.

    Returns the response body as text, decoded as UTF-8.
    """
    # NOTE(review): pickle.load runs arbitrary code from the file; only
    # safe while the cookies file is written by this program itself.
    with open('cookies') as f:
        cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
    session = requests.session()
    session.cookies = cookies
    del session.cookies['c_visitor']
    if not forceusa and localizecookies:
        # NOTE(review): the u'???????' key looks like mis-encoded text
        # (mapped to 'arME', so presumably the Arabic language name) -- confirm.
        session.cookies['c_locale']={u'Español (Espana)' : 'esES', u'Français (France)' : 'frFR', u'Português (Brasil)' : 'ptBR',
                                     u'English' : 'enUS', u'Español' : 'esLA', u'Türkçe' : 'enUS', u'Italiano' : 'itIT',
                                     u'???????' : 'arME' , u'Deutsch' : 'deDE'}[lang]
    if forceusa:
        try:
            session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
        except Exception:
            # Was a bare "except:", which also swallowed KeyboardInterrupt
            # and SystemExit before retrying.
            sleep(10)  # sleep so we don't overload crunblocker
            session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
    parts = urlparse.urlsplit(url)
    if not parts.scheme or not parts.netloc:
        # Call form prints the same text on Python 2 and also parses on 3.
        print('Apparently not a URL')
        sys.exit()
    # NOTE(review): these look like HTTP headers but are sent as query
    # parameters (params=) -- confirm whether headers= was intended.
    data = {'Referer': 'http://crunchyroll.com/', 'Host': 'www.crunchyroll.com',
            'User-Agent': 'Mozilla/5.0 Windows NT 6.1; rv:26.0 Gecko/20100101 Firefox/26.0'}
    res = session.get(url, params=data)
    res.encoding = 'UTF-8'
    return res.text
def from_url(self, url):
    """Check a ``loop://`` URL and process the options in its query string.

    ``logging=<level>`` is the only supported option; it sets up the
    ``pySerial.loop`` logger on ``self.logger`` with the level found in
    ``LOGGER_LEVELS``.  The method returns nothing.

    Raises SerialException if the scheme is not ``loop`` or if option
    processing raises ValueError (for instance for an unknown option).
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        # The usage text contains literal braces; they must be written as
        # "{{" / "}}" because the string goes through str.format().
        # Unescaped, format() raised KeyError('debug|info|warning|error').
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={{debug|info|warning|error}}]": not starting '
            'with loop:// ({!r})'.format(parts.scheme))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: {!r}'.format(option))
    except ValueError as e:
        # Braces escaped here as well so only "{}" is substituted.
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={{debug|info|warning|error}}]": {}'.format(e))
# - - - - - - - - - - - - - - - - - - - - - - - -
def __init__(self, base_url, login, api_key):
    """Store the connection settings and set up the entity command.

    :param base_url: Base URL; a trailing "/" is appended when missing.
    :param login: Login value, stored on the instance.
    :param api_key: API key value, stored on the instance.
    """
    # Normalise the base URL so it always ends with a slash.
    if not base_url.endswith("/"):
        base_url += "/"
    self.__base_url = base_url
    self.__api_key = api_key
    self.__login = login
    self._api_version = "api/v1/"
    # NOTE(review): get_unique_code() is defined elsewhere; it may rely on
    # the attributes assigned above, so keep this ordering -- confirm.
    self.__unique_code = self.get_unique_code()
    # Keep scheme, network location and path of the base URL; the query
    # and fragment parts of the split are discarded.
    self._scheme, self._server, self._api_base, _, _ = urlparse.urlsplit(base_url)
    self.__sign_code = None
    self.__entity_list = []
    self.__general_doc_dict = None
    self.__logger = None
    # self.function_list = Command(self, "console/FunctionList", [])
    # Parameter description handed to the "console/entity" command below:
    # a single "entity" parameter of type "list" with "need" set to False.
    entity_list_params = [
        {"attr": "entity",
         "type": "list",
         "need": False}
    ]
    self._entities_detail = Command(self, "console/entity", entity_list_params)
    self.__init_entities()
def from_html(self, cr, uid, model, field, element, context=None):
    """Return the image value referenced by the ``<img>`` inside ``element``.

    The ``src`` of the first ``img`` child decides how the value is
    obtained:

    * a ``/website/image`` URL is resolved to a record field: model, id
      and field name are taken from the query string when present and
      from the path segments otherwise;
    * a path matching ``self.local_url_re`` is passed to
      ``self.load_local_url``;
    * anything else is passed to ``self.load_remote_url``.
    """
    url = element.find('img').get('src')

    url_object = urlparse.urlsplit(url)
    if url_object.path.startswith('/website/image'):
        # url might be /website/image/<model>/<id>[_<checksum>]/<field>[/<width>x<height>]
        fragments = url_object.path.split('/')
        query = dict(urlparse.parse_qsl(url_object.query))
        # Path segments are only read when the query string lacks the
        # value: with dict.get(key, fragments[i]) the default was evaluated
        # eagerly and raised IndexError for query-only URLs such as
        # /website/image?model=...&id=...&field=...
        model = query['model'] if 'model' in query else fragments[3]
        oid = query['id'] if 'id' in query else fragments[4].split('_')[0]
        field = query['field'] if 'field' in query else fragments[5]
        item = self.pool[model].browse(cr, uid, int(oid), context=context)
        return item[field]

    if self.local_url_re.match(url_object.path):
        return self.load_local_url(url)

    return self.load_remote_url(url)
def load_local_url(self, url):
    """Load a module-local static image and return it base64 encoded.

    The path of ``url`` is matched against ``self.local_url_re``; its
    ``module`` and ``rest`` groups are used to locate the file below the
    module's ``static`` directory.  The file is opened as an image and
    fully loaded to make sure it holds valid image data.

    Returns the base64-encoded file content, or None when the resource
    cannot be located or loading fails (the failure is logged).
    """
    match = self.local_url_re.match(urlparse.urlsplit(url).path)

    rest = match.group('rest')
    for sep in os.sep, os.altsep:
        if sep and sep != '/':
            # str.replace returns a new string; the original discarded the
            # result, so platform separators were never normalised.
            rest = rest.replace(sep, '/')

    path = openerp.modules.get_module_resource(
        match.group('module'), 'static', *(rest.split('/')))

    if not path:
        return None

    try:
        with open(path, 'rb') as f:
            # force complete image load to ensure it's valid image data
            image = I.open(f)
            image.load()
            f.seek(0)
            return f.read().encode('base64')
    except Exception:
        logger.exception("Failed to load local image %r", url)
        return None
def serial_class_for_url(url):
    """Resolve an ``alt://`` URL to a port string and a class from ``serial``.

    The port string is the network location followed by the path of the
    URL.  The class defaults to ``Serial`` and can be selected with the
    ``class`` query option; it is looked up as an attribute of the
    ``serial`` module.

    Raises serial.SerialException when the scheme is not ``alt`` or an
    unknown option is given.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))

    class_name = 'Serial'
    try:
        options = urlparse.parse_qs(parts.query, True)
        for option in options:
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            class_name = options[option][0]
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)

    port_name = parts.netloc + parts.path
    return (port_name, getattr(serial, class_name))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string

    The URL must use the ``socket`` scheme.  The ``logging=<level>``
    query option installs the ``pySerial.socket`` logger on
    ``self.logger`` with the level found in ``LOGGER_LEVELS``.

    Returns a ``(host, port)`` tuple.

    Raises SerialException when the scheme is wrong, an unknown option is
    given, or the port is missing or outside 0...65535.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != "socket":
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.socket')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
        # get host and port
        host, port = parts.hostname, parts.port
        # A URL without a port gives None.  Check it explicitly: comparing
        # None with an int raises TypeError on Python 3, which would bypass
        # the ValueError handler below.
        if port is None or not 0 <= port < 65536:
            raise ValueError("port not in range 0...65535")
    except ValueError as e:
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
    return (host, port)
# - - - - - - - - - - - - - - - - - - - - - - - -