def validate_url(in_url):
    """
    Take some value provided by the user and attempt to produce a
    meaningful url from it.

    * A value that already has a network location is returned after a
      ``urlsplit``/``urlunsplit`` round trip.
    * A value without whitespace that contains a dot is treated as a
      host name and retried with an ``https://`` prefix.
    * ``"about:blank"`` is returned unchanged.
    * Anything else is appended to the English Wikipedia article URL
      prefix.

    :param in_url: the text provided by the user.
    :returns: the resulting URL string.
    """
    parts = urlsplit(in_url)
    # Index 1 of the split result is the network location.
    if parts[1]:
        return urlunsplit(parts)

    # No host yet: guess what the user meant and validate the guess.
    if re.match(r'^\S+\.\S+$', in_url):
        return validate_url("https://%s" % in_url)
    if in_url == "about:blank":
        return in_url
    return validate_url("https://en.wikipedia.org/wiki/%s" % in_url)
# python类urlunsplit()的实例源码 (example source code using Python's urlunsplit())
def __call__(self, value):
    """Validate ``value``, retrying once with an IDNA-encoded host.

    The parent validator is tried first.  When it rejects a non-empty
    value, the value is converted to text, its network location is
    encoded with the ``idna`` codec and the rebuilt URL is handed to
    the parent validator again.

    :returns: ``value`` when the first attempt passes; otherwise the
        result of the parent's ``__call__`` on the converted URL.
    :raises ValidationError: for an empty value that failed the first
        attempt, for a network location the ``idna`` codec rejects, or
        from the parent validator on the second attempt.
    """
    try:
        super(URLValidator, self).__call__(value)
    except ValidationError:
        # The plain check failed; an empty value leaves nothing to retry.
        if not value:
            raise ValidationError(self.message.format(value),
                                  code=self.code)

        value = text_type(value)
        url_scheme, host_part, url_path, url_query, url_fragment = \
            urlsplit(value)
        try:
            # IDN -> ACE
            host_part = host_part.encode('idna').decode('ascii')
        except UnicodeError:
            # The domain part cannot be represented.
            raise ValidationError(self.message.format(value),
                                  code=self.code)

        converted = urlunsplit(
            (url_scheme, host_part, url_path, url_query, url_fragment))
        return super(URLValidator, self).__call__(converted)
    return value
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = ''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = ''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def audit(arg):
# Walks the query-string parameters of the URL given in ``arg`` and hands
# each one to the helper ``iI1`` (defined outside this view); the first
# truthy result is reported through ``security_info``.
# NOTE(review): the original indentation is lost in this view, so the
# nesting of the statements below (in particular whether ``return`` sits
# inside the ``if`` directly above it) must be confirmed against the
# original file.
Ii1iI = arg
Oo = urlparse.urlparse(Ii1iI)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots receive decode('') (presumably an empty string -- confirm).
I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme,
Oo.netloc,
Oo.path,
decode(''),
decode('')))
# (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(Oo.query)
# The condition is the constant 0, so the guarded expression never runs;
# the names it mentions are not bound anywhere in this function.
if 0:
iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
# Parameter names the loop skips; they are stored as opaque byte strings
# and turned into their real values by ``decode`` at run time.
oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in oo:
continue
# Log the parameter name and the stripped URL using a decoded format string.
debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i)
# The helper receives the stripped URL, all pairs, and the current name/value.
IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
if IiII1I1i1i1ii:
# Element [1] of the helper's result is what gets reported.
security_info(IiII1I1i1i1ii[1])
return
# Another constant-false guard; its expression never runs.
if 0:
OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1
def audit(arg):
# For every value in the decoded list ``i11``, walks the query-string
# parameters of the URL given in ``arg`` and hands each one to the helper
# ``iI1`` (defined outside this view); the first truthy result is reported
# through ``security_info``.
# NOTE(review): the original indentation is lost in this view, so the
# nesting of the statements below (in particular whether ``return`` sits
# inside the ``if`` directly above it) must be confirmed against the
# original file.
ooO0oooOoO0 = arg
II11i = urlparse.urlparse(ooO0oooOoO0)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots receive decode('') (presumably an empty string -- confirm).
i1oOOoo00O0O = urlparse.urlunsplit((II11i.scheme,
II11i.netloc,
II11i.path,
decode(''),
decode('')))
# (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(II11i.query)
# Parameter names the inner loop skips; stored as opaque byte strings and
# turned into their real values by ``decode`` at run time.
i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')]
# Two decoded values iterated by the outer loop; each is passed to the
# helper as its first argument and appears in the reported message.
i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
for I11 in i11:
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in i1111:
continue
# Log the outer value, the parameter name and the stripped URL.
debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), I11, O0O0OO0O0O0, i1oOOoo00O0O)
Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii)
if Oo0o0000o0o0:
# The decoded template is filled with the outer value and element [1]
# of the helper's result.
security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1]))
return
# The condition is the constant 0, so the guarded expression never runs;
# the names it mentions are not bound anywhere in this function.
if 0:
iiiii11iII1 % O0o
def audit(arg):
# Walks the query-string parameters of the URL given in ``arg`` and hands
# each one to the helper ``iI1`` (defined outside this view); the first
# truthy result is reported through ``security_info``.
# NOTE(review): the original indentation is lost in this view, so the
# nesting of the statements below (in particular whether ``return`` sits
# inside the ``if`` directly above it) must be confirmed against the
# original file.
Ii1iI = arg
Oo = urlparse.urlparse(Ii1iI)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots receive decode('') (presumably an empty string -- confirm).
I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme,
Oo.netloc,
Oo.path,
decode(''),
decode('')))
# (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(Oo.query)
# The condition is the constant 0, so the guarded expression never runs;
# the names it mentions are not bound anywhere in this function.
if 0:
iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
# Parameter names the loop skips; they are stored as opaque byte strings
# and turned into their real values by ``decode`` at run time.
oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in oo:
continue
# Log the parameter name and the stripped URL using a decoded format string.
debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i)
# The helper receives the stripped URL, all pairs, and the current name/value.
IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
if IiII1I1i1i1ii:
# Element [1] of the helper's result is what gets reported.
security_info(IiII1I1i1i1ii[1])
return
# Another constant-false guard; its expression never runs.
if 0:
OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1
def audit(arg):
# For every value in the decoded list ``i11``, walks the query-string
# parameters of the URL given in ``arg`` and hands each one to the helper
# ``iI1`` (defined outside this view); the first truthy result is reported
# through ``security_info``.
# NOTE(review): the original indentation is lost in this view, so the
# nesting of the statements below (in particular whether ``return`` sits
# inside the ``if`` directly above it) must be confirmed against the
# original file.
ooO0oooOoO0 = arg
II11i = urlparse.urlparse(ooO0oooOoO0)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots receive decode('') (presumably an empty string -- confirm).
i1oOOoo00O0O = urlparse.urlunsplit((II11i.scheme,
II11i.netloc,
II11i.path,
decode(''),
decode('')))
# (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(II11i.query)
# Parameter names the inner loop skips; stored as opaque byte strings and
# turned into their real values by ``decode`` at run time.
i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')]
# Two decoded values iterated by the outer loop; each is passed to the
# helper as its first argument and appears in the reported message.
i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
for I11 in i11:
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in i1111:
continue
# Log the outer value, the parameter name and the stripped URL.
debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), I11, O0O0OO0O0O0, i1oOOoo00O0O)
Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii)
if Oo0o0000o0o0:
# The decoded template is filled with the outer value and element [1]
# of the helper's result.
security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1]))
return
# The condition is the constant 0, so the guarded expression never runs;
# the names it mentions are not bound anywhere in this function.
if 0:
iiiii11iII1 % O0o
def url(self):
    """ Full URL as requested by the client (computed).

    This value is constructed out of different environment variables
    and includes scheme, host, port, scriptname, path and query string.

    The host is taken from ``HTTP_X_FORWARDED_HOST``, then ``HTTP_HOST``.
    When neither is set it is built from ``SERVER_NAME`` (falling back
    to ``127.0.0.1`` when that is missing too) plus ``SERVER_PORT``,
    where the port is omitted if it is the default for the scheme.
    """
    environ = self.environ
    scheme = environ.get('wsgi.url_scheme', 'http')
    host = environ.get('HTTP_X_FORWARDED_HOST') or environ.get('HTTP_HOST')
    if not host:
        # Without the fallback a missing SERVER_NAME leaves ``host`` as
        # None and appending the port below raises TypeError.
        host = environ.get('SERVER_NAME') or '127.0.0.1'
        port = environ.get('SERVER_PORT', '80')
        if (scheme, port) not in (('https', '443'), ('http', '80')):
            host += ':' + port
    parts = (scheme, host, urlquote(self.fullpath), self.query_string, '')
    return urlunsplit(parts)
