def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list`` and fills it with one ``MiniFieldStorage`` per
    key/value pair found in ``self.qs_on_post`` (when that is set),
    followed by one nested storage object per part read from ``self.fp``.

    Args:
        environ: forwarded unchanged to every nested part.
        keep_blank_values: forwarded unchanged to every nested part.
        strict_parsing: forwarded unchanged to every nested part.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call-style raise is valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # The query string is parsed with the instance-level settings,
        # not with the arguments passed to this call.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
python类parse_qsl()的实例源码
def googlepass(url):
    """Resolve *url* via ``client.request`` and normalise its scheme.

    *url* may carry request headers after a ``|`` separator, encoded as a
    query string (``<url>|<name>=<value>&...``).  Those headers are sent
    with the request and re-appended to the result.

    Returns:
        The resolved URL, switched to ``https://`` when it contains
        ``requiressl=yes`` and to ``http://`` otherwise; ``None`` if any
        step fails (best-effort resolver).
    """
    try:
        try:
            # Header block, if present, is whatever follows the last '|'.
            headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
        except Exception:
            # No '|' suffix (IndexError) or unparsable suffix: no headers.
            headers = None

        url = url.split('|')[0].replace('\\', '')
        url = client.request(url, headers=headers, output='geturl')

        if 'requiressl=yes' in url:
            url = url.replace('http://', 'https://')
        else:
            url = url.replace('https://', 'http://')

        if headers:
            url += '|%s' % urllib.urlencode(headers)
        return url
    except Exception:
        # Deliberately best-effort, but no longer traps SystemExit or
        # KeyboardInterrupt the way a bare ``except:`` does.
        return None
def moonwalk(link, ref, season, episode):
    """Collect stream URLs for a moonwalk *link*.

    When both *season* and *episode* are truthy they are merged into the
    link's query string first.  The link is then expanded through
    ``__get_moonwalk_translators``; if that yields nothing, the link
    itself is used with an empty info string.

    Returns:
        A list built by concatenating the results of ``__get_moonwalk``
        for every translator entry, or ``[]`` if any step fails.
    """
    try:
        if season and episode:
            # Parse the query once and reuse it for both the merge and the
            # textual removal below.
            query = urlparse.urlsplit(link).query
            params = dict(urlparse.parse_qsl(query))
            params.update({'season': season, 'episode': episode})
            # Keep commas literal in the re-encoded query string.
            encoded = urllib.urlencode(params).replace('%2C', ',')
            link = link.replace('?' + query, '') + '?' + encoded

        trans = __get_moonwalk_translators(link, ref) or [(link, '')]

        urls = []
        for entry in trans:
            urls += __get_moonwalk(entry[0], ref, info=entry[1])
        return urls
    except Exception:
        # Best-effort: report "no streams" instead of propagating, without
        # swallowing SystemExit/KeyboardInterrupt.
        return []
