def parseResponse(self, data):
    """Decode *data* and return its query-string portion as a dict.

    The text after the last '?' is treated as ``key=value`` pairs joined
    by '&'; each value may itself contain further '=' characters.
    """
    decoded = unquote(data)
    query = decoded.split('?')[-1]
    pairs = (field.split('=', 1) for field in query.split('&'))
    return dict(pairs)
# --- example snippets demonstrating unquote() usage ---
def _parse_qsl(qs):
r = []
for pair in qs.replace(';', '&').split('&'):
if not pair: continue
nv = pair.split('=', 1)
if len(nv) != 2: nv.append('')
key = urlunquote(nv[0].replace('+', ' '))
value = urlunquote(nv[1].replace('+', ' '))
r.append((key, value))
return r
def current_user_id(self):
    """Return the authenticated user id, or None if not authenticated.

    Resolution order:
      1. the secure 'user_id' cookie (only consulted when
         CONFIG.COOKIE_SECRET is set);
      2. the 'code' request argument, which carries a URL-quoted signed
         value — used by clients that cannot send cookies (Shockwave
         Flash, lua uploaders).
    """
    # fixed: user_id was unbound (NameError at the `if not user_id`
    # check) whenever CONFIG.COOKIE_SECRET was falsy.
    user_id = None
    if CONFIG.COOKIE_SECRET:
        user_id = self.get_secure_cookie('user_id', min_version=2)
        if user_id:
            return user_id
    # Fallback: no cookie value in User-Agent for Shockwave Flash and
    # for lua upload -- the signed value arrives as the 'code' argument.
    if not user_id:
        secure_code = self.get_argument('code', '')
        if secure_code:
            secure_user_id = unquote(secure_code)
            user_id = decode_signed_value(self.application.settings["cookie_secret"],
                                          'user_id', secure_user_id)
    return user_id
def create_google_session(self):
    """Log in to a Google account and return the authenticated session.

    Scrapes the hidden inputs of the Google login form, fills in the
    stored credentials, resolves the post-login redirect from the
    X-Auto-Login header, and posts everything to the authentication URL.

    Returns:
        requests.Session: session carrying the Google auth cookies.
    """
    session = requests.session()
    login_html = session.get(DataManagement.__GOOGLE_ACCOUNT_URL)
    # Check cookies returned because there is an issue with the authentication.
    # GAPS, GALX, NID - these cookies are used to identify the user when
    # using Google+ functionality; GAPS is still provided.
    self.logger.debug(session.cookies.get_dict().keys())
    try:
        galx = session.cookies['GALX']  # kept only for diagnostics below
    except KeyError:  # fixed: was a bare except that hid unrelated errors
        self.logger.error('No cookie GALX')
    soup_login = BeautifulSoup(login_html.content, 'html.parser').find('form').find_all('input')
    payload = {}
    for u in soup_login:
        if u.has_attr('value'):
            payload[u['name']] = u['value']
    payload['Email'] = self.__username
    payload['Passwd'] = self.__password
    # X-Auto-Login is double-URL-quoted; the redirect target follows
    # the last 'continue=' marker.
    auto = login_html.headers.get('X-Auto-Login')
    follow_up = unquote(unquote(auto)).split('continue=')[-1]
    # Commented as suggested in https://github.com/tracek/gee_asset_manager/issues/36
    # galx = login_html.cookies['GALX']
    payload['continue'] = follow_up
    # Commented as suggested in https://github.com/tracek/gee_asset_manager/issues/36
    # payload['GALX'] = galx
    session.post(DataManagement.__AUTHENTICATION_URL, data=payload)
    return session
def push_state(self, request, title, url=''):
    """Emit a History.pushState call (via KSS script) for the current request."""
    if request.is_mobile():
        # FIXME hack: on mobile webviews document.title does not refresh;
        # loading and removing a hidden iframe forces the update.
        # NOTE(review): original comment was mojibake -- intent
        # reconstructed from the code; confirm.
        script = '''
            (function(){
                var $body = $('body');
                var $iframe = $('<iframe src="/@@/img/favicon.ico" style="display:none;"></iframe>').on('load', function() {
                    setTimeout(function() {
                        $iframe.off('load').remove()
                    }, 0)
                }).appendTo($body);
            })();
            '''
        self._append_script(script, False)
    title = self._escape_value(title)
    # Non-KSS request (no 'kss' header): only update the document title,
    # do not push a history entry.
    if not request.headers.has_key('kss'):
        self._append_script('document.title=%s' % title, False)
        return
    form = self.request.form
    # 'back' marker guards against pushing the same state twice.
    if form.has_key('back'):
        return
    else:
        form['back'] = True
    kss = request.getURL()
    if form:
        kss += '?%s' % urllib.urlencode(form)
    data = json.dumps({'form':form, 'url':kss})
    if not url:
        url = urllib.unquote(kss)
    script = "History.trigger=false;History.pushState(%s, %s, '%s')" % (data, title, url)
    self._append_script(script, False)
