def test_url_cast(self, set_env, env):
set_env({'URL': 'http://stevenloria.com/projects/?foo=42'})
res = env.url('URL')
assert isinstance(res, urlparse.ParseResult)
Python ParseResult() usage examples

languagestripper.py (project: wmt16-document-alignment-task, author: christianbuck)
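Before the project examples, a quick orientation: urlparse.ParseResult is the six-field named tuple that urlparse.urlparse() returns in Python 2, and it can also be constructed by hand and serialized back with geturl(). A minimal stdlib-only sketch:

import urlparse

# parsing a URL yields a ParseResult named tuple
parsed = urlparse.urlparse('http://example.com/path?x=1#top')
assert parsed.scheme == 'http'
assert parsed.netloc == 'example.com'
assert parsed.path == '/path'
assert parsed.query == 'x=1'
assert parsed.fragment == 'top'

# a ParseResult can also be built directly and turned back into a URL
rebuilt = urlparse.ParseResult(scheme='http', netloc='example.com',
                               path='/path', params='',
                               query='x=1', fragment='').geturl()
assert rebuilt == 'http://example.com/path?x=1'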
# requires (Python 2): import re, urlparse
def strip_uri(self, uri, expected_language=None,
              remove_index=False):
''' Returns (stripped_uri, success) '''
parsed_uri = urlparse.urlparse(uri)
matched_languages = [self.match(parsed_uri.path),
self.match(parsed_uri.query)]
if (expected_language is not None) and \
(expected_language not in matched_languages):
        # we removed a bit of the URL but it does not support our
        # hope of finding expected_language, e.g. we removed /fr/
        # when we were looking for Italian pages.
return '', False
stripped_path = self.strip_path(parsed_uri.path)
# repair some stripping artifacts
stripped_path = re.sub(r'//+', '/', stripped_path)
stripped_path = re.sub(r'__+', '_', stripped_path)
stripped_path = re.sub(r'/_+', '/', stripped_path)
stripped_path = re.sub(r'_/', '/', stripped_path)
stripped_path = re.sub(r'--+', '-', stripped_path)
# remove new trailing /
if stripped_path and stripped_path[-1] == '/' \
and parsed_uri.path and parsed_uri.path[-1] != '/':
stripped_path = stripped_path[:-1]
# add removed trailing /
if not stripped_path.endswith('/') and parsed_uri.path.endswith('/'):
stripped_path += '/'
stripped_query = self.strip_query(parsed_uri.query)
# remove index files from tail of path if query empty
if remove_index and not stripped_query:
if stripped_path.split('/')[-1].startswith('index'):
stripped_path = '/'.join(stripped_path.split('/')[:-1])
netloc = parsed_uri.netloc
if '@' in netloc:
netloc = netloc.split('@')[1]
if ':' in netloc:
netloc = netloc.split(':')[0]
if not netloc:
return '', False
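    # note: the cleaned 'netloc' above is only used as a sanity check;
    # the rebuilt URL below keeps the original parsed_uri.netloc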
stripped_uri = urlparse.ParseResult(scheme='http',
netloc=parsed_uri.netloc,
path=stripped_path,
params='',
query=stripped_query,
fragment='').geturl()
return stripped_uri, stripped_uri != uri
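The artifact-repair block in the middle of strip_uri is self-contained and easy to try on its own. A minimal sketch of just that regex normalization (the surrounding match/strip_path/strip_query helpers belong to the class and are not shown on this page):

import re

def repair_path(path):
    # collapse the separators left behind after stripping language tokens
    path = re.sub(r'//+', '/', path)
    path = re.sub(r'__+', '_', path)
    path = re.sub(r'/_+', '/', path)
    path = re.sub(r'_/', '/', path)
    path = re.sub(r'--+', '-', path)
    return path

# e.g. after a language token was cut out of '/news/fr/_story--1/'
assert repair_path('/news//_story--1/') == '/news/story-1/'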
# requires (Python 2): import urlparse; LOG is the module's logger
def _parse_url(self, dst, src):
    """
    Check whether the target url 'dst' is in the same domain (including
    the port) as the url 'src', and convert it into a complete url
    without params.

    Returns:
        A string holding the complete url, including query params if
        present; '' if the target url is not in the same domain.
    """
    LOG.debug('detecting url: ' + dst)
    s_parsed = urlparse.urlparse(src)
    s_scheme = s_parsed.scheme
    s_netloc = s_parsed.netloc
    s_cur_dir = s_parsed.path
    if not s_cur_dir.endswith('/'):
        # drop the last path segment to keep only the current directory
        s_cur_dir = '/'.join(s_cur_dir.split('/')[:-1])
    else:
        s_cur_dir = s_cur_dir[:-1]
    d_parsed = urlparse.urlparse(dst)
    d_scheme = d_parsed.scheme
    if d_parsed.netloc.find(':') == -1 and d_parsed.netloc != '':
        # append the scheme's default port when the target omits it
        if d_scheme == 'http':
            d_netloc = d_parsed.netloc + ':80'
        elif d_scheme == 'https':
            d_netloc = d_parsed.netloc + ':443'
        elif d_scheme == '':
            d_netloc = d_parsed.netloc + ':80' if s_scheme == 'http' else d_parsed.netloc + ':443'
        else:
            d_netloc = d_parsed.netloc
    else:
        d_netloc = d_parsed.netloc
    # prefix the path with '/' if it does not start with one
    if d_parsed.path != '':
        d_path = '/' + d_parsed.path if d_parsed.path[0] != '/' else d_parsed.path
    else:
        d_path = '/'
    d_query = d_parsed.query
    # if it is a relative url, resolve it against the source's current directory
    if d_netloc == '':
        return urlparse.ParseResult(s_scheme, s_netloc, s_cur_dir + d_path, '', d_query, '').geturl()
    elif d_netloc == s_netloc and (d_scheme == s_scheme or d_scheme == ''):
        return urlparse.ParseResult(s_scheme, s_netloc, d_path, '', d_query, '').geturl()
    else:
        return ''
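Sketching the relative-url branch with concrete values (host and paths are made up; the default-port normalization above is skipped here):

import urlparse

src = 'http://example.com:80/docs/index.html'
dst = 'guide.html?page=2'

s = urlparse.urlparse(src)
cur_dir = '/'.join(s.path.split('/')[:-1])    # '/docs'
d = urlparse.urlparse(dst)

# a relative link resolves against the source's current directory
resolved = urlparse.ParseResult(s.scheme, s.netloc, cur_dir + '/' + d.path,
                                '', d.query, '').geturl()
assert resolved == 'http://example.com:80/docs/guide.html?page=2'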
# requires (Python 2 / Django): hashlib, urllib, urlparse, plus the
# project's forms and models and Django's render/reverse/get_template
def post(self, request):
form = UserCreateForm(request.POST)
if form.is_valid():
email = form.cleaned_data.get('email')
username = form.cleaned_data.get('username')
is_active = form.cleaned_data.get('is_active')
role = form.cleaned_data.get('role')
groups = form.cleaned_data.get('groups')
try:
user = User.objects.create_user(
email=email,
username=username,
is_active=is_active,
role=role,
)
except IntegrityError:
            error_msg = 'A user with that email or username already exists.'
groups = UserGroup.objects.only('id', 'name')
role_types = UserRoleType.attrs
status_types = UserStatusType.attrs
context = dict(
error_msg=error_msg,
groups=groups,
role_types=role_types,
status_types=status_types,
)
return render(request, 'users/user_create.html', context)
else:
user.groups.add(*groups)
sign = hashlib.md5(email + settings.SECRET_KEY).hexdigest()
            url = urlparse.ParseResult(
                scheme=request.scheme,
                netloc=urlparse.urlparse(request.get_raw_uri()).netloc,
                path=reverse('core:SetPassword'),
                params='',
                query=urllib.urlencode({'email': email, 'sign': sign}),
                fragment='',
            ).geturl()
            msg = EmailMultiAlternatives(
                subject='Account activation',
                body=get_template('users/user_email_activate.html').render({'url': url}),
                from_email=settings.EMAIL_HOST_USER,
                to=[email],
            )
msg.content_subtype = 'html'
msg.send(fail_silently=True)
return HttpResponseRedirect(reverse('user:UserList'))
else:
groups = UserGroup.objects.only('id', 'name')
role_types = UserRoleType.attrs
status_types = UserStatusType.attrs
context = dict(
groups=groups,
role_types=role_types,
status_types=status_types,
form=form,
)
return render(request, 'users/user_create.html', context)
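The ParseResult-specific part of this view is the activation-link construction. A standalone sketch of the same pattern, with stand-in values where the view uses Django's request and reverse():

import hashlib
import urllib
import urlparse

SECRET_KEY = 'not-a-real-secret'      # stand-in for settings.SECRET_KEY
email = 'user@example.com'

sign = hashlib.md5(email + SECRET_KEY).hexdigest()
url = urlparse.ParseResult(
    scheme='https',
    netloc='example.com',             # stand-in for the request's host
    path='/users/set-password/',      # stand-in for reverse('core:SetPassword')
    params='',
    query=urllib.urlencode({'email': email, 'sign': sign}),
    fragment='',
).geturl()
# a complete https URL carrying the signed query string
# (query parameter order may vary with dict ordering)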
def parse(s):
    '''
    Parse a path given as a url. Accepts strings of the form:

        s3://bucket-name/path/to/key
        file:///path/to/file
        /absolute/path/to/file
        relative/path/to/file
        ~/path/from/home/dir/to/file

    To avoid surprises, s3:// and file:// URLs should not
    include ';', '?' or '#'. You should URL-encode such paths.

    Return value is a ParseResult; one of the following:

        ('s3', bucketname, valid_s3_key, ...)
        ('file', '', absolute_path_for_current_filesystem, ...)
    '''
    import os
    import re
    from urlparse import urlparse, ParseResult
if not isinstance(s, basestring):
raise ValueError("An S3 path must be a string, got %s" % s.__class__.__name__)
is_windows_path = (len(s) >= 2 and s[1] == ':')
if is_windows_path:
scheme, netloc, s3path = 'file', '', s
else:
scheme, netloc, s3path, params, query, fragment = urlparse(s)
if any([params, query, fragment]):
raise ValueError("Invalid URI: %s" % s)
if any(char in ';?#' for char in s):
raise ValueError("Invalid URI: %s" % s)
try:
s3path.encode('UTF-8')
except (UnicodeDecodeError, UnicodeEncodeError):
raise ValueError("Invalid URI (bad unicode): %s" % s)
# If somehow something ever gets uploaded with binary in the
# key, this seems to be the only way to fix it:
# `s3cmd fixbucket s3://bodylabs-korper-assets`
    # urlparse, given file:///C:\foo, parses it to /C:\foo, so on
    # reconstruction (on Windows) we would get C:\C:\foo
    if re.match(r'/\w:', s3path):
        s3path = s3path[1:]
        is_windows_path = True
if scheme == '':
scheme = 'file'
if scheme == 'file' and not is_windows_path:
if s3path.endswith(os.sep) or s3path.endswith('/'):
# os.path.abspath strips the trailing '/' so we need to put it back
s3path = os.path.join(os.path.abspath(os.path.expanduser(s3path)), '')
else:
s3path = os.path.abspath(os.path.expanduser(s3path))
if scheme == 's3' and netloc == '':
raise ValueError('s3 urls must specify the bucket')
return ParseResult(scheme, netloc, s3path, params=None, query=None, fragment=None) # pylint: disable=too-many-function-args,unexpected-keyword-arg
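Tracing the function, calls on a POSIX machine would behave roughly like this (bucket and file names are made up):

p = parse('s3://my-bucket/path/to/key')
assert (p.scheme, p.netloc, p.path) == ('s3', 'my-bucket', '/path/to/key')

p = parse('relative/file.txt')              # resolved against the cwd
assert p.scheme == 'file' and p.path.endswith('/relative/file.txt')

parse('s3://my-bucket/key?versionId=1')     # raises ValueError: Invalid URI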
def join(base, *additions):
    '''
    Extends os.path.join to work with s3:// and file:// urls.

    This inherits a quirk of os.path.join: if an addition is
    an absolute path, the path components of base are thrown away.

    Each addition must be an absolute or relative path, not a URL.

    `base` and the additions can use any path separator, but the
    result will always be normalized to os.sep.
    '''
from urlparse import urlparse, urljoin, ParseResult
addition = sep.join(additions)
(scheme, netloc, _, params, query, fragment) = urlparse(addition)
if any([scheme, netloc, params, query, fragment]):
raise ValueError('Addition must be an absolute or relative path, not a URL')
if islocal(base):
return os.path.join(parse(base).path, addition.replace(sep, os.sep))
k = parse(base)
# Call urljoin instead of os.path.join, since it uses '/' instead of
# os.sep, which is '\' on Windows.
#
    # Given the disparity between os.path.join and urljoin, we prefer
    # the behavior of os.path.join:
#
# >>> os.path.join('foo/bar', 'baz')
# 'foo/bar/baz'
# >>> urlparse.urljoin('foo/bar', 'baz')
# 'foo/baz'
#
# So we add a trailing slash if there is none
if k.path.endswith(sep):
s3path = urljoin(k.path, addition)
else:
s3path = urljoin(k.path + sep, addition)
return ParseResult(k.scheme, k.netloc, s3path, k.params, k.query, k.fragment).geturl() # pylint: disable=too-many-function-args,unexpected-keyword-arg
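Following the code, a join onto an s3 url works out like this (values made up; this assumes the module-level sep constant is '/'):

url = join('s3://my-bucket/data', 'train', 'x.csv')
#   parse(base).path                 == '/data'
#   urljoin('/data/', 'train/x.csv') == '/data/train/x.csv'
assert url == 's3://my-bucket/data/train/x.csv'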
# requires (Python 2): from urlparse import urlparse, ParseResult
def prepare_links_for_insert(links, url, site):
    """ Take the given links and prepare them for insertion into MongoDB """
links_to_insert = []
for link in links:
if not link:
continue
link = urlparse(link)
if not link.scheme and \
not link.netloc and \
not link.path and \
not link.query:
continue
if link.netloc \
and link.netloc != site \
and 'www.' + link.netloc != site \
and link.netloc != 'www.' + site:
SpiderCommon._external_hosts.append(link.netloc)
continue
link = SpiderCommon.clear_link(link)
link = SpiderCommon.build_path(link, url.path)
link = SpiderCommon.clear_link(link)
links_to_insert.append(link)
    # additionally register every parent directory of each link
    separated_links = []
for link in links_to_insert:
paths = link.path.split("/")
while len(paths) != 1:
del paths[-1]
separated_links.append(
ParseResult(
scheme='',
netloc='',
path="/".join(paths) + '/',
params='',
query='',
fragment=''
)
)
return links_to_insert + separated_links
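The parent-directory expansion at the end is easy to verify in isolation:

from urlparse import ParseResult

paths = '/a/b/c'.split('/')
parents = []
while len(paths) != 1:
    del paths[-1]
    parents.append(ParseResult('', '', '/'.join(paths) + '/', '', '', ''))

assert [p.path for p in parents] == ['/a/b/', '/a/']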