def do_GET(s):
    """Handle a GET request.

    Parses the query parameters, stores them on ``s.server.query_params``
    and writes a fixed HTML page saying the flow has completed.  Note that
    we can't detect if an error occurred.
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string; a path without
    # '?' carries no parameters (previously the whole path was parsed).
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    # Byte literals: identical to str on Python 2, and what the binary
    # response stream requires on Python 3.
    s.wfile.write(b"<html><head><title>Authentication Status</title></head>")
    s.wfile.write(b"<body><p>The authentication flow has completed.</p>")
    s.wfile.write(b"</body></html>")
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list`` and fills it with one ``MiniFieldStorage`` per
    key/value pair found in ``self.qs_on_post`` (when that is set),
    followed by one nested storage object per part read from ``self.fp``.

    Args:
        environ: forwarded unchanged to every nested part.
        keep_blank_values: forwarded unchanged to every nested part.
        strict_parsing: forwarded unchanged to every nested part.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call-style raise is valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # The query string is parsed with the instance-level settings,
        # not with the arguments passed to this call.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
languagestripper.py 文件源码
项目:wmt16-document-alignment-task
作者: christianbuck
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def strip_query(self, query):
    """Return *query* re-encoded without ignored or stripped-away pairs.

    A key/value pair is dropped when:
      * the lower-cased key starts or ends with any (lower-cased) entry of
        ``self._strip_query_variables``;
      * nothing is left of the key after ``self.re_code`` removes its
        matches;
      * ``self.re_code`` changes the value and leaves it empty.

    Surviving pairs are emitted with their original, unstripped text.
    Blank values are kept while parsing.
    """
    kept = []
    for key, value in urlparse.parse_qsl(query, keep_blank_values=True):
        lowered = key.lower()
        # Ignore some keys.
        if any(lowered.endswith(name) or lowered.startswith(name)
               for name in (raw.lower()
                            for raw in self._strip_query_variables)):
            continue

        # Remove the pair if the key gets stripped away entirely.
        if not self.re_code.sub('', key):
            continue

        # Keep the pair only if the value is untouched or something remains.
        remainder = self.re_code.sub('', value)
        if remainder == value or remainder:
            kept.append((key, value))
    return urllib.urlencode(kept)
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def do_GET(s):
    """Handle a GET request.

    Parses the query parameters, stores them on ``s.server.query_params``
    and writes a fixed HTML page saying the flow has completed.  Note that
    we can't detect if an error occurred.
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string; a path without
    # '?' carries no parameters (previously the whole path was parsed).
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    # Byte literals: identical to str on Python 2, and what the binary
    # response stream requires on Python 3.
    s.wfile.write(b"<html><head><title>Authentication Status</title></head>")
    s.wfile.write(b"<body><p>The authentication flow has completed.</p>")
    s.wfile.write(b"</body></html>")
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list`` and fills it with one ``MiniFieldStorage`` per
    key/value pair found in ``self.qs_on_post`` (when that is set),
    followed by one nested storage object per part read from ``self.fp``.

    Args:
        environ: forwarded unchanged to every nested part.
        keep_blank_values: forwarded unchanged to every nested part.
        strict_parsing: forwarded unchanged to every nested part.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call-style raise is valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # The query string is parsed with the instance-level settings,
        # not with the arguments passed to this call.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list`` and fills it with one ``MiniFieldStorage`` per
    key/value pair found in ``self.qs_on_post`` (when that is set),
    followed by one nested storage object per part read from ``self.fp``.

    Args:
        environ: forwarded unchanged to every nested part.
        keep_blank_values: forwarded unchanged to every nested part.
        strict_parsing: forwarded unchanged to every nested part.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call-style raise is valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # The query string is parsed with the instance-level settings,
        # not with the arguments passed to this call.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list`` and fills it with one ``MiniFieldStorage`` per
    key/value pair found in ``self.qs_on_post`` (when that is set),
    followed by one nested storage object per part read from ``self.fp``.

    Args:
        environ: forwarded unchanged to every nested part.
        keep_blank_values: forwarded unchanged to every nested part.
        strict_parsing: forwarded unchanged to every nested part.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call-style raise is valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # The query string is parsed with the instance-level settings,
        # not with the arguments passed to this call.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def get_token(self, chal):
    """
    Try to complete the challenge.

    The ``realm`` entry is removed from ``chal`` and used as the base URL;
    the remaining entries are merged into that URL's query string before
    the GET request is issued.

    :param chal: challenge mapping (mutated: ``realm`` is popped)
    :returns: token or None
    """
    realm = urlparse(chal.pop('realm'))
    merged = dict(parse_qsl(realm.query), **chal)
    target = realm._replace(query=urlencode(merged)).geturl()

    # Only send credentials when a username is configured.
    if self.username is None:
        auth = None
    else:
        auth = (self.username, self.password)

    response = requests.get(target, auth=auth, timeout=REQUEST_TIMEOUT)
    data = _json_or_none(response)
    if not isinstance(data, dict):
        return None
    return data.get('token')
def googlepass(url):
    """Resolve *url* via ``client.request`` and normalise its scheme.

    *url* may carry request headers after a ``|`` separator, encoded as a
    query string (``<url>|<name>=<value>&...``).  Those headers are sent
    with the request and re-appended to the result.

    Returns:
        The resolved URL, switched to ``https://`` when it contains
        ``requiressl=yes`` and to ``http://`` otherwise; ``None`` if any
        step fails (best-effort resolver).
    """
    try:
        try:
            # Header block, if present, is whatever follows the last '|'.
            headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
        except Exception:
            # No '|' suffix (IndexError) or unparsable suffix: no headers.
            headers = None

        url = url.split('|')[0].replace('\\', '')
        url = client.request(url, headers=headers, output='geturl')

        if 'requiressl=yes' in url:
            url = url.replace('http://', 'https://')
        else:
            url = url.replace('https://', 'http://')

        if headers:
            url += '|%s' % urllib.urlencode(headers)
        return url
    except Exception:
        # Deliberately best-effort, but no longer traps SystemExit or
        # KeyboardInterrupt the way a bare ``except:`` does.
        return None
