def retrieve_json(self,url):
    '''
    Retrieve data from the Veneer service at the given url path.

    url: Path to required resource, relative to the root of the Veneer service.

    Returns the parsed JSON response.
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    if self.protocol=='file':
        # Read a pre-recorded response from disk; `with` guarantees the
        # file handle is closed (previously it was leaked).
        with open(self.prefix+url+self.data_ext) as f:
            text = f.read()
    else:
        conn = hc.HTTPConnection(self.host,port=self.port)
        try:
            conn.request('GET',quote(url+self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
        finally:
            # Release the socket even if the request fails.
            conn.close()

    text = self._replace_inf(text)
    # Parse once; the original parsed the text twice when PRINT_ALL was set.
    result = json.loads(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
Example source code for Python's quote()
def retrieve_csv(self,url):
    '''
    Retrieve data from the Veneer service, at the given url path, in CSV format.

    url: Path to required resource, relative to the root of the Veneer service.

    NOTE: CSV responses are currently only available for time series results
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    # Explicitly request CSV via the Accept header.
    full_url = self.base_url + quote(url + self.data_ext)
    req = Request(full_url, headers={"Accept": "text/csv"})
    response_text = urlopen(req).read().decode('utf-8')

    parsed = utils.read_veneer_csv(response_text)
    if PRINT_ALL:
        print(parsed)
        print("")
    return parsed
def retrieve_json(self,url,**kwargs):
    """Fetch JSON from the service at `url`, save a copy to disk, and
    return the parsed result, or None if the resource could not be fetched.
    """
    if self.print_urls:
        print("*** %s ***" % (url))

    try:
        text = urlopen(self.base_url + quote(url)).read().decode('utf-8')
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; catch only genuine errors and log the failure.
        self.log("Couldn't retrieve %s"%url)
        return None

    # url[1:] strips the leading '/' so the copy is saved under a relative path.
    self.save_data(url[1:],bytes(text,'utf-8'),"json")

    # Parse once; the original parsed the text twice when print_all was set.
    result = json.loads(text)
    if self.print_all:
        print(result)
        print("")
    return result
def google_image(message, keywords):
    """
    Send back a randomly chosen Google image-search result for the keywords.

    https://github.com/llimllib/limbo/blob/master/limbo/plugins/image.py
    """
    encoded = quote(keywords)
    search_url = "https://www.google.com/search?tbm=isch&q={0}".format(encoded)
    # this is an old iphone user agent. Seems to make google return good results.
    useragent = "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_0 like Mac OS X; en-us) AppleWebKit/532.9 (KHTML, like Gecko) Versio n/4.0.5 Mobile/8A293 Safari/6531.22.7"
    page = requests.get(search_url, headers={"User-agent": useragent}).text
    images = [unescape(u) for u in re.findall(r"var u='(.*?)'", page)]
    if not images:
        botsend(message, "`{}` ???????????????".format(keywords))
    else:
        botsend(message, choice(images))
def google_map(message, keywords):
    """
    Post a Google static-map image URL for the given keywords, followed by
    an attachment linking to the interactive map.

    https://github.com/llimllib/limbo/blob/master/limbo/plugins/map.py
    """
    query = quote(keywords)
    # Slack seems to ignore the size param
    #
    # To get google to auto-reasonably-zoom its map, you have to use a marker
    # instead of using a "center" parameter. I found that setting it to tiny
    # and grey makes it the least visible.
    map_url = ("https://maps.googleapis.com/maps/api/staticmap"
               "?size=800x400&markers={0}&maptype={1}").format(query, 'roadmap')
    botsend(message, map_url)

    attachments = [{
        'pretext': '<http://maps.google.com/maps?q={}|????????>'.format(query),
        'mrkdwn_in': ["pretext"],
    }]
    botwebapi(message, attachments)
def generate_search_url(song, viewsort=False):
    """ Generate YouTube search URL for the given song. """
    # Percent-encode the song title so it is safe to embed in the URL.
    encoded = quote(song)
    if viewsort:
        template = u"https://www.youtube.com/results?q={0}"
    else:
        template = u"https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={0}"
    return template.format(encoded)
def retrieve_resource(self,url,ext):
    """Download the resource at `url` and save it locally in binary mode."""
    if self.print_urls:
        print("*** %s ***" % (url))
    payload = urlopen(self.base_url + quote(url)).read()
    # url[1:] strips the leading '/' so the file path is relative.
    self.save_data(url[1:], payload, ext, mode="b")