def displayContents(contents, isBase64=False):
'''my hacky way to not display duplicate contents.
for some reason xml sends back to back requests
and i only want to show the first one'''
global LAST_CONTENTS
newContents = sha1(contents).hexdigest()
if LAST_CONTENTS != newContents:
print "[+] Received response, displaying\n"
if not isBase64:
print urllib.unquote(contents)
else:
print urllib.unquote(contents).decode('base64')
LAST_CONTENTS = newContents
print "------\n"
return
def url2pathname(url):
    """OS-specific conversion from a relative URL of the 'file' scheme
    to a file system path; not recommended for general use."""
    # e.g.
    # ///C|/foo/bar/spam.foo
    # becomes
    # C:\foo\bar\spam.foo
    import string, urllib
    # Windows itself uses ":" even in URLs.
    url = url.replace(':', '|')
    if not '|' in url:
        # No drive specifier, just convert slashes
        if url[:4] == '////':
            # path is something like ////host/path/on/remote/host
            # convert this to \\host\path\on\remote\host
            # (notice halving of slashes at the start of the path)
            url = url[2:]
        components = url.split('/')
        # make sure not to convert quoted slashes :-)
        return urllib.unquote('\\'.join(components))
    comp = url.split('|')
    # A drive URL must be exactly "<letter>|<path>"; anything else is bad.
    if len(comp) != 2 or comp[0][-1] not in string.ascii_letters:
        error = 'Bad URL: ' + url
        raise IOError, error  # Python 2 raise syntax
    drive = comp[0][-1].upper()
    path = drive + ':'
    components = comp[1].split('/')
    # Unquote each component separately so percent-encoded slashes survive.
    for comp in components:
        if comp:
            path = path + '\\' + urllib.unquote(comp)
    # Issue #11474: url like '/C|/' should convert into 'C:\\'
    if path.endswith(':') and url.endswith('/'):
        path += '\\'
    return path
def parseaddr(addr):
    """Parse *addr* into a (realname, email) pair; ('', '') on failure."""
    parsed = _AddressList(addr).addresslist
    return parsed[0] if parsed else ('', '')
# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
    """Remove surrounding quotes (or angle brackets) from a string.

    Double-quoted strings are also de-backslash-ified; single-character
    and unquoted strings are returned unchanged.
    """
    if len(str) > 1:
        double_quoted = str.startswith('"') and str.endswith('"')
        if double_quoted:
            inner = str[1:-1]
            return inner.replace('\\\\', '\\').replace('\\"', '"')
        if str[0] == '<' and str[-1] == '>':
            return str[1:-1]
    return str
# RFC2231-related functions - parameter encoding and decoding
def collapse_rfc2231_value(value, errors='replace',
                           fallback_charset='us-ascii'):
    """Collapse an RFC 2231 (charset, language, value) triple to unicode.

    *value* is either a plain (possibly quoted) string, or the 3-tuple
    produced by the email package's RFC 2231 parameter decoding.
    Undecodable bytes are handled per *errors*; an unknown charset
    falls back to *fallback_charset*.
    """
    if isinstance(value, tuple):
        rawval = unquote(value[2])
        charset = value[0] or 'us-ascii'
        try:
            # Python 2 unicode() constructor: decode rawval using charset.
            return unicode(rawval, charset, errors)
        except LookupError:
            # XXX charset is unknown to Python.
            return unicode(rawval, fallback_charset, errors)
    else:
        return unquote(value)