def facebook_compliance_fix(session):
    """Register a hook that rewrites Facebook token responses as JSON.

    The hook is registered on *session* for ``'access_token_response'``
    and *session* is returned.
    """

    def _compliance_fix(r):
        content_type = r.headers.get('content-type', {})

        # if Facebook claims to be sending us json, let's trust them.
        if 'application/json' in content_type:
            return r

        # Facebook returns a content-type of text/plain when sending their
        # x-www-form-urlencoded responses, along with a 200. If not, let's
        # assume we're getting JSON and bail on the fix.
        if 'text/plain' not in content_type or r.status_code != 200:
            return r

        token = dict(parse_qsl(r.text, keep_blank_values=True))

        # Mirror 'expires' under the 'expires_in' key when it is present.
        expires = token.get('expires')
        if expires is not None:
            token['expires_in'] = expires
        token['token_type'] = 'Bearer'

        # Replace the response body with the JSON-encoded token.
        r._content = to_unicode(dumps(token)).encode('UTF-8')
        return r

    session.register_compliance_hook('access_token_response', _compliance_fix)
    return session
def get_parsed_body(request):
    """Return the request body parsed as JSON or, failing that, form data.

    Args:
        request: request object; ``parsed_body`` and ``body`` are read.

    Returns:
        The decoded JSON value, or a dictionary of form fields, or None
        when ``request.parsed_body`` equals the empty string.

    Raises:
        ValueError: if it is neither a json or a formData (the original
            JSON decoding error is re-raised).
    """
    if request.parsed_body == '':
        return None

    try:
        return json.loads(request.body.decode('utf-8'))
    except Exception:  # Decode formData
        form = dict(parse_qsl(request.body))
        if not form:  # Not a form data
            raise
        return form
def do_GET(self):  # noqa
    """Answer the login redirect and hand the outcome to the server.

    Responds 200 with an HTML page rendered from ``HTML_TEMPLATE``.  When
    the query string carries a ``code`` it is passed to
    ``self.server.return_code``; otherwise a ``LocalServerError`` built
    from ``error_description`` (falling back to ``error``) is passed.
    """
    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()

    query_params = dict(parse_qsl(urlparse(self.path).query))
    code = query_params.get('code')

    if code:
        message = DOC_URL
        result = 'Login successful'
    else:
        message = query_params.get(
            'error_description', query_params.get('error'))
        result = 'Login failed'

    page = HTML_TEMPLATE.substitute(
        post_login_message=message,
        login_result=result)
    self.wfile.write(six.b(page))

    # The page is written before the outcome is reported.
    self.server.return_code(code if code else LocalServerError(message))
def format_body(self, body):
    """Render a urlencoded *body* as aligned ``key: value`` text.

    Each value is passed to the formatter chosen by
    ``self._get_value_formatter``.  When formatting succeeds, the key is
    printed on its own line followed by the formatted content; when the
    formatter raises, the raw value is printed on the key's line instead.

    Returns:
        The lines joined with newlines, or ``""`` when *body* has no
        key/value pairs.
    """
    pairs = urlparse.parse_qsl(body)
    if not pairs:
        return ""

    # Pad every key to the longest one so the colons line up.
    max_length = max(len(key) for key, _ in pairs)
    texts = []
    for k, v in pairs:
        formatter = self._get_value_formatter(v)
        try:
            formatted_content = formatter.format_body(v)
        except Exception:
            # Formatter failure is expected for some values; fall back to
            # the raw text without trapping SystemExit/KeyboardInterrupt.
            texts.append("{0}: {1}".format(k.ljust(max_length), v))
        else:
            texts.append("{0}:".format(k.ljust(max_length)))
            texts.append(formatted_content)
    return u"\n".join(texts)