# Process Run list and results
def youtube(keyword=None):
    """Open youtube.

    Args:
        keyword (optional): Search word.
    """
    if keyword is None:
        # No search term: open a default video instead.
        web.open('https://www.youtube.com/watch?v=L_mBVT2jBFw')
        return
    search_url = 'https://www.youtube.com/results?search_query={}'.format(keyword)
    web.open(quote(search_url, RESERVED))
def get_user(self, username):
    """Return `username` URL-quoted and then base64-encoded, with the
    trailing newline of the base64 output stripped."""
    username_ = urllib2.quote(username)
    if six.PY3:
        # base64.encodestring was deprecated and removed in Python 3.9;
        # encodebytes is the supported equivalent with identical output.
        username = base64.encodebytes(username_.encode())[:-1]
        username = username.decode('utf-8')
    if six.PY2:
        username = base64.encodestring(username_)[:-1]
    return username
def get_user(self, username):
    """Return `username` URL-quoted and then base64-encoded, with the
    trailing newline of the base64 output stripped."""
    # Removed leftover `import pdb; pdb.set_trace()` debugging breakpoint,
    # which halted every call under an interactive debugger.
    username_ = urllib2.quote(username)
    if six.PY3:
        # base64.encodestring was removed in Python 3.9; encodebytes is
        # the supported equivalent with identical output.
        username = base64.encodebytes(username_.encode())[:-1].decode('utf-8')
    if six.PY2:
        username = base64.encodestring(username_)[:-1]
    return username
def parse_videos(url):
    """POST `url` to yipeiwu.com's getvideo page and extract results.

    Returns (name, matches) on success, or False when no name is found.
    """
    payload = b'url=' + urllib.quote(url).encode('ascii')
    page = urllib.urlopen('http://yipeiwu.com/getvideo.html', data=payload).read().decode('utf-8')
    names = _re_name.findall(page)
    if not names:
        return False
    return names[0], _re.findall(page)
def play_genres(self, genre_list, player_id=None):
    """Adds then plays a random mix of albums of specified genres"""
    genres = genre_list or []
    # Clear + shuffle, queue one addalbum command per genre, then play.
    commands = ["playlist clear", "playlist shuffle 1"]
    commands += ["playlist addalbum %s * *" % urllib.quote(g)
                 for g in genres if g]
    commands.append("play 2")
    pid = player_id or self.cur_player_id
    return self._request(["%s %s" % (pid, c) for c in commands])
def playlist_play(self, path, player_id=None):
    """Play song / playlist immediately"""
    command = "playlist play %s" % (urllib.quote(path))
    self.player_request(command, player_id=player_id)
def playlist_resume(self, name, resume=True, wipe=False, player_id=None):
    """Resume the named playlist; optionally hold playback or wipe it after."""
    quoted_name = urllib.quote(name)
    # noplay is the inverse of resume; both flags are sent as 0/1 ints.
    cmd = ("playlist resume %s noplay:%d wipePlaylist:%d"
           % (quoted_name, int(not resume), int(wipe)))
    self.player_request(cmd, wait=False, player_id=player_id)
def google(message, keywords):
    """
    Send back the first Google search result URL for the given keywords.

    https://github.com/llimllib/limbo/blob/master/limbo/plugins/google.py
    """
    if keywords == 'help':
        return
    query = quote(keywords)
    url = "https://encrypted.google.com/search?q={0}".format(query)
    soup = BeautifulSoup(requests.get(url).text, "html.parser")
    answer = soup.findAll("h3", attrs={"class": "r"})
    if not answer:
        botsend(message, "`{}` ???????????????".format(keywords))
        # Bug fix: previously execution fell through to answer[0] below;
        # the resulting IndexError was caught, but the handler indexed the
        # empty list again and crashed. Stop here instead.
        return
    try:
        _, url = answer[0].a['href'].split('=', 1)
        url, _ = url.split('&', 1)
        botsend(message, unquote(url))
    except IndexError:
        # in this case there is a first answer without a link, which is a
        # google response! Let's grab it and display it to the user.
        return ' '.join(answer[0].stripped_strings)