def list_directory(self, path):
    """Helper to produce a directory listing (absent index.html).

    Return value is either a file object, or None (indicating an
    error). In either case, the headers are sent, making the
    interface the same as for send_head().
    """
    try:
        list = os.listdir(path)
    except os.error:
        self.send_error(404, "No permission to list directory")
        return None
    # Case-insensitive sort for display.
    list.sort(key=lambda a: a.lower())
    f = StringIO()
    # HTML-escape the URL-decoded request path before embedding it.
    displaypath = cgi.escape(urllib.unquote(self.path))
    f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
    f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
    f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
    f.write("<hr>\n<ul>\n")
    for name in list:
        fullname = os.path.join(path, name)
        displayname = linkname = name
        # Append / for directories or @ for symbolic links
        if os.path.isdir(fullname):
            displayname = name + "/"
            linkname = name + "/"
        if os.path.islink(fullname):
            displayname = name + "@"
        # Note: a link to a directory displays with @ and links with /
        f.write('<li><a href="%s">%s</a>\n'
                % (urllib.quote(linkname), cgi.escape(displayname)))
    f.write("</ul>\n<hr>\n</body>\n</html>\n")
    length = f.tell()
    f.seek(0)
    self.send_response(200)
    encoding = sys.getfilesystemencoding()
    self.send_header("Content-type", "text/html; charset=%s" % encoding)
    self.send_header("Content-Length", str(length))
    self.end_headers()
    # Caller streams this file-like object back to the client.
    return f
def translate_path(self, path):
    """Translate a /-separated PATH to the local filename syntax.

    Components that mean special things to the local file system
    (e.g. drive or directory names) are ignored. (XXX They should
    probably be diagnosed.)
    """
    # Abandon query parameters and fragment before decoding.
    path = path.split('?', 1)[0].split('#', 1)[0]
    path = posixpath.normpath(urllib.unquote(path))
    result = os.getcwd()
    for part in path.split('/'):
        if not part:
            continue
        # Strip any drive or directory prefix a component may smuggle in.
        part = os.path.splitdrive(part)[1]
        part = os.path.split(part)[1]
        if part in (os.curdir, os.pardir):
            continue
        result = os.path.join(result, part)
    return result
def get_environ(self):
    """Build and return the WSGI environ dict for the current request."""
    env = self.server.base_environ.copy()
    env['SERVER_PROTOCOL'] = self.request_version
    env['REQUEST_METHOD'] = self.command
    if '?' in self.path:
        path,query = self.path.split('?',1)
    else:
        path,query = self.path,''
    # PATH_INFO is URL-decoded; QUERY_STRING stays raw per the WSGI spec.
    env['PATH_INFO'] = urllib.unquote(path)
    env['QUERY_STRING'] = query
    host = self.address_string()
    if host != self.client_address[0]:
        env['REMOTE_HOST'] = host
    env['REMOTE_ADDR'] = self.client_address[0]
    if self.headers.typeheader is None:
        env['CONTENT_TYPE'] = self.headers.type
    else:
        env['CONTENT_TYPE'] = self.headers.typeheader
    length = self.headers.getheader('content-length')
    if length:
        env['CONTENT_LENGTH'] = length
    # Copy the remaining request headers as HTTP_* variables.
    for h in self.headers.headers:
        k,v = h.split(':',1)
        k=k.replace('-','_').upper(); v=v.strip()
        if k in env:
            continue # skip content length, type,etc.
        if 'HTTP_'+k in env:
            env['HTTP_'+k] += ','+v # comma-separate multiple headers
        else:
            env['HTTP_'+k] = v
    return env
def get_host_info(self, host):
    """Split *host* into (host, extra_headers, x509).

    *host* may be a plain "user:pass@host" string or a (host, x509)
    tuple; embedded credentials become a Basic Authorization header.
    """
    x509 = {}
    if isinstance(host, TupleType):
        host, x509 = host
    import urllib
    auth, host = urllib.splituser(host)
    if auth:
        import base64
        # URL-decode then base64-encode the user:pass pair for Basic auth.
        auth = base64.encodestring(urllib.unquote(auth))
        auth = string.join(string.split(auth), "") # get rid of whitespace
        extra_headers = [
            ("Authorization", "Basic " + auth)
            ]
    else:
        extra_headers = None
    return host, extra_headers, x509
##
# Connect to server.
#
# @param host Target host.
# @return A connection handle.
def __handle_unescape(self, key):
    """Inline-substitute calls that look like ``key(%XX%YY...)`` in self.js.

    Scans self.js for occurrences of *key* followed by a parenthesised
    run of percent-escapes and replaces the whole call text with its
    URL-decoded value.  Occurrences whose argument contains a nested
    '(' or an indexing '[' (not preceded by '+') are left untouched.
    """
    start = 0
    while True:
        start_js = self.js
        offset = self.js.find(key, start)
        if offset == -1: break
        offset += len(key)
        expr = ''    # accumulated %-escape characters to decode
        extra = ''   # full raw text consumed after the key (for replacement)
        last_c = self.js[offset - 1]
        abort = False
        for i, c in enumerate(self.js[offset:]):
            extra += c
            if c == ')':
                break
            elif (i > 0 and c == '(') or (c == '[' and last_c != '+'):
                # Nested call or array indexing -- not a simple escape blob.
                abort = True
                break
            elif c == '%' or c in string.hexdigits:
                expr += c
            last_c = c
        if not abort:
            self.js = self.js.replace(key + extra, urllib.unquote(expr))
        # Stop once a full pass makes no change; otherwise continue
        # scanning past this occurrence.
        if start_js == self.js:
            break
        else:
            start = offset