def format_tui(self, body):
    """Render a urlencoded *body* as a list of TUI widgets.

    Each value is passed to the formatter chosen by
    ``self._get_value_formatter``.  When formatting succeeds, a ``Text``
    with the padded key is added, followed by the formatter's items; when
    the formatter raises, one ``Text`` with ``key: raw value`` is added.

    Returns:
        The list of widgets, or ``[]`` when *body* has no key/value pairs.
    """
    pairs = urlparse.parse_qsl(body)
    if not pairs:
        return []

    # Pad every key to the longest one so the colons line up.
    max_length = max(len(key) for key, _ in pairs)
    texts = []
    for k, v in pairs:
        formatter = self._get_value_formatter(v)
        try:
            formatted_list = formatter.format_tui(v)
        except Exception:
            # Formatter failure is expected for some values; fall back to
            # the raw text without trapping SystemExit/KeyboardInterrupt.
            texts.append(Text("{0}: {1}".format(k.ljust(max_length), v)))
        else:
            texts.append(Text("{0}:".format(k.ljust(max_length))))
            # Extend in place instead of rebuilding the list each time.
            texts.extend(formatted_list)
    return texts
def format_console(self, body):  # pragma: no cover
    """Render a urlencoded *body* as aligned ``key: value`` console text.

    Each value is passed to the formatter chosen by
    ``self._get_value_formatter``.  When formatting succeeds, the key is
    printed on its own line followed by the formatted content; when the
    formatter raises, the raw value is printed on the key's line instead.

    Returns:
        The lines joined with newlines, or ``""`` when *body* has no
        key/value pairs.
    """
    pairs = urlparse.parse_qsl(body)
    if not pairs:
        return ""

    # Pad every key to the longest one so the colons line up.
    max_length = max(len(key) for key, _ in pairs)
    texts = []
    for k, v in pairs:
        formatter = self._get_value_formatter(v)
        try:
            formatted_content = formatter.format_console(v)
        except Exception:
            # Formatter failure is expected for some values; fall back to
            # the raw text without trapping SystemExit/KeyboardInterrupt.
            texts.append("{0}: {1}".format(k.ljust(max_length), v))
        else:
            texts.append("{0}:".format(k.ljust(max_length)))
            texts.append(formatted_content)
    return u"\n".join(texts)
def googlepass(url):
    """Resolve *url* via ``client.request`` and normalise its scheme.

    *url* may carry request headers after a ``|`` separator, encoded as a
    query string (``<url>|<name>=<value>&...``).  Those headers are sent
    with the request and re-appended to the result.

    Returns:
        The resolved URL, switched to ``https://`` when it contains
        ``requiressl=yes`` and to ``http://`` otherwise; ``None`` if any
        step fails (best-effort resolver).
    """
    try:
        try:
            # Header block, if present, is whatever follows the last '|'.
            headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
        except Exception:
            # No '|' suffix (IndexError) or unparsable suffix: no headers.
            headers = None

        url = url.split('|')[0].replace('\\', '')
        url = client.request(url, headers=headers, output='geturl')

        if 'requiressl=yes' in url:
            url = url.replace('http://', 'https://')
        else:
            url = url.replace('https://', 'http://')

        if headers:
            url += '|%s' % urllib.urlencode(headers)
        return url
    except Exception:
        # Deliberately best-effort, but no longer traps SystemExit or
        # KeyboardInterrupt the way a bare ``except:`` does.
        return None
def moonwalk(link, ref, season, episode):
    """Collect stream URLs for a moonwalk *link*.

    When both *season* and *episode* are truthy they are merged into the
    link's query string first.  The link is then expanded through
    ``__get_moonwalk_translators``; if that yields nothing, the link
    itself is used with an empty info string.

    Returns:
        A list built by concatenating the results of ``__get_moonwalk``
        for every translator entry, or ``[]`` if any step fails.
    """
    try:
        if season and episode:
            # Parse the query once and reuse it for both the merge and the
            # textual removal below.
            query = urlparse.urlsplit(link).query
            params = dict(urlparse.parse_qsl(query))
            params.update({'season': season, 'episode': episode})
            # Keep commas literal in the re-encoded query string.
            encoded = urllib.urlencode(params).replace('%2C', ',')
            link = link.replace('?' + query, '') + '?' + encoded

        trans = __get_moonwalk_translators(link, ref) or [(link, '')]

        urls = []
        for entry in trans:
            urls += __get_moonwalk(entry[0], ref, info=entry[1])
        return urls
    except Exception:
        # Best-effort: report "no streams" instead of propagating, without
        # swallowing SystemExit/KeyboardInterrupt.
        return []