def url(self):
    """ Full URL as requested by the client (computed).

    This value is constructed out of different environment variables
    and includes scheme, host, port, scriptname, path and query string.

    The host is taken from ``HTTP_X_FORWARDED_HOST``, then ``HTTP_HOST``.
    When neither is set it is built from ``SERVER_NAME`` (falling back
    to ``127.0.0.1`` when that is missing too) plus ``SERVER_PORT``,
    where the port is omitted if it is the default for the scheme.
    """
    environ = self.environ
    scheme = environ.get('wsgi.url_scheme', 'http')
    host = environ.get('HTTP_X_FORWARDED_HOST') or environ.get('HTTP_HOST')
    if not host:
        # Without the fallback a missing SERVER_NAME leaves ``host`` as
        # None and appending the port below raises TypeError.
        host = environ.get('SERVER_NAME') or '127.0.0.1'
        port = environ.get('SERVER_PORT', '80')
        if (scheme, port) not in (('https', '443'), ('http', '80')):
            host += ':' + port
    parts = (scheme, host, urlquote(self.fullpath), self.query_string, '')
    return urlunsplit(parts)
def _get_db(dburl, create=True, reset_db=False):
    """
    Return a handle for the database named by the path of ``dburl``.

    The server is addressed by ``dburl`` with its path and query
    removed.  With ``reset_db`` an existing database is deleted and a
    new one created.  Otherwise an existing database is returned, a
    missing one is created when ``create`` is true, and ``None`` is
    returned when it is missing and ``create`` is false.
    """
    split_url = urlparse.urlsplit(dburl)
    name = split_url.path.lstrip('/')
    server_url = urlparse.urlunsplit(split_url._replace(path='', query=''))
    server = couchdb.Server(server_url)
    server.version()  # throws if can't connect to server

    present = name in server
    if reset_db:
        if present:
            del server[name]
        return server.create(name)
    if present:
        return server[name]
    return server.create(name) if create else None
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def iri2uri(uri):
    """Convert an IRI to a URI.

    IRIs must be passed in as unicode strings; do not utf-8 encode the
    IRI before handing it to this function.  Any value that is not a
    unicode string is returned unchanged.
    """
    if not isinstance(uri, unicode):
        return uri

    pieces = list(urlparse.urlsplit(uri))
    # Index 1 is the authority; it is the only component IDNA-encoded.
    pieces[1] = pieces[1].encode('idna')
    rebuilt = urlparse.urlunsplit(pieces)
    # Every character goes through the module's ``encode`` helper.  Per
    # the original note: characters in 'ucschar' or 'iprivate' are
    # utf-8 encoded and each resulting octet is %-encoded.
    return "".join(encode(ch) for ch in rebuilt)
