def audit(arg):
    """Probe each query parameter of the URL ``arg`` and report the first hit.

    The URL is split into a base URL (scheme, netloc and path, with the
    query and fragment slots filled by ``decode('')``) and its list of
    ``(name, value)`` query pairs.  For each decoded probe kind, every
    pair whose name is not in the decoded skip list is logged through
    ``debug`` and handed to ``iI1``.  On the first truthy result,
    ``security_info`` is called with a message built from the probe kind
    and element ``[1]`` of that result, and the function returns.

    NOTE(review): the string constants are opaque until passed through
    ``decode``, and ``iI1`` is defined elsewhere; what is actually probed
    cannot be determined from this block.

    :param arg: URL string to inspect.
    :return: ``None`` in all cases.
    """
    parsed = urlparse.urlparse(arg)
    base_url = urlparse.urlunsplit((
        parsed.scheme,
        parsed.netloc,
        parsed.path,
        decode(''),
        decode(''),
    ))
    pairs = urlparse.parse_qsl(parsed.query)
    # Parameter names that are never probed.
    skipped_names = [
        decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'),
        decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'),
        decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{'),
    ]
    kinds = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
    for kind in kinds:
        for name, value in pairs:
            if name not in skipped_names:
                debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), kind, name, base_url)
                finding = iI1(kind, base_url, pairs, name, value)
                if finding:
                    security_info(decode('a\xb6Z\x9e\x0c+4') % (kind, finding[1]))
                    # Stop at the first reported finding.
                    return
python类parse_qsl()的实例源码
def _post(self, cmd, data=None):
    """POST ``cmd`` to ``REST_URL`` and wrap the parsed reply.

    The request payload is a copy of ``data`` extended with ``cmd`` and
    ``userid``; the caller's dict is never modified.  (The previous
    ``data={}`` default was updated in place, so fields leaked from one
    call into the next.)

    On an HTTP 200 reply the body is parsed as a query string.  A
    ``state`` of ``'1'`` marks success; otherwise ``errorMessage``, when
    present, becomes the error.  All remaining fields are wrapped in a
    ``Struct`` as the result content.  For any other status code the
    result is unsuccessful with no error and an empty dict as content.

    :param cmd: command name; must be a member of ``CMD_LIST``.
    :param data: optional mapping of extra form fields.
    :return: ``PayAppInternalResult`` with ``success``, ``error`` and
        ``content``.
    :raises KeyError: if a 200 reply carries no ``state`` field.
    """
    assert cmd in CMD_LIST
    assert self.user_id
    payload = dict(data) if data else {}
    payload.update({
        'cmd': cmd,
        'userid': self.user_id,
    })
    resp = requests.post(REST_URL, data=payload)
    success = False
    error = None
    content = {}
    if resp.status_code == 200:
        reply = dict(parse_qsl(resp.text))
        if reply['state'] == '1':
            success = True
        elif 'errorMessage' in reply:
            error = reply['errorMessage']
        content = Struct(**{k: v for k, v in reply.items()
                            if k not in ('state', 'errorMessage')})
    return PayAppInternalResult(success=success, error=error, content=content)