def sources(self, url, hostDict, hostprDict):
    """Resolve playable gvideo stream sources for a title/year query.

    *url* is a urlencoded query string holding at least 'title' and
    'year'.  Posts the computed video id to the site's /video_info/iframe
    endpoint and extracts direct stream URLs from the JSON reply.
    Returns a list of source dicts (possibly empty on any failure).
    """
    try:
        sources = []
        if url == None: return sources
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        title = data['title'] ; year = data['year']
        h = {'User-Agent': client.randomagent()}
        # Site video id: slugified title + year, e.g. some_title_2016.
        v = '%s_%s' % (cleantitle.geturl(title).replace('-', '_'), year)
        url = '/watch?v=%s' % v
        url = urlparse.urljoin(self.base_link, url)
        #c = client.request(url, headers=h, output='cookie')
        #c = client.request(urlparse.urljoin(self.base_link, '/av'), cookie=c, output='cookie', headers=h, referer=url)
        #c = client.request(url, cookie=c, headers=h, referer=url, output='cookie')
        post = urllib.urlencode({'v': v})
        u = urlparse.urljoin(self.base_link, '/video_info/iframe')
        #r = client.request(u, post=post, cookie=c, headers=h, XHR=True, referer=url)
        r = client.request(u, post=post, headers=h, XHR=True, referer=url)
        r = json.loads(r).values()
        # Each value embeds the stream after a 'url=' marker, URL-quoted.
        r = [urllib.unquote(i.split('url=')[-1]) for i in r]
        for i in r:
            # Best-effort: skip any stream whose quality lookup fails.
            try: sources.append({'source': 'gvideo', 'quality': directstream.googletag(i)[0]['quality'], 'language': 'en', 'url': i, 'direct': True, 'debridonly': False})
            except: pass
        return sources
    except:
        return sources
def sources(self, url, hostDict, hostprDict):
    """Resolve playable gvideo stream sources for a title/year query.

    Variant of the iframe scraper: acquires site cookies first, then
    posts the computed video id to /video_info/frame and extracts
    direct stream URLs from the JSON reply.  Returns a list of source
    dicts (possibly empty on any failure).
    """
    try:
        sources = []
        if url == None: return sources
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        title = data['title'] ; year = data['year']
        h = {'User-Agent': client.randomagent()}
        # Site video id: slugified title + year, e.g. some_title_2016.
        v = '%s_%s' % (cleantitle.geturl(title).replace('-', '_'), year)
        url = '/watch_%s.html' % v
        url = urlparse.urljoin(self.base_link, url)
        # Two-step cookie dance required by the site's anti-bot page.
        c = client.request(url, headers=h, output='cookie')
        c = client.request(urlparse.urljoin(self.base_link, '/av'), cookie=c, output='cookie', headers=h, referer=url)
        #c = client.request(url, cookie=c, headers=h, referer=url, output='cookie')
        post = urllib.urlencode({'v': v})
        u = urlparse.urljoin(self.base_link, '/video_info/frame')
        #r = client.request(u, post=post, cookie=c, headers=h, XHR=True, referer=url)
        r = client.request(u, post=post, headers=h, XHR=True, referer=url)
        r = json.loads(r).values()
        # Each value embeds the stream after a 'url=' marker, URL-quoted.
        r = [urllib.unquote(i.split('url=')[-1]) for i in r]
        for i in r:
            # Best-effort: skip any stream whose quality lookup fails.
            try: sources.append({'source': 'gvideo', 'quality': directstream.googletag(i)[0]['quality'], 'language': 'en', 'url': i, 'direct': True, 'debridonly': False})
            except: pass
        return sources
    except:
        return sources