def router(params_string):
    """Dispatch a plugin invocation to the matching ``VRTPlayer`` method.

    *params_string* is parsed as a query string.  With no parameters the
    main menu is shown; otherwise the ``action`` parameter selects the
    method to call, and ``video`` is passed along where the action takes
    one.  An action that matches none of the known constants does nothing.
    """
    addon = xbmcaddon.Addon()
    kodi_wrapper = kodiwrapper.KodiWrapper(_handle, _url, addon)
    stream_service = urltostreamservice.UrlToStreamService(
        vrtplayer.VRTPlayer._VRT_BASE,
        vrtplayer.VRTPlayer._VRTNU_BASE_URL,
        kodi_wrapper)
    vrt_player = vrtplayer.VRTPlayer(
        addon.getAddonInfo("path"), kodi_wrapper, stream_service)

    params = dict(parse_qsl(params_string))
    if not params:
        vrt_player.show_main_menu_items()
        return

    action = params['action']
    if action == actions.LISTING_AZ:
        vrt_player.show_az_menu_items()
    elif action == actions.LISTING_CATEGORIES:
        vrt_player.show_category_menu_items()
    elif action == actions.LISTING_LIVE:
        vrt_player.show_livestream_items()
    elif action == actions.LISTING_VIDEOS:
        vrt_player.show_videos(params['video'])
    elif action == actions.LISTING_CATEGORY_VIDEOS:
        vrt_player.show_video_category_episodes(params['video'])
    elif action == actions.PLAY:
        vrt_player.play_vrtnu_video(params['video'])
    elif action == actions.PLAY_LIVE:
        vrt_player.play_livestream(params['video'])
