def strip_subfolder(url):
"""
Strip off the subfolder if it exists so we always use the exact same
share url for saving counts.
"""
subfolder = app.config.get('SUBFOLDER', None)
if not subfolder:
return url
p = urlparse.urlparse(url)
if not p.path.startswith(subfolder):
return url
new_path = p.path.replace('%s' % (subfolder), '', 1)
new_url = urlparse.ParseResult(p.scheme, p.netloc, new_path, p.params,
p.query, p.fragment)
return new_url.geturl()
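# A minimal, standalone sketch of the same technique (Python 3 urllib.parse here,
# while the snippet above targets the Python 2 urlparse module); the '/blog'
# subfolder value is a made-up stand-in for app.config['SUBFOLDER'].
from urllib.parse import urlparse, ParseResult

def _strip_prefix_example():
    subfolder = '/blog'                      # hypothetical SUBFOLDER setting
    p = urlparse('https://example.com/blog/post/42?utm=x')
    if p.path.startswith(subfolder):
        p = ParseResult(p.scheme, p.netloc, p.path.replace(subfolder, '', 1),
                        p.params, p.query, p.fragment)
    return p.geturl()                        # -> 'https://example.com/post/42?utm=x'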
def _build_url(self, endpoint, params={}):
"""Return the full URL for the desired endpoint.
Args:
endpoint (str): the API endpoint after base URL
params (dict): any params to include in the request
Returns:
(str) the full URL of the request
"""
new_params = {'circle-token': self._token}
new_params.update(params)
parsed_url = urlparse(self._base_url)
new_parse = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc,
path='/'.join((parsed_url.path, endpoint)),
params='', query=urlencode(new_params),
fragment='')
return urlunparse(new_parse)
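# Hedged usage sketch of how the ParseResult/urlencode combination above yields a
# full request URL. The base URL, endpoint and token below are invented examples,
# not values taken from any real API client.
from urllib.parse import urlparse, urlencode, urlunparse, ParseResult

_base = urlparse('https://circleci.com/api/v1.1')
_query = urlencode({'circle-token': 'dummy-token', 'limit': 5})
_full = urlunparse(ParseResult(_base.scheme, _base.netloc,
                               '/'.join((_base.path, 'projects')),
                               '', _query, ''))
# _full -> 'https://circleci.com/api/v1.1/projects?circle-token=dummy-token&limit=5'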
def normalize_website(cls, w):
from django.core.validators import EMPTY_VALUES
from urlparse import urlparse, urlunparse, ParseResult
if w in EMPTY_VALUES:
    return None
w = w.decode('utf-8') if isinstance(w, bytes) else w
w = w.lower().strip()
if not w.startswith('http://') and not w.startswith('https://'):
w = 'http://' + w.lstrip('/')
try:
parsed = urlparse(w)
except ValueError:
return None
else:
new_parsed = ParseResult(scheme='http',
netloc=cls.get_website_tld(w),
path=parsed.path.rstrip('/'),
params='',
query=parsed.query,
fragment='')
return urlunparse(new_parsed)
def post(self, request, pk):
user = User.objects.get(id=pk)
sign = hashlib.md5(user.email + settings.SECRET_KEY).hexdigest()
url = urlparse.ParseResult(
scheme=request.scheme,
netloc=urlparse.urlparse(request.get_raw_uri()).netloc,
path=reverse('core:SetPassword'),
params='',
query=urllib.urlencode({'email': user.email, 'sign': sign}),
fragment='',
).geturl()
msg = EmailMultiAlternatives(
subject='Account activation',
body=get_template('users/user_email_activate.html').render({'url': url}),
from_email=settings.EMAIL_HOST_USER,
to=[user.email,],
)
msg.content_subtype = 'html'
status = msg.send(fail_silently=True)
response = 'Email sent successfully' if status else 'Failed to send email, please try again'
return HttpResponse(response)
def build_path(link, url_path):
""" Build link with full path (for relatively links) """
if link.path[0:1] == '/':
return link
path = link.path
path = SpiderCommon.del_file_from_path(url_path) + "/" + path
return ParseResult(
scheme=link.scheme,
netloc=link.netloc,
path=path,
params=link.params,
query=link.query,
fragment=link.fragment
)
def replace_qs_param(url, params):
""" Add GET params to provided URL being aware of existing.
from: http://stackoverflow.com/a/25580545
:param url: string of target URL
:param params: dict containing requested params to be added
:return: string with updated URL
"""
url = urllib.unquote(url)
parsed_url = urlparse(url)
get_args = parsed_url.query
# Converting URL arguments to dict and update
parsed_get_args = dict(parse_qsl(get_args))
parsed_get_args.update(params)
# Converting URL argument to proper query string
encoded_get_args = urllib.urlencode(parsed_get_args, doseq=True)
new_url = ParseResult(
parsed_url.scheme, parsed_url.netloc, parsed_url.path,
parsed_url.params, encoded_get_args, parsed_url.fragment
).geturl()
return new_url
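# Hedged usage sketch of the query-merging pattern above, written against
# Python 3's urllib.parse (the snippet itself uses the Python 2 names); the
# example URL and parameters are invented.
from urllib.parse import urlparse, parse_qsl, urlencode, ParseResult

def _replace_qs_example():
    parsed = urlparse('http://example.com/search?page=1&q=urls')
    args = dict(parse_qsl(parsed.query))       # {'page': '1', 'q': 'urls'}
    args.update({'page': 2})                   # overwrite an existing param
    return ParseResult(parsed.scheme, parsed.netloc, parsed.path,
                       parsed.params, urlencode(args, doseq=True),
                       parsed.fragment).geturl()
    # -> 'http://example.com/search?page=2&q=urls'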
def make_url(base_url, url):
if urlparse.urlsplit(url).scheme == '':
url = urlparse.urljoin(base_url, url)
if 'HLS_PLAYER_SHIFT_PORT' in os.environ.keys():
shift = int(os.environ['HLS_PLAYER_SHIFT_PORT'])
p = urlparse.urlparse(url)
loc = p.netloc
if loc.find(":") != -1:
loc, port = loc.split(':')
port = int(port) + shift
loc = loc + ":" + str(port)
elif p.scheme == "http":
port = 80 + shift
loc = loc + ":" + str(port)
p = urlparse.ParseResult(scheme=p.scheme,
netloc=loc,
path=p.path,
params=p.params,
query=p.query,
fragment=p.fragment)
url = urlparse.urlunparse(p)
return url
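# Minimal sketch of the port-shifting idea above (Python 3 urllib.parse), assuming
# a hypothetical shift of 1000; the HLS_PLAYER_SHIFT_PORT environment handling and
# the base-URL join are omitted for brevity.
from urllib.parse import urlparse, urlunparse, ParseResult

def _shift_port_example(url, shift=1000):
    p = urlparse(url)
    host, _, port = p.netloc.partition(':')
    port = int(port) if port else (443 if p.scheme == 'https' else 80)
    shifted = ParseResult(p.scheme, '%s:%d' % (host, port + shift),
                          p.path, p.params, p.query, p.fragment)
    return urlunparse(shifted)

# _shift_port_example('http://media.local/stream.m3u8') -> 'http://media.local:1080/stream.m3u8'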
def validate_url(self, url):
url_path = urllib.quote(url.path, safe=b"/%")
url_query = urllib.quote(url.query, safe=b"?=&")
url = ParseResult(url.scheme, url.netloc, url_path,
url.params, url_query, url.fragment)
has_hostname = url.hostname is not None and len(url.hostname) > 0
has_http_scheme = url.scheme in ("http", "https")
has_path = not len(url.path) or url.path.startswith("/")
if not (has_hostname and has_http_scheme and has_path):
raise NotSupported("invalid url: %s" % repr(url))
return url
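# Hedged sketch of the validation steps above: re-quote the path and query, then
# check hostname, scheme and path shape. This is a standalone Python 3 version;
# the snippet itself relies on Python 2's urllib.quote and a NotSupported
# exception class that is not shown here.
from urllib.parse import urlparse, quote, ParseResult

def _validate_example(raw):
    u = urlparse(raw)
    u = ParseResult(u.scheme, u.netloc, quote(u.path, safe="/%"),
                    u.params, quote(u.query, safe="?=&"), u.fragment)
    ok = bool(u.hostname) and u.scheme in ("http", "https") and \
        (not u.path or u.path.startswith("/"))
    if not ok:
        raise ValueError("invalid url: %r" % (u,))
    return u

# _validate_example('http://example.com/a path')  -> path becomes '/a%20path'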
def test_marshal_uri(self):
"""
Should parse a 'uri' value into a `ParseResult`.
"""
uri = 'https://media.giphy.com/media/22kxQ12cxyEww/giphy.gif?something=variable'
marshalled, val = upnp.marshal.marshal_value('uri', uri)
self.assertTrue(marshalled)
self.assertIsInstance(val, ParseResult)
def test_result_pairs(self):
# Check encoding and decoding between result pairs
result_types = [
urlparse.DefragResult,
urlparse.SplitResult,
urlparse.ParseResult,
]
for result_type in result_types:
self._check_result_type(result_type)
def parse_url(url):
url = urlparse.urlparse(url)
if not url.scheme and not url.netloc and url.path:
netloc = url.path
if ':' not in netloc:
netloc = '{}:8765'.format(netloc)
return urlparse.ParseResult('http', netloc, '/', '', '', '')
return url
# Defined in phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
def add_url_params(url, params):
""" Add GET params to provided URL being aware of existing.
:param url: string of target URL
:param params: dict containing requested params to be added
:return: string with updated URL
>> url = 'http://stackoverflow.com/test?answers=true'
>> new_params = {'answers': False, 'data': ['some','values']}
>> add_url_params(url, new_params)
'http://stackoverflow.com/test?data=some&data=values&answers=false'
"""
# Unquoting URL first so we don't lose existing args
url = unquote(url)
# Extracting url info
parsed_url = urlparse(url)
# Extracting URL arguments from parsed URL
get_args = parsed_url.query
# Converting URL arguments to dict
parsed_get_args = dict(parse_qsl(get_args))
# Merging URL arguments dict with new params
parsed_get_args.update(params)
# Bool and Dict values should be converted to json-friendly values
# you may throw this part away if you don't like it :)
parsed_get_args.update(
{k: dumps(v) for k, v in parsed_get_args.items()
if isinstance(v, (bool, dict))}
)
# Converting URL argument to proper query string
encoded_get_args = urlencode(parsed_get_args, doseq=True)
# Creating new parsed result object based on provided with new
# URL arguments. Same thing happens inside of urlparse.
new_url = ParseResult(
parsed_url.scheme, parsed_url.netloc, parsed_url.path,
parsed_url.params, encoded_get_args, parsed_url.fragment
).geturl()
return new_url
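# Hedged demonstration of the json.dumps conversion used above: bools and dicts
# become JSON literals before urlencode runs. Standalone Python 3 sketch with
# invented parameters.
from json import dumps
from urllib.parse import urlencode

_params = {'answers': False, 'data': ['some', 'values'], 'filters': {'a': 1}}
_params.update({k: dumps(v) for k, v in _params.items()
                if isinstance(v, (bool, dict))})
# urlencode(..., doseq=True) expands the list and keeps the JSON strings:
# 'answers=false&data=some&data=values&filters=%7B%22a%22%3A+1%7D'
_query = urlencode(_params, doseq=True)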
def __init__(self, init):
"""
Initialize this object by copying the fields of the urlparse.ParseResult instance init.
"""
self.query = init.query
self.params = init.params
self.fragment = init.fragment
self.path = init.path
self.netloc = init.netloc
self.scheme = init.scheme
def to_standard(self):
"""
Convert back to a standard urlparse.ParseResult instance.
"""
return ParseResult(
scheme=self.scheme,
netloc=self.netloc,
path=self.path,
params=self.params,
query=self.query,
fragment=self.fragment
)
def test_link_allowed(self):
Registry().set('allow_regexp', re.compile('allowed'))
assert bool(
self.model._link_allowed(
ParseResult(path="/abc/allowed.php", scheme='', netloc='', params='', query='', fragment='')))
assert not bool(
self.model._link_allowed(
ParseResult(path="/denied2.php", scheme='', netloc='', params='', query='', fragment='')))
def test_build_path(self):
test_link = ParseResult(path="/abc/allowed.php", scheme='', netloc='', params='', query='', fragment='')
assert self.model.build_path(test_link, "def") == test_link
test_link = ParseResult(path="abc/allowed.php", scheme='', netloc='', params='', query='', fragment='')
check_link = ParseResult(path="/a/abc/allowed.php", scheme='', netloc='', params='', query='', fragment='')
assert self.model.build_path(test_link, "/a/") == check_link
def test_clear_link(self):
test_link = ParseResult(
path="/ab\\c//./d/../allowed.php", scheme='', netloc='', params='', query='?a=b&c=d', fragment='')
check_link = ParseResult(
path="/ab/c/allowed.php", scheme='', netloc='', params='', query='?a=b&c=d', fragment='')
assert self.model.clear_link(test_link) == check_link
def replicate_per_farm_dbs(cloud_url=None, local_url=None, farm_name=None):
"""
Set up replication of the per-farm databases from the local server to the
cloud server.
:param str cloud_url: Used to override the cloud url from the global
configuration in case the calling function is in the process of
initializing the cloud server
:param str local_url: Used to override the local url from the global
configuration in case the calling function is in the process of
initializing the local server
:param str farm_name: Used to override the farm name from the global
configuration in case the calling function is in the process of
initializing the farm
"""
cloud_url = cloud_url or config["cloud_server"]["url"]
local_url = local_url or config["local_server"]["url"]
farm_name = farm_name or config["cloud_server"]["farm_name"]
username = config["cloud_server"]["username"]
password = config["cloud_server"]["password"]
# Add credentials to the cloud url
parsed_cloud_url = urlparse(cloud_url)
if not parsed_cloud_url.username:
new_netloc = "{}:{}@{}".format(
username, password, parsed_cloud_url.netloc
)
cloud_url = ParseResult(
parsed_cloud_url.scheme, new_netloc, parsed_cloud_url.path,
parsed_cloud_url.params, parsed_cloud_url.query,
parsed_cloud_url.fragment
).geturl()
server = Server(local_url)
for db_name in per_farm_dbs:
remote_db_name = "{}/{}/{}".format(username, farm_name, db_name)
server.replicate(
db_name, db_name, urljoin(cloud_url, remote_db_name),
continuous=True
)
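# Minimal sketch of the credential-injection step above: username:password are
# spliced into the netloc before replication is configured. Standalone Python 3
# version; the URL and credentials are placeholders, and the Server/replicate
# call is omitted.
from urllib.parse import urlparse, ParseResult

def _with_credentials_example(url, username, password):
    p = urlparse(url)
    if p.username:                     # credentials already embedded
        return url
    netloc = "{}:{}@{}".format(username, password, p.netloc)
    return ParseResult(p.scheme, netloc, p.path, p.params,
                       p.query, p.fragment).geturl()

# _with_credentials_example('https://cloud.example.com:5984/', 'farm', 'secret')
#   -> 'https://farm:secret@cloud.example.com:5984/'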
def open_url(url):
ru = urlparse.urlparse(url)
pu = urlparse.ParseResult('', '', ru.path, ru.params, ru.query, ru.fragment)
if ru.scheme == 'https':
c = httplib.HTTPSConnection(ru.netloc)
else:
c = httplib.HTTPConnection(ru.netloc)
c.putrequest('GET', pu.geturl())
c.putheader('User-Agent', FLAGS.user_agent)
c.endheaders()
return c.getresponse()
def construct_page_url(elements, params, index, limit):
if params and index is not None:
params[RESULT_LIMIT_PARAMETER] = limit
params[RESULT_OFFSET_PARAMETER] = index * limit
query = urlencode(params, True)
inputs = ParseResult(
elements.scheme,
elements.netloc,
elements.path,
elements.params,
query,
elements.fragment,
)
return urlunparse(inputs)
return None
def urlparams(url_, fragment=None, query_dict=None, **query):
"""
Add a fragment and/or query parameters to a URL.
New query params will be appended to existing parameters, except duplicate
names, which will be replaced.
"""
url_ = urlparse.urlparse(url_)
fragment = fragment if fragment is not None else url_.fragment
q = url_.query
new_query_dict = (QueryDict(smart_str(q), mutable=True) if
q else QueryDict('', mutable=True))
if query_dict:
for k, l in query_dict.lists():
new_query_dict[k] = None # Replace, don't append.
for v in l:
new_query_dict.appendlist(k, v)
for k, v in query.items():
# Replace, don't append.
if isinstance(v, list):
new_query_dict.setlist(k, v)
else:
new_query_dict[k] = v
query_string = urlencode([(k, v) for k, l in new_query_dict.lists() for
v in l if v is not None])
new = urlparse.ParseResult(url_.scheme, url_.netloc, url_.path,
url_.params, query_string, fragment)
return new.geturl()
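# Hedged, Django-free sketch of the merge rule described in the docstring above:
# new values replace parameters of the same name, while other existing params
# (including multi-valued ones) are kept. The example URL and kwargs are invented.
from urllib.parse import urlparse, parse_qsl, urlencode, ParseResult

def _urlparams_example(url, **query):
    p = urlparse(url)
    pairs = [(k, v) for k, v in parse_qsl(p.query) if k not in query]
    pairs += [(k, v) for k, v in query.items() if v is not None]
    return ParseResult(p.scheme, p.netloc, p.path, p.params,
                       urlencode(pairs), p.fragment).geturl()

# _urlparams_example('/search?q=python&page=2', page=5, sort='new')
#   -> '/search?q=python&page=5&sort=new'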
def gen_urls(p, value, target_param=''):
# Make a different URL for each query argument
query = parse_qs(p.query.encode('utf-8'))
url_list = list()
for param in query.keys():
if target_param == '' or target_param == param:
new_query_d = copy(query) # Copy of query dictionary
new_query_d[param] = value
new_query = urlencode(new_query_d, doseq=True) # New query
# Gen and add new url to url list
url = ParseResult(p.scheme, p.netloc, p.path, p.params,
new_query, p.fragment).geturl()
url_list.append((url, param))
return url_list # Return full list of all generated urls
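# Hedged sketch of the per-parameter URL generation above (used for injecting a
# test value into one query argument at a time). Python 3 names here; the payload
# string is an arbitrary example.
from copy import copy
from urllib.parse import urlparse, parse_qs, urlencode, ParseResult

def _gen_urls_example(url, value):
    p = urlparse(url)
    query = parse_qs(p.query)
    urls = []
    for param in query:
        mutated = copy(query)
        mutated[param] = value
        urls.append(ParseResult(p.scheme, p.netloc, p.path, p.params,
                                urlencode(mutated, doseq=True),
                                p.fragment).geturl())
    return urls

# _gen_urls_example('http://example.com/?a=1&b=2', 'PAYLOAD')
#   -> ['http://example.com/?a=PAYLOAD&b=2', 'http://example.com/?a=1&b=PAYLOAD']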
def __init__(self, post_editing_source, post_editing_reference, notebook, grid, output_directory):
self.post_editing_source = post_editing_source
self.post_editing_reference = post_editing_reference
self.translation_tab_grid = grid
self.notebook = notebook
self.modified_references = []
self.saved_modified_references = []
self.visibility_of_statistics_menu = True
self.output_directory = output_directory
self.tables = {}
self.source_log = {}
self.HTML_view = WebKit.WebView()
uri = "statistics/generated/stats.html"
uri = os.path.realpath(uri)
uri = urlparse.ParseResult('file', '', uri, '', '', '')
uri = urlparse.urlunparse(uri)
self.HTML_view.load_uri(uri)
filename = post_editing_reference[post_editing_reference.rfind('/'):]
filename_without_extension = os.path.splitext(filename)[0]
filename_extension = os.path.splitext(filename)[1]
self.saved_origin_filepath = self.output_directory + filename
self.tables["translation_table"] = Table("translation_table",self.post_editing_source,self.post_editing_reference, self.preparePostEditingAnalysis_event,self.preparePostEditingAnalysis, self.calculate_statistics_event, self.translation_tab_grid, self.output_directory)
self.source_log_filepath = self.output_directory + '/source_log.json'
shutil.rmtree("./statistics/generated", ignore_errors=True)
os.makedirs(os.path.abspath("statistics/generated"))
self.translation_tab_grid.show_all()
self.tables["translation_table"].save_post_editing_changes_button.hide()
self.tables["translation_table"].statistics_button.hide()
self.tables["translation_table"].insertions_statistics_button.hide()
self.tables["translation_table"].deletions_statistics_button.hide()
self.tables["translation_table"].time_statistics_button.hide()
def reparse_url(parsed_url, query_params):
return urlparse.ParseResult(
scheme=parsed_url.scheme,
netloc=parsed_url.netloc,
path=parsed_url.path,
params=parsed_url.params,
fragment=parsed_url.fragment,
query=urllib.urlencode(query_params, doseq=True))
def make_support_link(email, appname='SLiM'):
support_link_parts = ParseResult(
scheme='mailto',
netloc='',
path=email,
params='',
query='subject=' + quote('[%s] Support request' % appname),
# query=urlencode({'subject': '[%s] Support request' % appname},
# quote_via=quote),
fragment='',
)
return urlunparse(support_link_parts)
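# Hedged sketch: the same mailto: construction with made-up values, showing why
# the address goes in `path` and the netloc stays empty.
from urllib.parse import ParseResult, urlunparse, quote

_mailto = urlunparse(ParseResult(
    scheme='mailto', netloc='', path='help@example.org', params='',
    query='subject=' + quote('[SLiM] Support request'), fragment=''))
# -> 'mailto:help@example.org?subject=%5BSLiM%5D%20Support%20request'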
def sqlite_uri_for(path):
uri_parts = ParseResult(
scheme='sqlite',
netloc='/',
path=path,
params='',
query='',
fragment='')
return urlunparse(uri_parts)
def __init__(self, url):
if isinstance(url, ParseResult) or isinstance(url, SplitResult):
self.url_parsed = url
self.url_raw = None
else:
self.url_raw = url
self.url_parsed = None
def objectify(self, url):
if url is None:
return None
if isinstance(url, URL):
return url
else:
return URL(url)
# To deal with all kinds of methods/properties in the ParseResult class
def unauth(self):
if not self.is_auth():
return self
return URL.objectify(ParseResult(
self.scheme,
'%s:%s' % (self.hostname,
self.port or {'https': 443, 'http': 80}[self.scheme]),
self.path.replace('//', '/'), self.params, self.query,
self.fragment))
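# Hedged sketch of what unauth() produces: credentials dropped from the netloc and
# the default port made explicit. The URL class above is assumed to proxy attribute
# access to the wrapped ParseResult; this standalone version skips the wrapper.
from urllib.parse import urlparse, ParseResult

def _unauth_example(url):
    p = urlparse(url)
    port = p.port or {'https': 443, 'http': 80}[p.scheme]
    return ParseResult(p.scheme, '%s:%s' % (p.hostname, port),
                       p.path.replace('//', '/'), p.params,
                       p.query, p.fragment).geturl()

# _unauth_example('https://user:pw@cal.example.com/dav//cal')
#   -> 'https://cal.example.com:443/dav/cal'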
def join(self, path):
"""
assumes this object is the base URL or base path. If the path
is relative, it should be appended to the base. If the path
is absolute, it should be added to the connection details of
self. If the path already contains connection details and the
connection details differ from self, raise an error.
"""
pathAsString = str(path)
if not path or not pathAsString:
return self
path = URL.objectify(path)
if (
(path.scheme and self.scheme and path.scheme != self.scheme) or
(path.hostname and self.hostname and
path.hostname != self.hostname) or
(path.port and self.port and path.port != self.port)
):
raise ValueError("%s can't be joined with %s" % (self, path))
if path.path.startswith('/'):
ret_path = uc2utf8(path.path)
else:
sep = "/"
if self.path.endswith("/"):
sep = ""
ret_path = "%s%s%s" % (self.path, sep, uc2utf8(path.path))
return URL(ParseResult(
self.scheme or path.scheme, self.netloc or path.netloc, ret_path,
path.params, path.query, path.fragment))
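# Hedged sketch of the join semantics described in the docstring: a relative path
# is appended to the base, an absolute path replaces it, and mismatched hosts are
# rejected. Uses plain urllib.parse objects rather than the URL wrapper above, and
# the example URLs are invented.
from urllib.parse import urlparse, ParseResult

def _join_example(base, path):
    b, p = urlparse(base), urlparse(path)
    if p.hostname and b.hostname and p.hostname != b.hostname:
        raise ValueError("%s can't be joined with %s" % (base, path))
    if p.path.startswith('/'):
        new_path = p.path
    else:
        sep = '' if b.path.endswith('/') else '/'
        new_path = b.path + sep + p.path
    return ParseResult(b.scheme or p.scheme, b.netloc or p.netloc, new_path,
                       p.params, p.query, p.fragment).geturl()

# _join_example('https://cal.example.com/dav/', 'user/calendar')
#   -> 'https://cal.example.com/dav/user/calendar'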