def can_fetch(self, useragent, url):
"""using the parsed robots.txt decide if useragent can fetch url"""
if self.disallow_all:
return False
if self.allow_all:
return True
# search for given user agent matches
# the first match counts
parsed_url = urlparse.urlparse(urllib.unquote(url))
url = urlparse.urlunparse(('', '', parsed_url.path,
parsed_url.params, parsed_url.query, parsed_url.fragment))
url = urllib.quote(url)
if not url:
url = "/"
for entry in self.entries:
if entry.applies_to(useragent):
return entry.allowance(url)
# try the default entry last
if self.default_entry:
return self.default_entry.allowance(url)
# agent not found ==> access granted
return True
python类quote()的实例源码
def webpage():
url = request.args.get('url')
if not url:
# redirect with url query param so that user can navigate back later
next_rec = service.get_next_unlabelled()
if next_rec:
return redirect("/?url=%s" % (urllib.quote(next_rec['url'])))
else:
featured_content = "No Unlabelled Record Found."
else:
featured_content = get_next(url)
data = {
'featured_content': featured_content,
'status': service.overall_status()
}
return render_template('index.html', **data)
def get_speech(self, phrase):
getinfo_url = 'http://www.peiyinge.com/make/getSynthSign'
voice_baseurl = 'http://proxy.peiyinge.com:17063/synth?ts='
data = {
'content': phrase.encode('utf8')
}
result_info = requests.post(getinfo_url, data=data).json()
content = urllib.quote(phrase.encode('utf8'))
ts = result_info['ts']
sign = result_info['sign']
voice_url = voice_baseurl + ts + '&sign=' + sign + \
'&vid=' + self.vid + '&volume=&speed=0&content=' + content
r = requests.get(voice_url)
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
f.write(r.content)
tmpfile = f.name
return tmpfile
def urlparts(self):
""" The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
The tuple contains (scheme, host, path, query_string and fragment),
but the fragment is always empty because it is not visible to the
server. """
env = self.environ
http = env.get('HTTP_X_FORWARDED_PROTO') \
or env.get('wsgi.url_scheme', 'http')
host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
if not host:
# HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
host = env.get('SERVER_NAME', '127.0.0.1')
port = env.get('SERVER_PORT')
if port and port != ('80' if http == 'http' else '443'):
host += ':' + port
path = urlquote(self.fullpath)
return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
def application_uri(environ):
"""Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
url = environ['wsgi.url_scheme']+'://'
from urllib import quote
if environ.get('HTTP_HOST'):
url += environ['HTTP_HOST']
else:
url += environ['SERVER_NAME']
if environ['wsgi.url_scheme'] == 'https':
if environ['SERVER_PORT'] != '443':
url += ':' + environ['SERVER_PORT']
else:
if environ['SERVER_PORT'] != '80':
url += ':' + environ['SERVER_PORT']
url += quote(environ.get('SCRIPT_NAME') or '/')
return url
def getEntries(person):
""" Fetch a Advogato member's diary and return a dictionary in the form
{ date : entry, ... }
"""
parser = DiaryParser()
f = urllib.urlopen("http://www.advogato.org/person/%s/diary.xml" % urllib.quote(person))
s = f.read(8192)
while s:
parser.feed(s)
s = f.read(8192)
parser.close()
result = {}
for d, e in map(None, parser.dates, parser.entries):
result[d] = e
return result
def open_file(self, url):
path = urllib.url2pathname(urllib.unquote(url))
if os.path.isdir(path):
if path[-1] != os.sep:
url = url + '/'
indexpath = os.path.join(path, "index.html")
if os.path.exists(indexpath):
return self.open_file(url + "index.html")
try:
names = os.listdir(path)
except os.error, msg:
raise IOError, msg, sys.exc_traceback
names.sort()
s = MyStringIO("file:"+url, {'content-type': 'text/html'})
s.write('<BASE HREF="file:%s">\n' %
urllib.quote(os.path.join(path, "")))
for name in names:
q = urllib.quote(name)
s.write('<A HREF="%s">%s</A>\n' % (q, q))
s.seek(0)
return s
return urllib.FancyURLopener.open_file(self, url)
def fbUserSearch(self, query):
"""
facebook user search.
:type query: str
:param query:
:rtype: object
:return: query data
"""
query = urllib.quote(query)
query = \
self.http.request("fbsearch/topsearch/?context=blended&query=" + query + "&rank_token=" + self.rank_token)[
1]
if query['status'] != 'ok':
raise InstagramException(query['message'] + "\n")
return query
def searchFBLocation(self, query):
"""
Get locations.
:type query: str
:param query: search query
:rtype: object
:return: Location location data
"""
query = urllib.quote(query)
endpoint = "fbsearch/places/?rank_token=" + self.rank_token + "&query=" + query
locationFeed = self.http.request(endpoint)[1]
if locationFeed['status'] != 'ok':
raise InstagramException(locationFeed['message'] + "\n")
return locationFeed
def head_object(self, Bucket, Key, **kwargs):
"""??????
:param Bucket(string): ?????.
:param Key(string): COS??.
:param kwargs(dict): ????headers.
:return(dict): ???metadata??.
"""
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
logger.info("head object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='HEAD',
url=url,
auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key, Key),
headers=headers)
return rt.headers
def abort_multipart_upload(self, Bucket, Key, UploadId, **kwargs):
"""???????????????????????????.
:param Bucket(string): ?????.
:param Key(string): COS??.
:param UploadId(string): ???????UploadId.
:param kwargs(dict): ????headers.
:return: None.
"""
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?uploadId={UploadId}".format(UploadId=UploadId))
logger.info("abort multipart upload, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='DELETE',
url=url,
auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key, Key),
headers=headers)
return None
def package():
sent_package = {}
sent_package['get_requests'] = get_requests
def has_requests(project, branch):
return len(get_requests(project, branch)) > 0
sent_package['has_requests'] = has_requests
sent_package['get_log_diff'] = get_log_diff
sent_package['last_modified'] = last_modified
sent_package['get_branch_by_name'] = get_branch_by_name
sent_package['hash'] = lambda x: hashlib.sha256(x).hexdigest()
sent_package['_'] = _
sent_package['url_encode'] = lambda x: urllib.quote(x, safe='')
sent_package['current_user'] = current_user
sent_package['floor'] = math.floor
sent_package['len'] = len
sent_package['getattr'] = getattr
sent_package['commit_diff'] = commit_diff
return sent_package
def _tostring_path(varname, value, explode, operator, safe=""):
joiner = operator
if type(value) == type([]):
if explode == "+":
return joiner.join([varname + "." + urllib.quote(x, safe) for x in value])
elif explode == "*":
return joiner.join([urllib.quote(x, safe) for x in value])
else:
return ",".join([urllib.quote(x, safe) for x in value])
elif type(value) == type({}):
keys = value.keys()
keys.sort()
if explode == "+":
return joiner.join([varname + "." + urllib.quote(key, safe) + joiner + urllib.quote(value[key], safe) for key in keys])
elif explode == "*":
return joiner.join([urllib.quote(key, safe) + joiner + urllib.quote(value[key], safe) for key in keys])
else:
return ",".join([urllib.quote(key, safe) + "," + urllib.quote(value[key], safe) for key in keys])
else:
if value:
return urllib.quote(value, safe)
else:
return ""
def __get__(self, inst, cls):
from trytond.model import Model
from trytond.wizard import Wizard
from trytond.report import Report
url_part = {}
if issubclass(cls, Model):
url_part['type'] = 'model'
elif issubclass(cls, Wizard):
url_part['type'] = 'wizard'
elif issubclass(cls, Report):
url_part['type'] = 'report'
else:
raise NotImplementedError
url_part['name'] = cls.__name__
url_part['database'] = Transaction().cursor.database_name
local_part = urllib.quote('%(database)s/%(type)s/%(name)s' % url_part)
if isinstance(inst, Model) and inst.id:
local_part += '/%d' % inst.id
return 'tryton://%s/%s' % (HOSTNAME, local_part)
def execute(self, cmd):
cmd = quote(cmd)
url = "{}:{}/userRpm/DebugResultRpm.htm?cmd={}&usr=osteam&passwd=5up".format(self.target, self.port, cmd)
response = http_request(method="GET", url=url, auth=(self.username, self.password))
if response is None:
return ""
if response.status_code == 200:
regexp = 'var cmdResult = new Array\(\n"(.*?)",\n0,0 \);'
res = re.findall(regexp, response.text)
if len(res):
# hard to extract response
return "\n".join(res[0].replace("\\r\\n", "\r\n").split("\n")[1:])
return ""
def renameListGroup(self, groupID, newName):
"""
Used to rename an existing list group.
A default callback is added to the returned
Deferred which updates the contacts attribute
of the factory.
@param groupID: the ID of the desired group to rename.
@param newName: the desired new name for the group.
@return: A Deferred, the callback for which will be called
when the server clarifies the renaming.
The callback argument will be a tuple of 3 elements,
the new list version (int), the group id (int) and
the new group name (str).
"""
id, d = self._createIDMapping()
self.sendLine("REG %s %s %s 0" % (id, groupID, quote(newName)))
def _cb(r):
self.factory.contacts.version = r[0]
self.factory.contacts.setGroup(r[1], r[2])
return r
return d.addCallback(_cb)
def changeScreenName(self, newName):
"""
Used to change your current screen name.
A default callback is added to the returned
Deferred which updates the screenName attribute
of the factory and also updates the contact list
version.
@param newName: the new screen name
@return: A Deferred, the callback for which will be called
when the server sends an adequate reply.
The callback argument will be a tuple of 2 elements:
the new list version and the new screen name.
"""
id, d = self._createIDMapping()
self.sendLine("REA %s %s %s" % (id, self.factory.userHandle, quote(newName)))
def _cb(r):
self.factory.contacts.version = r[0]
self.factory.screenName = r[1]
return r
return d.addCallback(_cb)
def wmfactory_listing(self, request):
if self.dirs is None:
directory = os.listdir(self.path)
directory.sort()
else:
directory = self.dirs
files = []; dirs = []
for path in directory:
url = urllib.quote(path, "/")
if os.path.isdir(os.path.join(self.path, path)):
url = url + '/'
dirs.append({'link':{"text": path + "/", "href":url},
'type': '[Directory]', 'encoding': ''})
else:
mimetype, encoding = getTypeAndEncoding(path, self.contentTypes,
self.contentEncodings,
self.defaultType)
files.append({
'link': {"text": path, "href": url},
'type': '[%s]' % mimetype,
'encoding': (encoding and '[%s]' % encoding or '')})
return files + dirs
def application_uri(environ):
"""Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
url = environ['wsgi.url_scheme']+'://'
from urllib import quote
if environ.get('HTTP_HOST'):
url += environ['HTTP_HOST']
else:
url += environ['SERVER_NAME']
if environ['wsgi.url_scheme'] == 'https':
if environ['SERVER_PORT'] != '443':
url += ':' + environ['SERVER_PORT']
else:
if environ['SERVER_PORT'] != '80':
url += ':' + environ['SERVER_PORT']
url += quote(environ.get('SCRIPT_NAME') or '/')
return url
def makeInternalLink(title, label):
colon = title.find(':')
if colon > 0 and title[:colon] not in acceptedNamespaces:
return ''
if colon == 0:
# drop also :File:
colon2 = title.find(':', colon + 1)
if colon2 > 1 and title[colon + 1:colon2] not in acceptedNamespaces:
return ''
if Extractor.keepLinks:
return '<a href="%s">%s</a>' % (quote(title.encode('utf-8')), label)
else:
return label
# ----------------------------------------------------------------------
# External links
# from: https://doc.wikimedia.org/mediawiki-core/master/php/DefaultSettings_8php_source.html
def _encode_params(**kw):
'''
do url-encode parameters
>>> _encode_params(a=1, b='R&D')
'a=1&b=R%26D'
>>> _encode_params(a=u'\u4e2d\u6587', b=['A', 'B', 123])
'a=%E4%B8%AD%E6%96%87&b=A&b=B&b=123'
'''
args = []
for k, v in kw.iteritems():
if isinstance(v, basestring):
qv = v.encode('utf-8') if isinstance(v, unicode) else v
args.append('%s=%s' % (k, urllib.quote(qv)))
elif isinstance(v, collections.Iterable):
for i in v:
qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
args.append('%s=%s' % (k, urllib.quote(qv)))
else:
qv = str(v)
args.append('%s=%s' % (k, urllib.quote(qv)))
return '&'.join(args)
def code(self, var=None, **params): # pylint: disable=unused-argument
code = quote(cherrypy.request.params['code'])
callback = cherrypy.url()
payload = {
"client_id": config['alexa']['Client_ID'],
"client_secret": config['alexa']['Client_Secret'],
"code": code,
"grant_type": "authorization_code",
"redirect_uri": callback
}
url = "https://api.amazon.com/auth/o2/token"
response = requests.post(url, data=payload)
resp = response.json()
alexapi.config.set_variable(['alexa', 'refresh_token'], resp['refresh_token'])
return "<h2>Success!</h2>" \
"<p>The refresh token has been added to your config file.</p>" \
"<p>Now:</p>" \
"<ul>" \
"<li>close your this browser window,</li>" \
"<li>exit the setup script as indicated,</li>" \
"<li>and follow the Post-installation steps.</li>" \
"</ul>"
def _string_expansion(self, name, value, explode, prefix):
if value is None:
return None
tuples, items = is_list_of_tuples(value)
if list_test(value) and not tuples:
return ','.join(quote(v, self.safe) for v in value)
if dict_test(value) or tuples:
items = items or sorted(value.items())
format_str = '%s=%s' if explode else '%s,%s'
return ','.join(
format_str % (
quote(k, self.safe), quote(v, self.safe)
) for k, v in items
)
value = value[:prefix] if prefix else value
return quote(value, self.safe)
def execute(self, cmd):
cmd = quote(cmd)
url = "{}:{}/userRpm/DebugResultRpm.htm?cmd={}&usr=osteam&passwd=5up".format(self.target, self.port, cmd)
response = http_request(method="GET", url=url, auth=(self.username, self.password))
if response is None:
return ""
if response.status_code == 200:
regexp = 'var cmdResult = new Array\(\n"(.*?)",\n0,0 \);'
res = re.findall(regexp, response.text)
if len(res):
# hard to extract response
return "\n".join(res[0].replace("\\r\\n", "\r\n").split("\n")[1:])
return ""
def Weather(req):
if req.get("result").get("action") != "WeatherRequest": ## DEFINING THE PREFIX TO EXECUTE FUNCTION
return {}
city = req.get("result").get("parameters").get("geo-city").encode("utf8")
clean = re.compile('ã') ## REMOVING ERROR UTF8
city = re.sub(clean, 'a', city)
city = urllib.quote(city.encode("utf8")) ## REMOVING ACCENTUATION
result = urllib.urlopen(WeatherRequest.format(cidade=city, key=WeatherKey)).read() ## DEFINING URL
query = json.loads(result) ## LOADING JSON TO SELECT SOME PARAMETERS
main = query.get('main')
speech = lang.WeatherMSG.format(cidade=query.get('name'), temperatura=main.get('temp') + 3)
return {
"speech": speech,
"displayText": speech,
"source": source
}
def list_podcast_programs(programs):
program_list = []
# iterate over the contents of the list of programs
for program in programs:
url = build_url({'mode': 'podcasts-radio-program', 'foldername': urllib.quote(program['name'].encode('utf8')), 'url': program['url'], 'name': urllib.quote(program['name'].encode('utf8')), 'radio': program['radio']})
li = xbmcgui.ListItem(program['name'], iconImage='DefaultFolder.png')
program_list.append((url, li, True))
# add list to Kodi per Martijn
# http://forum.kodi.tv/showthread.php?tid=209948&pid=2094170#pid2094170
xbmcplugin.addDirectoryItems(addon_handle, program_list, len(program_list))
# set the content of the directory
xbmcplugin.setContent(addon_handle, 'songs')
xbmcplugin.endOfDirectory(addon_handle)
def _start_upload(
self,
digest,
mount=None
):
"""POST to begin the upload process with optional cross-repo mount param."""
if not mount:
# Do a normal POST to initiate an upload if mount is missing.
url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url())
accepted_codes = [httplib.ACCEPTED]
else:
# If we have a mount parameter, try to mount the blob from another repo.
mount_from = '&'.join(
['from=' + urllib.quote(repo.repository, '') for repo in self._mount])
url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format(
base_url=self._base_url(),
digest=digest,
mount_from=mount_from)
accepted_codes = [httplib.CREATED, httplib.ACCEPTED]
resp, unused_content = self._transport.Request(
url, method='POST', body=None,
accepted_codes=accepted_codes)
return resp.status == httplib.CREATED, resp.get('location') # type: ignore
def urlparts(self):
''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
The tuple contains (scheme, host, path, query_string and fragment),
but the fragment is always empty because it is not visible to the
server. '''
env = self.environ
http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
if not host:
# HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
host = env.get('SERVER_NAME', '127.0.0.1')
port = env.get('SERVER_PORT')
if port and port != ('80' if http == 'http' else '443'):
host += ':' + port
path = urlquote(self.fullpath)
return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
def urlparts(self):
''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
The tuple contains (scheme, host, path, query_string and fragment),
but the fragment is always empty because it is not visible to the
server. '''
env = self.environ
http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
if not host:
# HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
host = env.get('SERVER_NAME', '127.0.0.1')
port = env.get('SERVER_PORT')
if port and port != ('80' if http == 'http' else '443'):
host += ':' + port
path = urlquote(self.fullpath)
return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
def _tostring_path(varname, value, explode, operator, safe=""):
joiner = operator
if type(value) == type([]):
if explode == "+":
return joiner.join([varname + "." + urllib.quote(x, safe) for x in value])
elif explode == "*":
return joiner.join([urllib.quote(x, safe) for x in value])
else:
return ",".join([urllib.quote(x, safe) for x in value])
elif type(value) == type({}):
keys = value.keys()
keys.sort()
if explode == "+":
return joiner.join([varname + "." + urllib.quote(key, safe) + joiner + urllib.quote(value[key], safe) for key in keys])
elif explode == "*":
return joiner.join([urllib.quote(key, safe) + joiner + urllib.quote(value[key], safe) for key in keys])
else:
return ",".join([urllib.quote(key, safe) + "," + urllib.quote(value[key], safe) for key in keys])
else:
if value:
return urllib.quote(value, safe)
else:
return ""