def do_GET(self):
    """Handle a GET request.

    Parses the query parameters, stores them on
    ``self.server.query_params`` and writes a fixed HTML page saying the
    flow has completed.  Note that we can't detect if an error occurred.
    """
    self.send_response(200)
    self.send_header("Content-type", "text/html")
    self.end_headers()
    # Only the text after the first '?' is a query string; a path without
    # '?' carries no parameters (previously the whole path was parsed).
    query_string = self.path.partition('?')[2]
    self.server.query_params = dict(urlparse.parse_qsl(query_string))
    # Byte literals: identical to str on Python 2, and what the binary
    # response stream requires on Python 3.
    self.wfile.write(b"<html><head><title>Authentication Status</title></head>")
    self.wfile.write(b"<body><p>The authentication flow has completed.</p>")
    self.wfile.write(b"</body></html>")
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(urlparse.parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def from_html(self, cr, uid, model, field, element, context=None):
    """Load the image referenced by the ``<img>`` inside *element*.

    The ``src`` of the first ``img`` child decides how the image is
    fetched:
      * a path starting with ``/website/image`` is resolved to a record
        field, taking model/id/field from the query string when present
        and from the path fragments otherwise;
      * a path matching ``self.local_url_re`` goes to
        ``self.load_local_url``;
      * anything else goes to ``self.load_remote_url``.

    Note that the ``model`` and ``field`` arguments are overridden by the
    values found in a ``/website/image`` URL.

    Raises:
        IndexError: if a ``/website/image`` URL supplies a component
            neither in the query string nor in the path.
    """
    url = element.find('img').get('src')
    url_object = urlparse.urlsplit(url)
    if url_object.path.startswith('/website/image'):
        # url might be /website/image/<model>/<id>[_<checksum>]/<field>[/<width>x<height>]
        fragments = url_object.path.split('/')
        query = dict(urlparse.parse_qsl(url_object.query))
        # Only index into the path when the query string lacks the value;
        # an eager ``query.get(key, fragments[n])`` raised IndexError for
        # query-only URLs such as /website/image?model=..&id=..&field=..
        model = query['model'] if 'model' in query else fragments[3]
        oid = query['id'] if 'id' in query else fragments[4].split('_')[0]
        field = query['field'] if 'field' in query else fragments[5]
        item = self.pool[model].browse(cr, uid, int(oid), context=context)
        return item[field]

    if self.local_url_re.match(url_object.path):
        return self.load_local_url(url)

    return self.load_remote_url(url)
def router(paramstring):
    """
    Router function that calls other functions
    depending on the provided paramstring

    :param paramstring: URL-encoded plugin parameters
    :return: None
    """
    # Decode the URL-encoded paramstring into {<parameter>: <value>}.
    params = dict(parse_qsl(paramstring))

    if 'action' not in params:
        # Called from the Kodi UI without an action: show the video
        # categories, starting at the requested offset (0 by default).
        list_categories(int(params.get('offset', 0)))
        return

    action = params['action']
    if action == 'listing':
        # Display the list of videos in a provided category.
        list_videos(params['category'], int(params['offset']))
    elif action == 'play':
        # Play a video from a provided URL.
        play_video(params['video'])
def __init__(self):
    """Connect, load the current user and decode the plugin parameters.

    Sets ``self.gui``, ``self.conn``, ``self.u``, ``self.params`` and
    ``self.c_type``.  Parameters come from ``sys.argv[2]`` (leading
    character skipped) on top of the default ``{'do': _DO_HOME}``; ``oid``
    defaults to the current user's id and is stored as an int.  When no
    ``content_type`` was passed, it is derived from the current window id.

    Raises:
        Exception: if no connection could be established.
    """
    self.gui = KodiVkGUI(self)
    self.conn = self.__connect_()
    if not self.conn:
        raise Exception()

    u_info = self.conn.users.get()[0]
    self.u = User(u_info['id'], self.conn)
    self.u.set_info()

    p = {'do': _DO_HOME}
    if sys.argv[2]:
        p.update(dict(urlparse.parse_qsl(sys.argv[2][1:])))
    # NOTE(review): original indentation was lost; this line is assumed to
    # run whether or not sys.argv[2] is set - confirm against the source.
    p['oid'] = int(p.get('oid', self.u.info['id']))
    self.params = p

    if 'content_type' not in self.params:
        cw_id = xbmcgui.getCurrentWindowId()
        if cw_id in (10006, 10024, 10025, 10028):
            self.params['content_type'] = _CTYPE_VIDEO
        # Audio detection is disabled:
        # elif cw_id in (10005, 10500, 10501, 10502):
        #     self.params['content_type'] = _CTYPE_AUDIO
        elif cw_id in (10002,):
            # Compare the window id; the builtin ``id`` was tested here
            # before, so this branch could never match.
            self.params['content_type'] = _CTYPE_IMAGE
    self.c_type = self.params.get('content_type', None)
def assign(service, arg):
    """Decide whether *arg* is a URL this plugin should handle.

    Only the ``'www'`` service is considered.  Every occurrence of each
    query value in the URL is replaced by its parameter name (a plain
    text replacement over the whole URL, in parameter order).

    Args:
        service: service name; anything other than ``'www'`` is rejected.
        arg: the URL string.

    Returns:
        ``(True, rewritten_url)`` when the rewritten URL still has a
        ``=`` in its query and the original had at most 6 non-blank
        parameters; otherwise ``None``.
    """
    if service != 'www':
        return None

    pairs = urlparse.parse_qsl(urlparse.urlparse(arg).query)
    for key, value in pairs:
        arg = arg.replace(value, key)

    has_query_pair = urlparse.urlparse(arg).query.find('=') != -1
    if not has_query_pair or len(pairs) > 6:
        return None
    return (True, arg)
# Probe each query parameter of the URL in `arg` with the helper `iI1`.
# NOTE(review): the code is obfuscated; every `if 0:` block is dead filler.
# The strings handed to `decode` are encoded and their meaning cannot be
# determined from this file section - confirm against the decoder.
def audit(arg):
Ii1iI = arg
Oo = urlparse.urlparse(Ii1iI)
# Rebuild the URL from scheme, netloc and path; the query and fragment slots
# are filled with decode('') (presumably empty strings - confirm).
I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme,
Oo.netloc,
Oo.path,
decode(''),
decode('')))
# (name, value) pairs of the original query string.
Oo0Ooo = urlparse.parse_qsl(Oo.query)
if 0:
iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
# Parameter names that are skipped by the loop below.
oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in oo:
continue
# Log the parameter name and the stripped URL, then run the check.
debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i)
IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
# A truthy result is reported through security_info using its second item.
# NOTE(review): the `return` presumably belongs to this `if`, stopping at
# the first finding - original indentation was lost, confirm.
if IiII1I1i1i1ii:
security_info(IiII1I1i1i1ii[1])
return
if 0:
OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1