def can_fetch(self, useragent, url):
"""using the parsed robots.txt decide if useragent can fetch url"""
if self.disallow_all:
return False
if self.allow_all:
return True
# search for given user agent matches
# the first match counts
parsed_url = urlparse.urlparse(urllib.unquote(url))
url = urlparse.urlunparse(('', '', parsed_url.path,
parsed_url.params, parsed_url.query, parsed_url.fragment))
url = urllib.quote(url)
if not url:
url = "/"
for entry in self.entries:
if entry.applies_to(useragent):
return entry.allowance(url)
# try the default entry last
if self.default_entry:
return self.default_entry.allowance(url)
# agent not found ==> access granted
return True
python类unquote()的实例源码
def do_POST(self):
dc = self.IFACE_CLASS
uri = urlparse.urljoin(self.get_baseuri(dc), self.path)
uri = urllib.unquote(uri)
dbname, dburi = TrytonDAVInterface.get_dburi(uri)
if dburi.startswith('Calendars'):
# read the body
body = None
if 'Content-Length' in self.headers:
l = self.headers['Content-Length']
body = self.rfile.read(atoi(l))
ct = None
if 'Content-Type' in self.headers:
ct = self.headers['Content-Type']
try:
DATA = '%s\n' % dc._get_caldav_post(uri, body, ct)
except DAV_Error, exception:
ec, _ = exception
return self.send_status(ec)
self.send_body(DATA, 200, 'OK', 'OK')
return
return _prev_do_POST(self)
def _entity_path(self, state):
"""Calculate the path to an entity to be returned.
*state* should be the dictionary returned by
:func:`_parse_atom_entry`. :func:`_entity_path` extracts the
link to this entity from *state*, and strips all the namespace
prefixes from it to leave only the relative path of the entity
itself, sans namespace.
:rtype: ``string``
:return: an absolute path
"""
# This has been factored out so that it can be easily
# overloaded by Configurations, which has to switch its
# entities' endpoints from its own properties/ to configs/.
raw_path = urllib.unquote(state.links.alternate)
if 'servicesNS/' in raw_path:
return _trailing(raw_path, 'servicesNS/', '/', '/')
elif 'services/' in raw_path:
return _trailing(raw_path, 'services/')
else:
return raw_path
def read(self, ifile):
""" Reads an input header from an input file.
The input header is read as a sequence of *<name>***:***<value>* pairs separated by a newline. The end of the
input header is signalled by an empty line or an end-of-file.
:param ifile: File-like object that supports iteration over lines.
"""
name, value = None, None
for line in ifile:
if line == '\n':
break
item = line.split(':', 1)
if len(item) == 2:
# start of a new item
if name is not None:
self[name] = value[:-1] # value sans trailing newline
name, value = item[0], unquote(item[1])
elif name is not None:
# continuation of the current item
value += unquote(line)
if name is not None: self[name] = value[:-1] if value[-1] == '\n' else value
def _entity_path(self, state):
"""Calculate the path to an entity to be returned.
*state* should be the dictionary returned by
:func:`_parse_atom_entry`. :func:`_entity_path` extracts the
link to this entity from *state*, and strips all the namespace
prefixes from it to leave only the relative path of the entity
itself, sans namespace.
:rtype: ``string``
:return: an absolute path
"""
# This has been factored out so that it can be easily
# overloaded by Configurations, which has to switch its
# entities' endpoints from its own properties/ to configs/.
raw_path = urllib.unquote(state.links.alternate)
if 'servicesNS/' in raw_path:
return _trailing(raw_path, 'servicesNS/', '/', '/')
elif 'services/' in raw_path:
return _trailing(raw_path, 'services/')
else:
return raw_path
def read(self, ifile):
""" Reads an input header from an input file.
The input header is read as a sequence of *<name>***:***<value>* pairs separated by a newline. The end of the
input header is signalled by an empty line or an end-of-file.
:param ifile: File-like object that supports iteration over lines.
"""
name, value = None, None
for line in ifile:
if line == '\n':
break
item = line.split(':', 1)
if len(item) == 2:
# start of a new item
if name is not None:
self[name] = value[:-1] # value sans trailing newline
name, value = item[0], unquote(item[1])
elif name is not None:
# continuation of the current item
value += unquote(line)
if name is not None: self[name] = value[:-1] if value[-1] == '\n' else value
def deletethread(project, thread_id):
thread = Thread.get_by_id(thread_id)
if Comment.query.filter_by(thread_id=thread_id).first():
flash(_('Thread is not empty'), 'error')
else:
thread = Thread.query.filter_by(id=thread_id).first()
if not current_user.is_authenticated:
flash(_('You must be logged in to delete a thread'), 'error')
else:
if (current_user != thread.owner
and current_user != project.get_master().owner):
flash(_('You are not allowed to delete this thread'), 'error')
else:
thread.delete()
db.session.commit()
flash(_('Thread successfully deleted'), 'info')
if 'return_url' in request.args:
return redirect(urllib.unquote(request.args['return_url']))
else:
return redirect(url_for('branches.view', project=project.name,
branch='master', filename='index'))
def editcomment(project, comment_id):
comment = Comment.get_by_id(comment_id)
form = CommentForm(request.form,
comment=comment.content)
if current_user != comment.owner:
flash(_('You are not allowed to edit this comment'), 'error')
if 'return_url' in request.args:
return redirect(urllib.unquote(request.args['return_url']))
else:
return redirect(url_for('branches.view', project=project.name,
branch='master', filename='index'))
if request.method == 'POST' and form.validate():
comment.content = form.comment.data
db.session.commit()
flash(_('Comment modified successfully'), 'info')
if 'return_url' in request.args:
return redirect(urllib.unquote(request.args['return_url']))
else:
return redirect(url_for('branches.view', project=project.name,
branch='master', filename='index'))
threads = (Thread.query.filter_by(id=comment.thread.id)
.order_by(desc(Thread.posted_at)))
return render_template('threads/newcomment.html', form=form)
def deletecomment(project, comment_id):
comment = Comment.get_by_id(comment_id)
if comment.has_replies():
flash(_('This comment has replies and cannot be deleted'), 'error')
else:
if not current_user.is_authenticated:
flash(_('You must be logged in to delete a comment'), 'error')
else:
if (current_user != comment.owner
and current_user != project.get_master().owner):
flash(_('You are not allowed '
'to delete this thread'), 'error')
else:
comment.delete()
db.session.commit()
flash(_('Comment successfully deleted'), 'info')
if 'return_url' in request.args:
return redirect(urllib.unquote(request.args['return_url']))
else:
return redirect(url_for('branches.view', project=project.name,
branch='master', filename='index'))
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return urllib.unquote(id_)
def verifySignature(self):
key = RSA.importKey(open(self.raven_public_key).read())
# Compile the parts to hash together
parts = self.rav_str.split("!")
parts.pop() # Remove the last two items related to signing
parts.pop()
to_hash = "!".join(parts)
# Now hash it and verify
our_hash = SHA.new(to_hash)
#print our_hash
verifier = PKCS1_v1_5.new(key)
# Obtain the correct form of the signature
signature = urllib.unquote(self.raven_signature)
signature = signature.replace("-","+")
signature = signature.replace(".","/")
signature = signature.replace("_","=")
signature = base64.b64decode(signature)
if verifier.verify(our_hash, signature):
return True
else:
return False
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = config.get('jsonrpc', 'data')
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir):
continue
path = os.path.join(path, word)
return path
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
"""like cgi.parse_qs, only with custom unquote function"""
d = {}
items = [s2 for s1 in qs.split("&") for s2 in s1.split(";")]
for item in items:
try:
k, v = item.split("=", 1)
except ValueError:
if strict_parsing:
raise
continue
if v or keep_blank_values:
k = unquote(k.replace("+", " "))
v = unquote(v.replace("+", " "))
if k in d:
d[k].append(v)
else:
d[k] = [v]
return d
def process(self):
"Process a request."
# get site from channel
self.site = self.channel.site
# set various default headers
self.setHeader('server', version)
self.setHeader('date', http.datetimeToString())
self.setHeader('content-type', "text/html")
# Resource Identification
self.prepath = []
self.postpath = map(unquote, string.split(self.path[1:], '/'))
try:
resrc = self.site.getResourceFor(self)
self.render(resrc)
except:
self.processingFailed(failure.Failure())
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split('?',1)[0]
path = path.split('#',1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
trailing_slash = path.rstrip().endswith('/')
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = os.getcwd()
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir): continue
path = os.path.join(path, word)
if trailing_slash:
path += '/'
return path
def is_cgi(self):
"""Test whether self.path corresponds to a CGI script.
Returns True and updates the cgi_info attribute to the tuple
(dir, rest) if self.path requires running a CGI script.
Returns False otherwise.
If any exception is raised, the caller should assume that
self.path was rejected as invalid and act accordingly.
The default implementation tests whether the normalized url
path begins with one of the strings in self.cgi_directories
(and the next character is a '/' or the end of the string).
"""
collapsed_path = _url_collapse_path(urllib.unquote(self.path))
dir_sep = collapsed_path.find('/', 1)
head, tail = collapsed_path[:dir_sep], collapsed_path[dir_sep+1:]
if head in self.cgi_directories:
self.cgi_info = head, tail
return True
return False
def get_wxjs_config(url, wx_tool):
js_api_list = []
try:
obj = Config.objects.get(kind=Config.KIND_JS_API_LIST)
value = json.loads(obj.value)
js_api_list = value.split()
except Config.DoesNotExist:
pass
url = unquote(url)
param = {
"debug": False,
"jsApiList": js_api_list,
"url": url
}
config = wx_tool.get_js_config(param)
return config
def do_GET(self):
if "?payload" in self.path:
query= urllib.splitquery(self.path)
action = query[1].split('=')[1]
print action
action = urllib.unquote(action)
print action
try:
x = cPickle.loads(action) #string argv
content = "command executed"
except Exception,e:
print e
content = e
else:
content = "hello World"
self.send_response(200)
self.send_header("Content-type","text/html")
self.end_headers()
self.wfile.write("<html>")
self.wfile.write(" %s " % content)
self.wfile.write("</html>")
def update_parameter(request):
if request.method == 'GET':
from urllib import unquote
update_parameter_form = StepManipulateForm(request.GET)
if update_parameter_form.is_valid():
cd = update_parameter_form.cleaned_data
step = Protocol.objects.get(id=cd['id'])
if (step.check_owner(request.user.id) or request.user.is_superuser):
step.update_parameter(unquote(cd['parameter']))
step.save()
return success('Your step has been updated.')
else:
return error('Your are not owner of the step.')
else:
return error(str(update_parameter_form.errors))
else:
return error('Method error')
def update_reference(request):
if request.method == 'GET':
from urllib import unquote
update_ref_form = RefManipulateForm(request.GET)
if update_ref_form.is_valid():
cd = update_ref_form.cleaned_data
ref = References.objects.get(id=cd['id'])
if (ref.check_owner(request.user.id) or request.user.is_superuser):
ref.path = unquote(cd['path'])
ref.save()
return success('Your reference has been updated.')
else:
return error('Your are not owner of the step.')
else:
return error(str(update_ref_form.errors))
else:
return error('Method error')
def update_step_order(request):
if request.method == 'GET':
from urllib import unquote
update_order_form = StepOrderManipulateForm(request.GET)
if update_order_form.is_valid():
cd = update_order_form.cleaned_data
relations = list(filter(None, cd['step_order'].split(';')))
error_tag = 0
for relation in relations:
step_id, new_order = relation.split('=')
step = Protocol.objects.get(id=int(step_id), parent=int(cd['protocol']))
if (step.check_owner(request.user.id) or request.user.is_superuser):
step.update_order(int(new_order))
step.save()
else:
return error('Your are not owner of the step.')
if not error_tag:
return success('Your step has been updated.')
else:
return error(str(update_order_form.errors))
else:
return error('Method error')
def oauth_token_info_from_url(url):
"""Exracts an OAuth access token from the redirected page's URL.
Returns:
A tuple of strings containing the OAuth token and the OAuth verifier which
need to sent when upgrading a request token to an access token.
"""
if isinstance(url, (str, unicode)):
url = atom.http_core.Uri.parse_uri(url)
token = None
verifier = None
if 'oauth_token' in url.query:
token = urllib.unquote(url.query['oauth_token'])
if 'oauth_verifier' in url.query:
verifier = urllib.unquote(url.query['oauth_verifier'])
return (token, verifier)
def make_search_form(*args, **kwargs):
"""Factory that instantiates one of the search forms below."""
request = kwargs.pop('request', None)
if request is not None:
sparams_cookie = request.COOKIES.get('pootle-search')
if sparams_cookie:
import json
import urllib
try:
initial_sparams = json.loads(urllib.unquote(sparams_cookie))
except ValueError:
pass
else:
if (isinstance(initial_sparams, dict) and
'sfields' in initial_sparams):
kwargs.update({
'initial': initial_sparams,
})
return SearchForm(*args, **kwargs)
def get_download_url(self):
# fetch and return dict
resp = requests.get('http://www.kirikiri.tv/?m=vod-play-id-4414-src-1-num-2.html').text
data = re.findall("mac_url=unescape\('(.*)?'\)", resp)
if not data:
print_error('No data found, maybe the script is out-of-date.', exit_=False)
return {}
data = unquote(json.loads('["{}"]'.format(data[0].replace('%u', '\\u')))[0])
ret = {}
for i in data.split('#'):
title, url = i.split('$')
ret[parse_episode(title)] = url
return ret
def _header_to_id(self, header):
"""Convert a Content-ID header value to an id.
Presumes the Content-ID header conforms to the format that _id_to_header()
returns.
Args:
header: string, Content-ID header value.
Returns:
The extracted id value.
Raises:
BatchError if the header is not in the expected format.
"""
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return urllib.unquote(id_)
def index(**kwargs):
""" Main endpoint, get all templates
"""
filters = {}
if kwargs.get('filters'):
try:
filters = json.loads(unquote(unquote(kwargs['filters'])))
except (ValueError, SyntaxError, TypeError) as ex:
raise BadRequest(str(ex.message))
try:
where = generate_request_filter(filters)
except (AttributeError, KeyError, IndexError, FieldError,
SyntaxError, TypeError, ValueError) as ex:
raise BadRequest(str(ex.message))
try:
templates = MailTemplate.objects.filter(where).order_by('name')
except (AttributeError, KeyError, IndexError, FieldError,
SyntaxError, TypeError, ValueError) as ex:
raise BadRequest(str(ex.message))
return [model_to_dict(t) for t in templates]
def _cheap_response_parse(arg1, arg2):
"""Silly parser for 'name=value; attr=attrvalue' format,
to test out response renders
"""
def crumble(arg):
"Break down string into pieces"
lines = [line for line in arg if line]
done = []
for line in lines:
clauses = [clause for clause in line.split(';')]
import logging
logging.error("clauses %r", clauses)
name, value = re.split(" *= *", clauses[0], 1)
value = unquote(value.strip(' "'))
attrs = [re.split(" *= *", clause, 1) \
for clause in clauses[1:] if clause]
attrs = [attr for attr in attrs \
if attr[0] in Cookie.attribute_names]
attrs = [(k, v.strip(' "')) for k, v in attrs]
done.append((name, value, tuple(attrs)))
return done
result1 = crumble([arg1])
result2 = crumble(arg2)
return result1, result2
def handle_request(self):
try:
data = urllib.unquote(self.get_argument('q'))
with tempfile.NamedTemporaryFile(suffix='.latex') as raw:
raw.write(data)
raw.flush()
filename = latex_to_dvi(raw.name)
svg_data = dvi_to_svg(filename).strip()
png_data, width, height = svg_to_png(svg_data)
png_data = png_data.strip().encode('base64')
return {
'success': True,
'svg': svg_data,
'png': png_data,
'meta': {'width': width,
'height': height}
}
except Exception as error:
print error.message
return {
'success': False,
'error': error.message
}
def main(event, context):
response = {
"statusCode": 200,
"body": 'Usage: curl -XPOST --data-urlencode "identity=$(curl -s http://169.254.169.254/latest/dynamic/instance-identity/pkcs7)" https://limn.company.com/'
}
logger.debug("event: {}".format(json.dumps(event)))
if 'body' in event:
identity = event['body']
if identity.startswith('identity='):
identity = identity[9:]
try:
identity = urllib.unquote(identity).decode('utf8')
trusted_doc = awstrust.verify_pkcs7(identity)
instance = Instance(
trusted_doc['accountId'],
trusted_doc['region'],
trusted_doc['instanceId'])
response['body'] = json.dumps(instance.__dict__)
except Exception as e:
response['statusCode'] = 401
response['body'] = "Error: {}".format(e)
raise
logger.info("response: {}".format(json.dumps(response)))
return response
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split('?',1)[0]
path = path.split('#',1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
trailing_slash = path.rstrip().endswith('/')
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = os.getcwd()
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir): continue
path = os.path.join(path, word)
if trailing_slash:
path += '/'
return path