def xml_set_cdata(_node, _value, _lowercase=False):
    """Helper to set character data in an XML tree.

    _node: parent node to receive a new Text child
    _value: value to store; string values are URL-quoted first
    _lowercase: force the stored text to lowercase

    Empty or None values add no child node.
    """
    if _value is not None and _value != "":
        sec = Text()
        # Bug fix: `_value is str` compared the value against the `str`
        # type object itself (always False for actual strings), so the
        # quoting branch never ran; isinstance() is the intended check.
        if isinstance(_value, str):
            _value = quote(_value)
        if _lowercase:  # Force lowercase.
            sec.data = _value.lower()
        else:
            sec.data = _value
        _node.appendChild(sec)
def iriToUri(iri):
    """Convert an IRI to a URI by percent-encoding its query component.

    Only the query part is quoted; scheme, netloc, path, params and
    fragment pass through unchanged.
    """
    parts = urlparse(iri)
    return urlunparse(parts._replace(query=quote(parts.query)))
def _get_from_api(self, lang="en"):
    """Look up self.word in the Oxford Dictionaries v1 API and return
    the 'results' list from its JSON response."""
    # NOTE(review): API credentials are hard-coded here; consider moving
    # them to configuration.
    app_id = "45aecf84"
    app_key = "bb36fd6a1259e5baf8df6110a2f7fc8f"
    headers = {"app_id": app_id, "app_key": app_key}
    # Lowercase and replace spaces to form the API's word identifier.
    word_id = urllib2.quote(self.word.lower().replace(" ", "_"))
    endpoint = "https://od-api.oxforddictionaries.com/api/v1" + "/entries/" + lang + "/" + word_id
    req = urllib2.Request(endpoint, headers=headers)
    return json.loads(urllib2.urlopen(req).read())["results"]
def quote_base64_encode(text):
    """
    Quoting and encoding string using base64 encoding.
    """
    # Percent-encode first, then base64-encode the UTF-8 bytes, then
    # percent-encode the base64 output (its '=' padding needs escaping).
    percent_encoded = quote(text)
    b64 = base64.b64encode(percent_encoded.encode('utf-8'))
    return quote(b64)
def escape(s):
    """Percent-encode `s`, treating only '~' (plus alphanumerics and the
    characters quote() never touches) as safe."""
    safe_chars = "~"
    return quote(s, safe=safe_chars)
def _parse_object_name(object_name):
if isinstance(object_name, list):
object_name = quote(('/'.join(object_name)))
return object_name
def generate_link(user):
    """Build a codewars.com URL for `user`, percent-encoding the name."""
    encoded = request.quote(user)
    return "http://www.codewars.com/{user}".format(user=encoded)
def generate_link(user):
    """Build a codewars.com profile URL for `user`, percent-encoding the name."""
    safe_user = request.quote(user)
    return "http://www.codewars.com/users/{user}".format(user=safe_user)
# e.g. generate_link("matt c") -> "http://www.codewars.com/users/matt%20c"
def __yahoo_request(query):
    """Request Yahoo Finance information.

    Request information from YQL.
    `Check <http://goo.gl/8AROUD>`_ for more information on YQL.
    """
    encoded = quote(query)
    url = ('https://query.yahooapis.com/v1/public/yql?q=' + encoded +
           '&format=json&env=store://datatables.org/alltableswithkeys')
    raw = urlopen(url).read()
    return json.loads(raw.decode('utf-8'))['query']['results']
def request_quotes(tickers_list, selected_columns=['*']):
    """Request Yahoo Finance recent quotes.

    Returns quotes information from YQL. The columns to be requested are
    listed at selected_columns. Check `here <http://goo.gl/8AROUD>`_ for more
    information on YQL.

    >>> request_quotes(['AAPL'], ['Name', 'PreviousClose'])
    {
        'PreviousClose': '95.60',
        'Name': 'Apple Inc.'
    }

    :param tickers_list: List of tickers that will be returned.
    :type tickers_list: list of strings
    :param selected_columns: List of columns to be returned, defaults to ['*']
    :type selected_columns: list of strings, optional
    :returns: Requested quotes.
    :rtype: json
    :raises: TypeError, RequestError
    """
    __validate_list(tickers_list)
    __validate_list(selected_columns)
    query = 'select {cols} from yahoo.finance.quotes where symbol in ({vals})'
    query = query.format(
        cols=', '.join(selected_columns),
        vals=', '.join('"{0}"'.format(s) for s in tickers_list)
    )
    response = __yahoo_request(query)
    if not response:
        raise RequestError('Unable to process the request. Check if the ' +
                           'columns selected are valid.')
    # Idiom fix: `not type(x) is list` replaced with isinstance(); a single
    # quote comes back as a dict, so normalize it to a one-element list.
    if not isinstance(response['quote'], list):
        return [response['quote']]
    return response['quote']
