def get(url, *args, **kwargs):
    """Fetch ``url`` with a GET request and return the body as UTF-8 text.

    This is a generator-based coroutine: drive it with ``yield from``.
    Extra positional and keyword arguments are forwarded unchanged to
    ``aiohttp.request``.
    """
    # Everything from the first '#' onwards (the fragment) is dropped.
    target = url.partition('#')[0]
    # ``sem`` comes from the enclosing module -- presumably a semaphore that
    # caps the number of concurrent downloads; confirm against its definition.
    with (yield from sem):
        reply = yield from aiohttp.request('GET', target, *args, **kwargs)
        raw = yield from reply.read_and_close()
    text = raw.decode('utf-8')
    print('GET', target)
    return text
# Example source code using Python's request() function
async def fetch(self, url, retry=3):
    """Download ``url`` and return its body, decoded to text when possible.

    Timeouts and ``aiohttp.ClientResponseError`` are retried, after a one
    second pause, up to ``retry`` more times. A non-2xx status is logged
    but the body is still read and returned.

    Returns:
        The body as ``str`` (UTF-8 is tried first, then GBK), the raw
        ``bytes`` when neither codec can decode it, or ``''`` when the
        download failed.
    """
    # Taken before the ``try`` so every handler can compute the elapsed time.
    start_time = self.loop.time()
    try:
        # ``async with`` guarantees the connection is released, including
        # when reading the body fails.
        async with aiohttp.request('GET', url) as response:
            time_used = self.loop.time() - start_time
            if not (200 <= response.status < 300):
                logger.error('USE %6.3f s STAT: %s URL: %s',
                             time_used, response.status, url)
            body = await response.read()
    except (TimeoutError, asyncio.TimeoutError,
            aiohttp.ClientResponseError) as e:
        # ``asyncio.TimeoutError`` is listed explicitly: before Python 3.11
        # it is not the builtin ``TimeoutError`` and would skip the retry.
        if retry > 0:
            await asyncio.sleep(1)
            return await self.fetch(url, retry - 1)
        time_used = self.loop.time() - start_time
        logger.error('USE %6.3f s STAT: 500 URL: %s (%s)',
                     time_used, url, e)
        return ''
    except Exception as e:
        # Best-effort download: anything else is logged and reported as an
        # empty page rather than propagated.
        time_used = self.loop.time() - start_time
        logger.error('USE %6.3f s STAT: 500 URL: %s (%s)',
                     time_used, url, e)
        return ''

    # Try the supported encodings in order of preference.
    for encoding in ('utf-8', 'gbk'):
        try:
            return body.decode(encoding)
        except UnicodeDecodeError:
            continue
    # Neither codec fits: hand the raw bytes back to the caller.
    return body
async def file_check(self):
    """Ensure the cookie image and font exist under ``data/horoscope``.

    The image is downloaded on every call and written to disk when the
    local copy is missing or differs from the downloaded bytes. The font
    is downloaded only when it is missing. A response whose status is not
    200 is ignored so an error page never replaces a usable file.
    """
    urls = ["https://images-2.discordapp.net/.eJwNwcENwyAMAMBdGABDCWCyDSKIoCYxwuZVdff27qPWvNSuTpHBO8DRudA8NAvN3Kp"
            "uRO2qeXTWhW7IIrmcd32EwQbjMCRMaJNxPmwILxcRg_9Da_yWYoQ3dV5z6fE09f0BC6EjAw.B0sII_QLbL9kJo6Zbb4GuO4MQNw",
            "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf"]
    option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)'
                            ' Gecko/20100101 Firefox/40.1'}
    cookie_path = "data/horoscope/cookie.png"
    font_path = "data/horoscope/FortuneCookieNF.ttf"

    async def download(url):
        # Returns the response body, or None when the server did not
        # answer with status 200.
        async with aiohttp.request("GET", url, headers=option) as resp:
            if resp.status != 200:
                return None
            return await resp.read()

    image = await download(urls[0])
    if image is not None:
        if os.path.exists(cookie_path):
            # Compare the bytes themselves; a length-only comparison would
            # miss a changed image of identical size.
            with open(cookie_path, "rb") as existing:
                stale = existing.read() != image
        else:
            stale = True
        if stale:
            with open(cookie_path, "wb") as f:
                f.write(image)

    if not os.path.exists(font_path):
        font = await download(urls[1])
        if font is not None:
            with open(font_path, "wb") as f:
                f.write(font)