def iri2uri(uri):
    """Convert an IRI to a URI.

    IRIs must be passed in as unicode strings; do not utf-8 encode the
    IRI before handing it to this function.  Any value that is not a
    unicode string is returned unchanged.
    """
    if not isinstance(uri, unicode):
        return uri

    pieces = list(urlparse.urlsplit(uri))
    # Index 1 is the authority; it is the only component IDNA-encoded.
    pieces[1] = pieces[1].encode('idna')
    rebuilt = urlparse.urlunsplit(pieces)
    # Every character goes through the module's ``encode`` helper.  Per
    # the original note: characters in 'ucschar' or 'iprivate' are
    # utf-8 encoded and each resulting octet is %-encoded.
    return "".join(encode(ch) for ch in rebuilt)
def _get_caldav_calendar_home_set(self, uri):
    """Return the calendar home set of ``uri`` as a ``D:href`` element.

    The database name and in-database path come from
    ``self._get_dburi``.  The ``webdav.collection`` model is asked for
    its calendar home set and the quoted, host-relative path
    ``/<dbname><result>`` is wrapped in a DOM element.

    :raises DAV_NotFound: when there is no database name, the pool has
        no ``webdav.collection`` model, or the model lacks
        ``get_calendar_home_set``.
    :raises DAV_Error: re-raised from the model after logging; any
        other exception is logged and replaced by ``DAV_Error(500)``.
    """
    dbname, dburi = self._get_dburi(uri)
    if not dbname:
        raise DAV_NotFound
    pool = Pool(Transaction().cursor.database_name)
    try:
        Collection = pool.get('webdav.collection')
    except KeyError:
        raise DAV_NotFound
    if not getattr(Collection, 'get_calendar_home_set', None):
        raise DAV_NotFound
    try:
        res = Collection.get_calendar_home_set(dburi, cache=CACHE)
    except DAV_Error as exception:
        self._log_exception(exception)
        raise
    except Exception as exception:
        self._log_exception(exception)
        raise DAV_Error(500)
    doc = domimpl.createDocument(None, 'href', None)
    href = doc.documentElement
    href.tagName = 'D:href'
    # iPhone doesn't handle "http" in href, so only the quoted path is
    # emitted instead of the full URL.
    huri = doc.createTextNode(urllib.quote('/' + dbname + res))
    href.appendChild(huri)
    return href