def getStationsFromName(self, userInput):
    """
    Query that will return some stations suggestions according to user input
    """
    endpoint = "/stations?query=" + urlqt(userInput)
    return self.reqhandler.sendrequest(endpoint)
def main():
    """
    The entry point for the app. Called when music-scraper is typed in terminal.
    Starts the GUI and starts the scraping process after the input is given
    """
    curses.initscr()
    # Refuse to run in terminals too small for the UI layout.
    if curses.COLS < 80 or curses.LINES < 5:
        curses.endwin()
        print('Terminal\'s dimensions are too small')
        return
    process = CrawlerProcess({'LOG_ENABLED': False})

    def gui_input(screen):
        # Initialise colours/keypad, build the main window, then block
        # until the user enters a search query (stored on GUI.message).
        GUI.screen = screen
        curses.start_color()
        GUI.screen.keypad(1)
        curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN)
        GUI.high_light_text = curses.color_pair(1)
        GUI.normal_text = curses.A_NORMAL
        GUI.box = curses.newwin(curses.LINES, curses.COLS, 0, 0)
        GUI.message = GUI.get_input()

    # wrapper() restores terminal state even if gui_input raises.
    curses.wrapper(gui_input)
    # Percent-encode the query and point the spider at a Google search.
    s = request.quote(GUI.message)
    MusicSpider.start_urls = [
        "http://www.google.com/search?q=" + s,
    ]
    process.crawl(MusicSpider)
    # Run the GUI on its own thread; process.start() blocks this one
    # until crawling finishes.
    thread = GUIThread(process, start_gui)
    thread.start()
    process.start()
    if not GUI.gui_stopped:
        if len(GUI.strings) == 0:
            # No results: show a hint message in the main box.
            GUI.box.erase()
            GUI.box.addstr(1, 1, "No Results Found... Try with Some other keywords.", GUI.high_light_text)
            GUI.add_bottom_menus()
            GUI.screen.refresh()
            GUI.box.refresh()
        else:
            # Results present: show a completion banner at the bottom.
            GUI.box.addstr(curses.LINES - 2, 1, "Completed Scraping !!", GUI.high_light_text)
            GUI.add_bottom_menus()
            GUI.screen.refresh()
            GUI.box.refresh()
def send_to_es(self, path, method="GET", payload={}):
"""Low-level POST data to Amazon Elasticsearch Service generating a Sigv4 signed request
Args:
path (str): path to send to ES
method (str, optional): HTTP method default:GET
payload (dict, optional): additional payload used during POST or PUT
Returns:
dict: json answer converted in dict
Raises:
#: Error during ES communication
ES_Exception: Description
"""
if not path.startswith("/"):
path = "/" + path
es_region = self.cfg["es_endpoint"].split(".")[1]
# send to ES with exponential backoff
retries = 0
while retries < int(self.cfg["es_max_retry"]):
if retries > 0:
seconds = (2**retries) * .1
# print('Waiting for %.1f seconds', seconds)
time.sleep(seconds)
req = AWSRequest(
method=method,
url="https://%s%s?pretty&format=json" % (self.cfg["es_endpoint"], quote(path)),
data=payload,
headers={'Host': self.cfg["es_endpoint"]})
credential_resolver = create_credential_resolver(get_session())
credentials = credential_resolver.load_credentials()
SigV4Auth(credentials, 'es', es_region).add_auth(req)
try:
preq = req.prepare()
session = Session()
res = session.send(preq)
if res.status_code >= 200 and res.status_code <= 299:
# print("%s %s" % (res.status_code, res.content))
return json.loads(res.content)
else:
raise ES_Exception(res.status_code, res._content)
except ES_Exception as e:
if (e.status_code >= 500) and (e.status_code <= 599):
retries += 1 # Candidate for retry
else:
raise # Stop retrying, re-raise exception