async def add(self, ctx, name, url):
    """Allows you to add emotes to the emote list
    [p]emotes add pan http://i.imgur.com/FFRjKBW.gifv"""
    # The docstring above is kept verbatim; it may be shown as command help.
    server = ctx.message.server
    name = name.lower()
    option = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 '
                            '(KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'}
    if server.id not in self.servers:
        # default off
        self.servers[server.id] = {"status": False}
    if "emotes" not in self.servers[server.id]:
        self.servers[server.id]["emotes"] = {}
    dataIO.save_json(self.data_path, self.servers)
    emotes = self.servers[server.id]["emotes"]

    if not url.endswith((".gif", ".gifv", ".png")):
        await self.bot.say("Links ending in .gif, .png, and .gifv are the only ones accepted."
                           "Please try again with a valid emote link, thanks.")
        return
    if name in emotes:
        await self.bot.say("This keyword already exists, please use another keyword.")
        return
    # ``name`` is user input that becomes part of a file path below; refuse
    # anything that could escape the emote directory.
    if "/" in name or "\\" in name:
        await self.bot.say("Emote names cannot contain path separators.")
        return
    if url.endswith(".gifv"):
        # Trim only the trailing "v"; str.replace would also rewrite any
        # ".gifv" occurring earlier in the URL.
        url = url[:-1]

    # After the checks above the URL ends in ".gif" or ".png".
    filename = "{}.{}".format(name, url[-3:])
    try:
        await self.bot.say("Downloading {}.".format(name))
        async with aiohttp.request("GET", url, headers=option) as r:
            emote = await r.read()
        with open(self.emote + filename, 'wb') as f:
            f.write(emote)
        await self.bot.say("Adding {} to the list.".format(name))
        emotes[name] = filename
        dataIO.save_json(self.data_path, self.servers)
        await self.bot.say("{} has been added to the list".format(name))
    except Exception as e:
        # Best-effort command: report the failure to the user instead of
        # raising out of the handler.
        print(e)
        await self.bot.say("It seems your url is not valid,"
                           " please make sure you are not typing names with spaces as they are and then the url."
                           " If so, do [p]emotes add name_with_spaces url")
async def _quotes(self, ctx, *, author: str):
    """Retrieves a specified number of quotes from a specified author. Max number of quotes at a time is 5.
    Examples:
    [p]quotes Morgan Freeman; 5
    [p]quotes Margaret Thatcher; 2"""
    # The docstring above is kept verbatim; it may be shown as command help.
    import html

    # Raw string: "\/" is not a valid escape in a normal string literal.
    regex = r'title="view quote">([^`]*?)<\/a>'
    url = 'http://www.brainyquote.com/quotes/authors/'
    usage = ("Your search is not valid, please follow the examples."
             "Make sure the names are correctly written\n"
             "[p]quotes Margaret Thatcher; 5\n[p]quotes Morgan Freeman; 5")

    # Expected input: "<author name>; <count>".
    parts = author.split('; ')
    try:
        title = parts[0]
        number = int(parts[1])
    except (IndexError, ValueError):
        # Missing or non-numeric count.
        await self.bot.say(usage)
        return
    if not title:
        await self.bot.say(usage)
        return
    if number > 5:
        number = 5
        await self.bot.reply("Heck naw brah. 5 is max. Any more and you get killed.")

    slug = title.lower()
    url = url + slug[0] + "/" + slug.replace(" ", "_") + ".html"
    async with aiohttp.request("GET", url) as resp:
        page = str(await resp.text())

    # De-duplicate the matches and drop any that merely repeat the author
    # name, so every pick below is a distinct quote.
    candidates = [q for q in set(re.findall(regex, page)) if q != title]
    if not candidates:
        await self.bot.say(usage)
        return

    for _ in range(min(number, len(candidates))):
        picked = choice(candidates)
        candidates.remove(picked)
        # Turn HTML entities in the scraped text back into characters.
        await self.bot.say(box(html.unescape(picked)))
        await asyncio.sleep(1)
async def dadjoke(self):
    """Gets a random dad joke."""
    api = 'https://icanhazdadjoke.com/'
    # 'Accept: text/plain' asks the service for the bare joke text.
    async with aiohttp.request('GET', api, headers={'Accept': 'text/plain'}) as r:
        result = await r.text()
    # The connection is released before the joke is posted, wrapped in
    # backticks.
    await self.bot.say('`' + result + '`')