def sources(self, url, hostDict, hostprDict):
    """Resolve playable gvideo stream sources for a page URL.

    Derives the video id either from the referer's query string or from
    the 'watch_<id>.html' page name, posts it to /video_info/iframe with
    the cookies from __get_cookies, and extracts direct stream URLs from
    the JSON reply.  Returns a list of source dicts (possibly empty on
    any failure).
    """
    sources = []
    try:
        if not url:
            return sources
        referer = urlparse.urljoin(self.base_link, url)
        c, h = self.__get_cookies(referer)
        # Video id: first query-string value, else the page slug with the
        # 'watch_' prefix, fragment and extension stripped.
        try: post = urlparse.parse_qs(urlparse.urlparse(referer).query).values()[0][0]
        except: post = referer.strip('/').split('/')[-1].split('watch_', 1)[-1].rsplit('#')[0].rsplit('.')[0]
        post = urllib.urlencode({'v': post})
        url = urlparse.urljoin(self.base_link, '/video_info/iframe')
        r = client.request(url, post=post, headers=h, cookie=c, XHR=True, referer=referer)
        r = json.loads(r).values()
        # Each value embeds the stream after a 'url=' marker, URL-quoted.
        r = [urllib.unquote(i.split('url=')[-1]) for i in r]
        for i in r:
            # Best-effort: skip any stream whose quality lookup fails.
            try: sources.append({'source': 'gvideo', 'quality': directstream.googletag(i)[0]['quality'], 'language': 'en', 'url': i, 'direct': True, 'debridonly': False})
            except: pass
        return sources
    except:
        return sources
def post(self):
    """Pub/Sub push endpoint: cache the newest Oscars top-10 snapshot.

    Rejects requests without the shared subscription token, decodes the
    pushed message (URL-quoted JSON wrapping a base64 CSV payload:
    timestamp followed by alternating entity,count fields) and stores
    the snapshot in memcache when it is newer than the cached one.
    """
    if pubsub_utils.SUBSCRIPTION_UNIQUE_TOKEN != self.request.get('token'):
        self.response.status = 404
        return
    # Store the message in the datastore.
    message = json.loads(urllib.unquote(self.request.body).rstrip('='))
    message_body = base64.b64decode(str(message['message']['data']))
    message = message_body.split(',')
    # First field is an ISO timestamp; the trailing 5 characters
    # (subsecond/zone suffix) are stripped before parsing.
    d = datetime.strptime(message[0][:-5], '%Y-%m-%dT%H:%M:%S')
    timestamp = time.mktime(d.timetuple())
    message = message[1:]
    # Remaining fields alternate entity,count.
    entities = zip(message[::2], map(int, message[1::2]))
    data_raw = memcache.get(MC_OSCARS_TOP10)
    if data_raw:
        # fixed: reuse the value already fetched instead of issuing a
        # second memcache.get, which cost a round-trip and could race
        # to None after the truthiness check above.
        data = json.loads(data_raw)
    else:
        data = None
    if data is None or data['timestamp'] < timestamp:
        memcache.set(MC_OSCARS_TOP10, json.dumps({
            'timestamp': timestamp,
            'entities': entities
        }))
def register(request):
    """Register the user."""
    from .models import RegisteredUser, RegisteredUserForm
    email = request.user.email
    username = request.user.username
    u = RegisteredUser.objects.get(username=username)
    # Post-registration destination; defaults to the dashboard and never
    # redirects back to the registration page itself.
    next = unquote(request.GET.get('next', reverse('webtzite_dashboard')))
    if next == reverse('webtzite_register'):
        next = reverse('webtzite_dashboard')
    if request.method == "GET":
        if u.is_registered:
            return redirect(next)
        form = RegisteredUserForm()
    else:
        form = RegisteredUserForm(request.POST, instance=u)
        if form.is_valid():
            u.is_registered = True
            u.institution = form.cleaned_data['institution']
            u.first_name = form.cleaned_data['first_name']
            u.last_name = form.cleaned_data['last_name']
            # Inside a JupyterHub session: mirror the user's identity
            # into their ~/.gitconfig so commits are attributed.
            if os.environ.get('JPY_USER'):
                from git.config import GitConfigParser
                cfg = os.path.normpath(os.path.expanduser("~/.gitconfig"))
                gcp = GitConfigParser(cfg, read_only=False)
                full_name = ' '.join([u.first_name, u.last_name])
                gcp.set_value('user', 'name', full_name)
                gcp.set_value('user', 'email', u.email)
            # The very first registered user becomes the superuser.
            u.is_superuser = bool(RegisteredUser.objects.count() == 1)
            u.save()
            return redirect(next)
    ctx = RequestContext(request)
    return render_to_response('register.html', locals(), ctx)