def _get_caldav_schedule_inbox_URL(self, uri):
    """Return the schedule inbox URL of ``uri`` as a ``D:href`` element.

    The database name and in-database path come from
    ``self._get_dburi``.  The ``webdav.collection`` model is asked for
    the schedule inbox URL; ``uri`` is then rebuilt with its path
    replaced by the quoted ``<dbname><result>`` and wrapped in a DOM
    element.

    :raises DAV_NotFound: when there is no database name, the pool has
        no ``webdav.collection`` model, or the model lacks
        ``get_schedule_inbox_URL``.
    :raises DAV_Error: re-raised from the model after logging; any
        other exception is logged and replaced by ``DAV_Error(500)``.
    """
    dbname, dburi = self._get_dburi(uri)
    if not dbname:
        raise DAV_NotFound
    pool = Pool(Transaction().cursor.database_name)
    try:
        Collection = pool.get('webdav.collection')
    except KeyError:
        raise DAV_NotFound
    if not getattr(Collection, 'get_schedule_inbox_URL', None):
        raise DAV_NotFound
    try:
        res = Collection.get_schedule_inbox_URL(dburi, cache=CACHE)
    except DAV_Error as exception:
        self._log_exception(exception)
        raise
    except Exception as exception:
        self._log_exception(exception)
        raise DAV_Error(500)
    # Keep scheme, host, query and fragment of the request URI; only
    # the path component (index 2) is replaced.
    uparts = list(urlparse.urlsplit(uri))
    uparts[2] = urllib.quote(dbname + res)
    doc = domimpl.createDocument(None, 'href', None)
    href = doc.documentElement
    href.tagName = 'D:href'
    huri = doc.createTextNode(urlparse.urlunsplit(uparts))
    href.appendChild(huri)
    return href
def _get_caldav_schedule_outbox_URL(self, uri):
    """Return the schedule outbox URL of ``uri`` as a ``D:href`` element.

    The database name and in-database path come from
    ``self._get_dburi``.  The ``webdav.collection`` model is asked for
    the schedule outbox URL; ``uri`` is then rebuilt with its path
    replaced by the quoted ``<dbname><result>`` and wrapped in a DOM
    element.

    :raises DAV_NotFound: when there is no database name, the pool has
        no ``webdav.collection`` model, or the model lacks
        ``get_schedule_outbox_URL``.
    :raises DAV_Error: re-raised from the model after logging; any
        other exception is logged and replaced by ``DAV_Error(500)``.
    """
    dbname, dburi = self._get_dburi(uri)
    if not dbname:
        raise DAV_NotFound
    pool = Pool(Transaction().cursor.database_name)
    try:
        Collection = pool.get('webdav.collection')
    except KeyError:
        raise DAV_NotFound
    if not getattr(Collection, 'get_schedule_outbox_URL', None):
        raise DAV_NotFound
    try:
        res = Collection.get_schedule_outbox_URL(dburi, cache=CACHE)
    except DAV_Error as exception:
        self._log_exception(exception)
        raise
    except Exception as exception:
        self._log_exception(exception)
        raise DAV_Error(500)
    # Keep scheme, host, query and fragment of the request URI; only
    # the path component (index 2) is replaced.
    uparts = list(urlparse.urlsplit(uri))
    uparts[2] = urllib.quote(dbname + res)
    doc = domimpl.createDocument(None, 'href', None)
    href = doc.documentElement
    href.tagName = 'D:href'
    huri = doc.createTextNode(urlparse.urlunsplit(uparts))
    href.appendChild(huri)
    return href
def _get_dav_principal_collection_set(self, uri):
    """Return the principal collection set of ``uri`` as a ``D:href`` element.

    When the in-database path starts with ``Calendars``, ``uri`` is
    rebuilt with its path replaced by the quoted
    ``<dbname>/Calendars/`` and wrapped in a DOM element.  Any other
    path is delegated to the previously registered handler when there
    is one; without it ``DAV_NotFound`` is raised.
    """
    dbname, dburi = self._get_dburi(uri)

    if not dburi.startswith('Calendars'):
        # Not a calendar path: defer to the earlier implementation.
        if _prev_get_dav_principal_collection_set:
            return _prev_get_dav_principal_collection_set(self, uri)
        raise DAV_NotFound

    pieces = list(urlparse.urlsplit(uri))
    pieces[2] = urllib.quote(dbname + '/Calendars/')
    document = domimpl.createDocument(None, 'href', None)
    element = document.documentElement
    element.tagName = 'D:href'
    text_node = document.createTextNode(urlparse.urlunsplit(pieces))
    element.appendChild(text_node)
    return element