async def typeracer(self, user: str):
    """Get user stats from typeracer API"""
    from urllib.parse import quote

    # ``user`` is free-form input; percent-encode it so characters such as
    # '&' or spaces cannot alter the query string.
    api = 'http://data.typeracer.com/users?id=tr:{}'.format(quote(user, safe=''))
    async with aiohttp.request("GET", api) as r:
        found = r.status == 200
        result = await r.json() if found else None

    if not found:
        await self.bot.say('`Unable to retieve stats for user {}`'.format(user))
        return

    stats = result['tstats']
    random_colour = random.randint(0, 0xFFFFFF)
    last_scores = '\n'.join(str(int(x)) for x in stats['recentScores'])

    embed = discord.Embed(colour=random_colour)
    embed.set_author(name=result['name'])
    embed.add_field(name='Country', value=':flag_{}:'.format(result['country']))
    embed.add_field(name='Level', value=stats['level'])
    embed.add_field(name='Wins', value=stats['gamesWon'])
    # WPM figures are truncated to whole numbers for display.
    embed.add_field(name='Recent WPM', value=int(stats['recentAvgWpm']))
    embed.add_field(name='Average WPM', value=int(stats['wpm']))
    embed.add_field(name='Best WPM', value=int(stats['bestGameWpm']))
    embed.add_field(name='Recent scores', value=last_scores)
    embed.set_footer(text='typeracer.com')
    embed.url = 'http://play.typeracer.com/'
    await self.bot.say(embed=embed)
async def _authenticate(self, data):
    """Resolve the bearer token in ``data`` to an OSF user id.

    Sends ``data['access_token']`` as a bearer token to the
    ``/oauth2/profile`` endpoint under ``settings.OSF['ACCOUNTS_URL']``.

    Returns:
        The tuple ``('user', 'osf', <id>)`` where ``<id>`` is the ``id``
        field of the JSON response.

    Raises:
        exceptions.Unauthorized: when the endpoint answers with any status
            other than 200.
    """
    profile_url = '{}/oauth2/profile'.format(settings.OSF['ACCOUNTS_URL'])
    auth_headers = {
        'Authorization': 'Bearer {}'.format(data.get('access_token'))
    }
    async with aiohttp.request('GET', profile_url, headers=auth_headers) as resp:
        if resp.status != 200:
            raise exceptions.Unauthorized()
        profile = await resp.json()
    return 'user', 'osf', profile['id']
# TODO Fuzz test me
def send_remote(self, url, data, headers=None, callback=None):
    """POST ``data`` to ``url`` in the background without waiting for it.

    When ``self.state.should_try()`` is false nothing is sent; the message
    built by ``self._get_log_message(data)`` is written to
    ``self.error_logger`` instead. Otherwise the request is scheduled on
    the event loop and this method returns immediately.

    ``callback`` is accepted for interface compatibility but is not used.
    """
    headers = headers or {}
    if not self.state.should_try():
        message = self._get_log_message(data)
        self.error_logger.error(message)
        return

    async def _future():
        # ``async with`` releases the connection on every path, including
        # failures; the response body is not needed.
        async with aiohttp.request('POST', url, data=data, headers=headers):
            pass

    asyncio.ensure_future(_future())
def http_request(self, http_method: str, url: str, send_json: dict,
                 expected_statuses: List[str]=None) -> Tuple[int, dict, str]:
    """
    Internal use.
    Synchronous PATCH/POST-style request made with the ``requests`` library.

    ``send_json`` is sent as the JSON body. ``expected_statuses`` falls back
    to ``HttpStatus.ALL_OK`` when empty or omitted.

    Returns ``(status_code, decoded JSON body or {}, Location header)``.
    Raises ``DocumentError`` when the status code is not an expected one.
    """
    self.assert_sync()
    import requests
    logger.debug('%s request: %s', http_method.upper(), send_json)
    if not expected_statuses:
        expected_statuses = HttpStatus.ALL_OK

    response = requests.request(
        http_method, url, json=send_json,
        headers={'Content-Type': 'application/vnd.api+json'},
        **self._request_kwargs)

    if response.status_code not in expected_statuses:
        raise DocumentError(f'Could not {http_method.upper()} '
                            f'({response.status_code}): '
                            f'{error_from_response(response)}',
                            errors={'status_code': response.status_code},
                            response=response,
                            json_data=send_json)

    # An empty response body is reported as an empty dict.
    payload = response.json() if response.content else {}
    return response.status_code, payload, response.headers.get('Location')