def verify_callback(self, params):
    """Check that callback parameters carry this client's credentials.

    The checks are explicit comparisons rather than ``assert``
    statements: asserts are removed when Python runs with ``-O``, which
    would make every callback verify as valid.

    :param params: a query string (``str``), a ``dict``, or any object
        exposing ``userid``, ``linkkey`` and ``linkval`` attributes.
    :return: ``True`` when all three values equal ``self.user_id``,
        ``self.link_key`` and ``self.link_value``; ``False`` otherwise,
        including when an attribute is missing.  The failure reason is
        printed.
    """
    # NOTE(review): exact type checks kept from the original; confirm
    # whether Struct subclasses dict before switching to isinstance().
    if type(params) is str:
        params = Struct(**dict(parse_qsl(params)))
    elif type(params) is dict:
        params = Struct(**params)
    checks = (
        ('user_id', 'userid', u'??? ?? ???? ???? ????.'),
        ('link_key', 'linkkey', u'?? KEY? ???? ????.'),
        ('link_value', 'linkval', u''),
    )
    try:
        # Attributes are read lazily, one pair at a time, so the first
        # failing check decides which message is printed.
        for own_attr, param_attr, message in checks:
            if not (getattr(self, own_attr) == getattr(params, param_attr)):
                print(message)
                return False
    except AttributeError as e:
        print(e)
        return False
    return True
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list``, seeds it with one ``MiniFieldStorage`` per
    pair parsed from ``self.qs_on_post`` (when set), then appends each
    nested part read from ``self.fp`` until a part reports ``done``.

    :param environ: passed through to each nested part constructor.
    :param keep_blank_values: passed through to each nested part.
    :param strict_parsing: passed through to each nested part.
    :raises ValueError: if ``self.innerboundary`` is rejected by
        ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                            self.keep_blank_values,
                                            self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    # Nested parts use the configured class, defaulting to our own class.
    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list``, seeds it with one ``MiniFieldStorage`` per
    pair parsed from ``self.qs_on_post`` (when set), then appends each
    nested part read from ``self.fp`` until a part reports ``done``.

    :param environ: passed through to each nested part constructor.
    :param keep_blank_values: passed through to each nested part.
    :param strict_parsing: passed through to each nested part.
    :raises ValueError: if ``self.innerboundary`` is rejected by
        ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                            self.keep_blank_values,
                                            self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    # Nested parts use the configured class, defaulting to our own class.
    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list``, seeds it with one ``MiniFieldStorage`` per
    pair parsed from ``self.qs_on_post`` (when set), then appends each
    nested part read from ``self.fp`` until a part reports ``done``.

    :param environ: passed through to each nested part constructor.
    :param keep_blank_values: passed through to each nested part.
    :param strict_parsing: passed through to each nested part.
    :raises ValueError: if ``self.innerboundary`` is rejected by
        ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                            self.keep_blank_values,
                                            self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    # Nested parts use the configured class, defaulting to our own class.
    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list``, seeds it with one ``MiniFieldStorage`` per
    pair parsed from ``self.qs_on_post`` (when set), then appends each
    nested part read from ``self.fp`` until a part reports ``done``.

    :param environ: passed through to each nested part constructor.
    :param keep_blank_values: passed through to each nested part.
    :param strict_parsing: passed through to each nested part.
    :raises ValueError: if ``self.innerboundary`` is rejected by
        ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                            self.keep_blank_values,
                                            self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    # Nested parts use the configured class, defaulting to our own class.
    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def do_GET(self):
    """Handle a GET request.

    Sends a 200 HTML response, stores the request's query parameters as
    a dict on ``self.server.query_params`` and writes a short page
    saying the authentication flow has completed.  Note that we can't
    detect if an error occurred.

    A path without a ``?`` yields an empty parameter dict.  The previous
    ``split('?', 1)[-1]`` fell back to the whole path in that case, so a
    request such as ``/a=b`` was parsed as if it were a query string.
    """
    self.send_response(200)
    self.send_header("Content-type", "text/html")
    self.end_headers()
    # partition() returns '' for the tail when there is no separator.
    query_string = self.path.partition('?')[2]
    self.server.query_params = dict(urlparse.parse_qsl(query_string))
    self.wfile.write("<html><head><title>Authentication Status</title></head>")
    self.wfile.write("<body><p>The authentication flow has completed.</p>")
    self.wfile.write("</body></html>")
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(urlparse.parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def resolve(url):
    """Resolve a videomega page URL to the media URL it embeds.

    The value of the first query parameter of ``url`` is used as the
    ``ref`` of a ``cdn.php`` request.  The last packed ``eval`` script
    in that response is unpacked, and the last ``"src","..."`` value
    found in it is returned.

    :param url: page URL carrying the reference as its first query
        parameter.
    :return: the extracted URL string, or ``None`` if any step fails.
    """
    try:
        query = urlparse.urlparse(url).query
        ref = urlparse.parse_qsl(query)[0][1]
        page = client.request('http://videomega.tv/cdn.php?ref=%s' % ref, mobile=True)
        # Raw string: '\)' is not a valid escape in a normal literal.
        packed = re.compile(r'eval.*?{}\)\)').findall(page)[-1]
        unpacked = jsunpack.unpack(packed)
        # The unpacked script contains e.g.
        # "src", "http://abo.cdn.vizplay.org/m2/769a65801d8e8a110452f2b74d4082d1.mp4?st=-A2O2o2soMR81Niiag5EyA&hash=Dw8Kth5gh-nNyMRbWLZMKA"
        return re.compile('"src","(.*?)"').findall(unpacked)[-1]
    except Exception:
        # Best effort: any failure means "not resolvable".  Unlike the
        # previous bare except, KeyboardInterrupt and SystemExit still
        # propagate.
        return None
def oembed(url, params=""):
    """
    Render an OEmbed-compatible link as an embedded item.

    ``params`` is a query-string style list of extra options; each
    parsed pair is forwarded to ``get_oembed_data`` as a keyword
    argument.

    :param url: A URL of an OEmbed provider.
    :param params: Optional query string of extra keyword arguments.
    :return: The ``html`` entry of the OEmbed data, marked safe.  When
        that entry is missing or a ``ProviderException`` is raised, a
        placeholder message if ``settings.DEBUG`` is set, otherwise an
        empty string.
    """
    # Note: nothing is cached or stored here; every call asks for the
    # data again.
    options = dict(urlparse.parse_qsl(params))
    try:
        data = get_oembed_data(url, **options)
        return mark_safe(data['html'])
    except (KeyError, ProviderException):
        return "No OEmbed data returned" if settings.DEBUG else ""
def unmake(self, st):
    """Verify a ``<digest>.<hex payload>`` token and return its fields.

    The payload is hex-decoded and passed through ``self.decode``.  The
    SHA hex digest of ``self.mac_key`` followed by the decoded payload
    must equal the token's first component.  The payload is then parsed
    as a query string.

    Changes from the previous version: the leftover debug ``print`` of
    the decoded payload is gone, and malformed hex now returns ``None``
    like every other invalid-token case instead of raising.

    :param st: token string.
    :return: dict of the payload's fields (repeated names keep the last
        value), or ``None`` if the token is malformed, cannot be decoded
        or fails the digest check.
    """
    parts = st.split(".")
    if len(parts) != 2:
        return None
    digest, payload_hex = parts
    try:
        raw = payload_hex.decode('hex')
    except (TypeError, ValueError):
        # Not valid hex: treat as an invalid token.
        return None
    s = self.decode(raw)
    if s is None:
        return None
    h = SHA.new()
    h.update(self.mac_key)
    h.update(s)
    # NOTE(review): this comparison is not constant-time, and
    # SHA(key + message) is not an HMAC construction; consider
    # hmac.new(...) with hmac.compare_digest if the token format can
    # change.
    if digest != h.hexdigest():
        return None
    return dict(urlparse.parse_qsl(s))
def router(paramstring):
    """Dispatch a plugin call based on its parameter string.

    The first character of ``paramstring`` is dropped before parsing
    (presumably a leading '?'; confirm against the caller).  With no
    parameters, one playable directory item is added per entry of
    ``streams()`` and the directory is closed.  With ``mode=play``, the
    given ``link`` is set as the resolved URL.

    :param paramstring: raw parameter string of the plugin call.
    :raises KeyError: if parameters are present but ``mode`` is not
        (or ``link`` is missing in play mode).
    """
    params = dict(parse_qsl(paramstring[1:]))
    if not params:
        for stream in streams():
            item = xbmcgui.ListItem(label=stream['name'], thumbnailImage=stream['thumb'])
            item.setProperty('fanart_image', stream['thumb'])
            item.setProperty('IsPlayable', 'true')
            play_url = '{0}?mode=play&link={1}'.format(__url__, stream['link'])
            xbmcplugin.addDirectoryItem(__handle__, play_url, item, isFolder=False)
        xbmcplugin.endOfDirectory(__handle__)
        return
    if params['mode'] == 'play':
        resolved = xbmcgui.ListItem(path=params['link'])
        xbmcplugin.setResolvedUrl(__handle__, True, listitem=resolved)
# --------------------------------------------------------------------------------
# Main
# --------------------------------------------------------------------------------
def router(paramstring):
    """Decide what to do based on the script parameters.

    Runs ``check_settings()`` first.  With no parameters, the filtered
    and mapped TV channel list is added to the directory, sorted by
    label (ignoring "The").  Otherwise the ``action`` parameter selects
    between playing a channel and fetching the user id; any other
    action does nothing.

    :param paramstring: query string of the plugin call.
    :raises KeyError: if parameters are present but ``action`` is not.
    """
    check_settings()
    params = dict(parse_qsl(paramstring))
    if not params:
        # Demo channel list
        channels = map_channels(filter_channels(get_tv_channels()))
        xbmcplugin.addDirectoryItems(plugin_handle, channels, len(channels))
        xbmcplugin.addSortMethod(
            plugin_handle, xbmcplugin.SORT_METHOD_LABEL_IGNORE_THE)
        xbmcplugin.endOfDirectory(plugin_handle)
        return
    action = params['action']
    if action == 'play':
        play_channel(params['channel'])
    elif action == 'get_user_id':
        get_user_id()
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list``, seeds it with one ``MiniFieldStorage`` per
    pair parsed from ``self.qs_on_post`` (when set), then appends each
    nested part read from ``self.fp`` until a part reports ``done``.

    :param environ: passed through to each nested part constructor.
    :param keep_blank_values: passed through to each nested part.
    :param strict_parsing: passed through to each nested part.
    :raises ValueError: if ``self.innerboundary`` is rejected by
        ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                            self.keep_blank_values,
                                            self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    # Nested parts use the configured class, defaulting to our own class.
    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def router(paramstring):
    """Call the handler selected by the ``action`` parameter.

    With no parameters at all the main menu is shown.  Otherwise
    ``action`` picks one handler, and the handler's arguments are read
    from the remaining parameters; an unrecognised action does nothing.

    :param paramstring: query string of the plugin call.
    :raises KeyError: if parameters are present but ``action`` (or a
        parameter the chosen handler needs) is missing.
    """
    params = dict(urlparse.parse_qsl(paramstring))
    if not params:
        main_menu()
        return
    action = params['action']
    if action == 'play_event':
        play(params['channel_id'], params['airing_id'])
    elif action == 'play_channel':
        play(params['channel_id'])
    elif action == 'list_events':
        list_events(params['schedule_type'])
    elif action == 'list_events_by_date':
        list_events(params['schedule_type'], params['filter_date'])
    elif action == 'list_upcoming_days':
        list_upcoming_days()
    elif action == 'show_auth_details':
        show_auth_details()
    elif action == 'search':
        search()
    elif action == 'dialog':
        dialog(params['dialog_type'], params['heading'], params['message'])
    elif action == 'channel_to_favs':
        channel_to_favs(params['channel_name'], params['channel_id'])
def GET(self):
    """ List all rucio accounts.

    Streams one JSON document per account, each followed by a newline.
    When the request has a query string, it is parsed (minus its first
    character) into a dict and passed to ``list_accounts`` as the
    filter.

    HTTP Success:
        200 OK

    HTTP Error:
        401 Unauthorized
        500 InternalError

    :param Rucio-Account: Account identifier.
    :param Rucio-Auth-Token: as an 32 character hex string.
    :returns: A list containing all account names as dict.
    """
    header('Content-Type', 'application/x-json-stream')
    account_filter = dict(parse_qsl(ctx.query[1:])) if ctx.query else {}
    for account in list_accounts(filter=account_filter):
        yield render_json(**account) + "\n"
def parse_qsl(qs, keep_blank_values=0, strict_parsing=0):
    """Parse a query given as a string argument.

    Deprecated alias: emits a ``PendingDeprecationWarning`` attributed
    to the caller, then delegates to ``urlparse.parse_qsl`` with the
    same arguments and returns its result.
    """
    message = "cgi.parse_qsl is deprecated, use urlparse.parse_qsl instead"
    # stacklevel 2 makes the warning point at the caller.
    warn(message, PendingDeprecationWarning, 2)
    pairs = urlparse.parse_qsl(qs, keep_blank_values, strict_parsing)
    return pairs
def read_urlencoded(self):
    """Internal: read data in query string format.

    Reads ``self.length`` characters from ``self.fp``, appends
    ``self.qs_on_post`` (joined with '&') when it is set, and stores one
    ``MiniFieldStorage`` per parsed pair in ``self.list``.  Finishes by
    calling ``self.skip_lines()``.
    """
    qs = self.fp.read(self.length)
    if self.qs_on_post:
        qs += '&' + self.qs_on_post
    # self.list is bound before parsing, so it is an empty list even if
    # parsing raises.
    self.list = fields = []
    pairs = urlparse.parse_qsl(qs, self.keep_blank_values,
                               self.strict_parsing)
    for key, value in pairs:
        fields.append(MiniFieldStorage(key, value))
    self.skip_lines()
def get_url_query(url):
    """Return the parameters carried in the fragment of ``url`` as a dict.

    The part after '#' is parsed as a query string.  A name that appears
    more than once keeps only its last value, because the pairs are
    collapsed into a dict.
    """
    fragment = urlparse(url).fragment
    return dict(parse_qsl(fragment))