async def http_request_async(
        self,
        http_method: str,
        url: str,
        send_json: dict,
        expected_statuses: List[str]=None) \
        -> Tuple[int, dict, str]:
    """
    Internal use. Async version.
    Method to make PATCH/POST requests to server using aiohttp library.

    ``send_json`` is serialized with ``json.dumps`` and sent as the body.
    ``expected_statuses`` falls back to ``HttpStatus.ALL_OK`` when empty or
    omitted.

    Returns ``(status, decoded JSON body or {}, Location header)``.
    Raises ``DocumentError`` when the status is not an expected one.
    """
    self.assert_async()
    logger.debug('%s request: %s', http_method.upper(), send_json)
    expected_statuses = expected_statuses or HttpStatus.ALL_OK
    # Content type passed to ``response.json`` below; it is empty for
    # DELETE requests.
    content_type = '' if http_method == HttpMethod.DELETE else 'application/vnd.api+json'
    async with self._aiohttp_session.request(
            http_method, url, data=json.dumps(send_json),
            headers={'Content-Type': 'application/vnd.api+json'},
            **self._request_kwargs) as response:
        if response.status not in expected_statuses:
            raise DocumentError(f'Could not {http_method.upper()} '
                                f'({response.status}): '
                                f'{error_from_response(response)}',
                                errors={'status_code': response.status},
                                response=response,
                                json_data=send_json)
        response_json = await response.json(content_type=content_type)
        # A falsy decoded body is reported as an empty dict.
        return response.status, response_json or {}, response.headers.get('Location')
async def get_body(url):
    """Return the raw response body of a GET request to ``url``."""
    # ``async with`` releases the connection once the body has been read,
    # and also when reading it fails.
    async with aiohttp.request('GET', url) as response:
        return await response.read()
def request(self, method, url, **kwargs):
    """Start an HTTP request using an arbitrary ``method``.

    Every keyword argument is forwarded to ``self._request``; its result is
    returned wrapped in a ``_RequestContextManager``.
    """
    pending = self._request(method, url, **kwargs)
    return _RequestContextManager(pending)
def get(self, url, *, allow_redirects=True, **kwargs):
    """Start an HTTP GET request.

    ``allow_redirects`` defaults to ``True``. The result of
    ``self._request`` is returned wrapped in a ``_RequestContextManager``.
    """
    pending = self._request(
        hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
    return _RequestContextManager(pending)
def options(self, url, *, allow_redirects=True, **kwargs):
    """Start an HTTP OPTIONS request.

    ``allow_redirects`` defaults to ``True``. The result of
    ``self._request`` is returned wrapped in a ``_RequestContextManager``.
    """
    pending = self._request(
        hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs)
    return _RequestContextManager(pending)
def head(self, url, *, allow_redirects=False, **kwargs):
    """Start an HTTP HEAD request.

    ``allow_redirects`` defaults to ``False`` here. The result of
    ``self._request`` is returned wrapped in a ``_RequestContextManager``.
    """
    pending = self._request(
        hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs)
    return _RequestContextManager(pending)
def post(self, url, *, data=None, **kwargs):
    """Start an HTTP POST request carrying ``data`` as its payload.

    The result of ``self._request`` is returned wrapped in a
    ``_RequestContextManager``.
    """
    pending = self._request(hdrs.METH_POST, url, data=data, **kwargs)
    return _RequestContextManager(pending)
def patch(self, url, *, data=None, **kwargs):
    """Start an HTTP PATCH request carrying ``data`` as its payload.

    The result of ``self._request`` is returned wrapped in a
    ``_RequestContextManager``.
    """
    pending = self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
    return _RequestContextManager(pending)
def delete(self, url, **kwargs):
    """Start an HTTP DELETE request.

    The result of ``self._request`` is returned wrapped in a
    ``_RequestContextManager``.
    """
    pending = self._request(hdrs.METH_DELETE, url, **kwargs)
    return _RequestContextManager(pending)
def get(url, **kwargs):
    """Deprecated shortcut for a GET request; use ``ClientSession().get()``.

    Emits a ``DeprecationWarning`` and delegates to ``request`` with the
    GET method, forwarding every keyword argument.
    """
    # stacklevel=2 attributes the warning to the caller's line rather than
    # to this wrapper, so users can find the code they need to migrate.
    warnings.warn("Use ClientSession().get() instead", DeprecationWarning,
                  stacklevel=2)
    return request(hdrs.METH_GET, url, **kwargs)