async def moneyconvert(self, amount: float, base: str, to: str):
    """Currency converter
    Set the amount, currency FROM (base) and currency TO
    Available currencies for conversion:
    AUD BGN BRL CAD CHF CNY CZK DKK EUR GBP HKD HRK HUF IDR ILS INR JPY KRW MXN MYR NOK NZD PHP PLN RON RUB SEK SGD THB TRY USD ZAR
    ***WARNING***
    Conversion may not be exact"""
    # The body awaits, so this must be a coroutine function (``async def``).
    # Normalise the codes once so validation, the API call and the reply
    # all use the same spelling.
    source = base.upper()
    target = to.upper()
    if source not in self.currencies or target not in self.currencies:
        await self.bot.say('One or both of the currencies selected are invalid')
        return

    if source == target:
        # NOTE(review): the fixer.io ``rates`` table is keyed by the *other*
        # currencies and is not expected to list the base itself, so a
        # same-currency lookup would raise KeyError. Converting a currency
        # to itself needs no network round trip anyway.
        rate = 1.0
    else:
        api = 'http://api.fixer.io/latest?base={}'.format(source)
        async with aiohttp.request("GET", api) as r:
            result = await r.json()
        rate = result['rates'][target]

    converted_amount = amount * rate
    pre_conv = '{0:.2f}'.format(amount)
    post_conv = '{0:.2f}'.format(converted_amount)
    await self.bot.say('`{} {} = {} {}`'.format(source, pre_conv, target, post_conv))
python类request()的实例源码
async def get_async(ym, x):
    '''
    Download one xls file with a GET request and save it under ``data/``.

    parameters
    ==========
    ym : str
        Sent as the ``sBbly`` query parameter and appended to the output
        file name (presumably a year-month string -- confirm with caller).
    x : dict
        Must provide the keys ``'name'`` and ``'sid'``; ``sid`` is sent as
        the ``sId`` query parameter and both are used in the file name.
    '''
    # ``with (await sem)`` was removed from asyncio in Python 3.9;
    # ``async with`` is the supported way to hold the module-level semaphore.
    async with sem:
        name, sid = x['name'], x['sid']
        url = 'http://www.chinabond.com.cn/DownLoadxlsx?sId={}&sBbly={}&sMimeType=4'.format(
            sid, ym)
        outfilename = 'data/{}{}{}.xls'.format(sid, name, ym)
        logging.info('downloading %s', outfilename)
        response = await aiohttp.request('GET', url, headers=headers)
        # closing() guarantees the response is closed even if writing fails.
        with closing(response), open(outfilename, 'wb') as file:
            while True:  # stream the body to disk in 1024-byte chunks
                chunk = await response.content.read(1024)
                if not chunk:
                    break
                file.write(chunk)
        logging.info('done %s', outfilename)
def send_http_request(self, app: str, service: str, version: str, method: str, entity: str, params: dict):
    """
    Resolve ``service`` through the registry client and send it an HTTP request.

    ``params`` is modified in place: its ``'path'`` entry (required) and its
    ``'params'`` entry (optional) are popped. Any of the recognised aiohttp
    options present in ``params`` are forwarded as keyword arguments.
    ``version`` and ``service`` (and ``app`` when it is not None) are added
    to the query string.

    Returns the response object yielded by ``aiohttp.request``.
    """
    host, port, node_id, service_type = self._registry_client.resolve(service, version, entity, HTTP)
    path = params.pop('path')
    url = 'http://{}:{}{}'.format(host, port, path)

    # Options that are passed straight through to aiohttp when supplied.
    forwarded = {}
    for key in ['data', 'headers', 'cookies', 'auth', 'allow_redirects', 'compress', 'chunked']:
        if key in params:
            forwarded[key] = params[key]

    query_params = params.pop('params', {})
    if app is not None:
        query_params['app'] = app
    query_params['version'] = version
    query_params['service'] = service

    return (yield from aiohttp.request(method, url, params=query_params, **forwarded))
def _request_sender(self, packet: dict):
    """
    Send a request packet to the node chosen for it.

    Auto dispatch method called from self.send().

    Returns True once the packet has been handed to the client protocol.
    Raises ClientNotFoundError when no node id / protocol is known for the
    packet, and ClientDisconnected when the protocol exists but is not
    connected. On success ``packet['to']`` is set to the node id.
    """
    node_id = self._get_node_id_for_packet(packet)
    protocol = self._client_protocols.get(node_id)

    if not (node_id and protocol):
        # No node found to send request
        self._logger.error('Out of %s, Client Not found for packet %s, restarting server...',
                           self._client_protocols.keys(), packet)
        raise ClientNotFoundError()

    if not protocol.is_connected():
        self._logger.error('Client protocol is not connected for packet %s', packet)
        raise ClientDisconnected()

    packet['to'] = node_id
    protocol.send(packet)
    return True
def get_flag(base_url, cc):
    """Fetch ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its bytes.

    Raises ``web.HTTPNotFound`` on a 404 and ``aiohttp.HttpProcessingError``
    on any other non-200 status. The response is always closed.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        return (yield from resp.read())
# BEGIN FLAGS2_ASYNCIO_EXECUTOR
def http_get(url):
    """GET ``url`` and return its decoded JSON or raw body.

    The body is parsed as JSON when the ``Content-type`` header mentions
    ``json`` or the URL ends with ``json``; otherwise the raw bytes are
    returned.

    Raises ``web.HTTPNotFound`` on a 404 and
    ``aiohttp.errors.HttpProcessingError`` on any other non-200 status.
    """
    res = yield from aiohttp.request('GET', url)
    # Close the response on every path; previously the 404/error branches
    # (and any failure while reading) left it open.
    try:
        if res.status == 200:
            ctype = res.headers.get('Content-type', '').lower()
            if 'json' in ctype or url.endswith('json'):
                data = yield from res.json()  # <1>
            else:
                data = yield from res.read()  # <2>
            return data
        elif res.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.errors.HttpProcessingError(
                code=res.status, message=res.reason,
                headers=res.headers)
    finally:
        res.close()
def do_request():
    """GET http://google.com through the hard-coded HTTP proxy and return the response."""
    proxy_url = 'http://198.23.237.213:1080'  # your proxy address
    return (yield from aiohttp.request('GET', 'http://google.com', proxy=proxy_url))
def _send_to_external_chat(self, bot, event, config):
    """Forward a chat event to the URL built from ``config["HUBOT_URL"]``.

    Events that originate from the bot itself are ignored. The POST is
    scheduled with ``asyncio.ensure_future`` and not awaited here.
    """
    if event.from_bot:
        # don't send my own messages
        return

    target = config["HUBOT_URL"] + event.conv_id
    body = json.dumps({"from": str(event.user_id.chat_id), "message": event.text})

    # NOTE(review): certificate verification is disabled for this connector.
    connector = aiohttp.TCPConnector(verify_ssl=False)
    pending = aiohttp.request('post', target, data=body,
                              headers={'content-type': 'application/json'},
                              connector=connector)
    asyncio.ensure_future(pending)
async def _cookie(self):
    """Retrieves a random fortune cookie fortune."""
    # Raw strings produce exactly the same patterns as before while avoiding
    # the invalid-escape warnings that "\/" and "\)" trigger in normal
    # string literals.
    regex = [r'class="cookie-link">([^`]*?)<\/a>', r"<p>([^`]*?)<\/p>",
             r"(?:\\['])", r"<strong>([^`]*?)<\/strong>",
             r"<\/strong><\/a>([^`]*?)<br>",
             r"3\)<\/strong><\/a>([^`]*?)<\/div>"]
    url = "http://www.fortunecookiemessage.com"
    image_path = "data/horoscope/cookie-edit.png"
    await self.file_check()
    async with aiohttp.request("GET", url, headers={"encoding": "utf-8"}) as resp:
        test = str(await resp.text())

    # Scrape the fortune text; when it is wrapped in <p> take the inner text.
    fortune = re.findall(regex[0], test)
    fortest = re.match("<p>", fortune[0])
    if fortest is not None:
        fortune = re.findall(regex[1], fortune[0])
    title = re.findall(regex[3], test)
    info = re.findall(regex[4], test)
    info[0] = html.unescape(info[0])
    dailynum = re.findall(regex[5], test)

    # presumably renders the fortune into cookie-edit.png -- confirm in
    # fortune_process
    self.fortune_process(fortune[0])
    try:
        await self.bot.say("Your fortune is:")
        await self.bot.upload(image_path)
        await self.bot.say("\n" + title[1] +
                           info[1] + "\n" + title[2] + dailynum[0])
    finally:
        # Remove the generated image even if sending it failed.
        os.remove(image_path)
async def _font(self, url: str=None):
    """Allows you to set the font that the fortune cookies are shown in.
    Only accepts ttf."""
    option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)'
              ' Gecko/20100101 Firefox/40.1'}
    font_path = "data/horoscope/FortuneCookieNF.ttf"
    if url is None:
        url = "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf"
        # ``os.is_file`` does not exist; ``os.path.isfile`` is the real check.
        # With no URL given, an already-installed font is left alone.
        if os.path.isfile(font_path):
            return
    elif not url.endswith("ttf"):
        await self.bot.say("This is not a .ttf font, please use a .ttf font. Thanks")
        return

    # Download for both the default and a user-supplied .ttf URL. Previously
    # a custom URL was reported as saved without ever being fetched.
    async with aiohttp.request("GET", url, headers=option) as resp:
        test = await resp.read()
    with open(font_path, "wb") as f:
        f.write(test)
    await self.bot.say("Font has been saved")
async def _strawpoll(self, ctx, *, question, options=None):
    """Makes a poll based on questions and choices or options. must be divided by "; "
    Examples:
    [p]strawpoll What is this person?; Who is this person?; Where is this person?; When is this person coming?
    [p]strawpoll What; Who?; Where?; When?; Why?"""
    # The body awaits, so this must be a coroutine function (``async def``).
    # The first "; "-separated field is the title, the rest are the options.
    title, *options_list = question.split('; ')
    if len(options_list) < 2:
        await self.bot.say("You need to specify 2 or more options")
        return

    # Entries in self.settings are merged over the title/options payload.
    request = dict({"title": title, "options": options_list}, **self.settings)
    async with aiohttp.request('POST', 'http://strawpoll.me/api/v2/polls',
                               headers={'content-type': 'application/json'},
                               data=json.dumps(request)) as resp:
        test = await resp.content.read()
    test = json.loads(test.decode())
    sid = test["id"]
    await self.bot.say("Here's your strawpoll link: http://strawpoll.me/{}".format(sid))
def handle_request(self, message, payload):
    """Proxy one request: forward it upstream and relay the response.

    The request body is read only for POST/PUT/PATCH. ``intercept_request``
    may replace the message and data, or abort the request by returning a
    falsy message. The upstream response and its content pass through
    ``intercept_response`` before being sent back.
    """
    # The target URL is taken from the incoming message before interception.
    url = message.path
    logger.info('{0} {1}'.format(message.method, url))

    has_body = message.method in ('POST', 'PUT', 'PATCH')
    data = (yield from payload.read()) if has_body else None

    message, data = self.intercept_request(message, data)
    if not message:
        return

    upstream = yield from aiohttp.request(message.method, url, headers=message.headers,
                                          data=data)
    content = yield from upstream.content.read()
    upstream, content = self.intercept_response(upstream, content)
    yield from self.response_to_proxy_response(upstream, content)
def get_flag(base_url, cc):
    """Download the GIF at ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased).

    Returns the image bytes for a 200 response, raises ``web.HTTPNotFound``
    for a 404 and ``aiohttp.HttpProcessingError`` for any other status.
    """
    country = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=country)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        image = yield from resp.read()
        return image
# BEGIN FLAGS2_ASYNCIO_EXECUTOR
def get_flag(base_url, cc):
    """Return the bytes of the flag image for country code ``cc``.

    The URL is ``<base_url>/<cc>/<cc>.gif`` with ``cc`` lower-cased. A 404
    raises ``web.HTTPNotFound``; any other non-200 status raises
    ``aiohttp.HttpProcessingError``. The response is closed on exit.
    """
    lowered = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=lowered)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        status = resp.status
        if status == 200:
            return (yield from resp.read())
        if status == 404:
            raise web.HTTPNotFound()
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)
# BEGIN FLAGS2_ASYNCIO_EXECUTOR
def get_page(self, _url):
    ''' Fetch ``_url`` and return its (possibly gunzipped) content.

    Returns the string "None Content" when ``self.url_type`` is "2"; for
    "4" the gunzipped body decoded as gb2312 and re-encoded as UTF-8 bytes;
    otherwise the gunzipped bytes. If decompression raises OSError the raw
    body is returned unchanged. '''
    header = {'Accept-Encoding': 'gzip'}
    header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist) - 1)]
    # An explicitly configured user agent overrides the random pick.
    if opts['user_agent']:
        header['User-Agent'] = opts['user_agent']

    # ``with (yield from semaphore)`` was removed in Python 3.9; acquire and
    # release explicitly so the slot is always returned.
    yield from semaphore.acquire()
    try:
        response = yield from aiohttp.request('GET', _url, headers=header)
        page = yield from response.read()
    finally:
        semaphore.release()

    try:
        if self.url_type == "2":
            return "None Content"
        if self.url_type == "4":
            return gzip.decompress(page).decode('gb2312').encode('utf-8')
        else:
            return gzip.decompress(page)
    except OSError:
        # Body was not gzip-compressed after all.
        return page
def get(self, url, param):
    """GET ``url`` on the configured host and return the decoded JSON.

    The full URL is built from ``self.req`` (protocol, host, port). On a 200
    the body is parsed as JSON; on any other status a failure is printed and
    the result stays an empty dict. ``status_code`` is then stored in the
    result. Timeouts and decode errors are printed and whatever was
    collected so far is returned.
    """
    result = {}
    _url = "".join([self.req["protocol"], self.req["host"], ":", str(self.req["port"]), url])
    print(_url + " get?????:" + str(param))
    try:
        response = yield from aiohttp.request("GET", _url, headers=self.req["header"], params=param)
        raw = yield from response.read()
        text = raw.decode('utf-8')
        if response.status != 200:
            print("data fetch failed for")
            print(response.content, response.status)
        else:
            result = json.loads(text)
        result["status_code"] = response.status
        print(result)
    except asyncio.TimeoutError:
        print("????")
    except UnicodeDecodeError:
        print("?????")
    return result
def post(self, url, param):
    """POST ``param`` as JSON to ``url`` on the configured host.

    The full URL is built from ``self.req`` (protocol, host, port). On a 200
    the body is parsed as JSON; on any other status a failure is printed and
    the result stays an empty dict. ``status_code`` is then stored in the
    result, which is returned.
    """
    data = {}
    _url = self.req["protocol"] + self.req["host"] + ':' + str(self.req["port"]) + url
    print(_url + " post?????:" + str(param))
    # The request is sent exactly once. A previous blocking
    # ``requests.post`` call submitted the same payload a second time,
    # discarded the result, and stalled the event loop while doing so.
    response = yield from aiohttp.request('POST', _url, data=json.dumps(param), headers=self.req["header"])
    string = (yield from response.read()).decode('utf-8')
    if response.status == 200:
        data = json.loads(string)
    else:
        print("data fetch failed for")
        print(response.content, response.status)
    data["status_code"] = response.status
    print(data)
    return data
def ping_coroutine(self, payload=None):
    """GET ``self._url`` and report a pong when the server answers 200.

    Any exception is logged and swallowed so a failed ping never propagates.
    """
    try:
        res = yield from request('get', self._url)
        try:
            if res.status == 200:
                self.pong_received(payload=payload)
        finally:
            # Close on every path (non-200 answers and callback failures
            # included) so the connection is not leaked.
            res.close()
    except Exception:
        self.logger.exception('Error while ping')
def test_aiohttp(self):
    """Run an aiohttp server and client on the zmq asyncio loop.

    The client fetches the echo page, pushes the text through a PUSH/PULL
    socket pair and checks that it arrives unchanged. Skipped when aiohttp
    is not installed.
    """
    try:
        import aiohttp
    except ImportError:
        raise SkipTest("Requires aiohttp")
    from aiohttp import web

    zmq.asyncio.install()

    @asyncio.coroutine
    def echo(request):
        # Reply with the string form of the request object.
        print(request.path)
        payload = str(request).encode('utf8')
        return web.Response(body=payload)

    @asyncio.coroutine
    def server(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', echo)
        handler = app.make_handler()
        srv = yield from loop.create_server(handler, '127.0.0.1', 8080)
        print("Server started at http://127.0.0.1:8080")
        return srv

    @asyncio.coroutine
    def client():
        push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)
        res = yield from aiohttp.request('GET', 'http://127.0.0.1:8080/')
        text = yield from res.text()
        yield from push.send(text.encode('utf8'))
        received = yield from pull.recv()
        self.assertEqual(received.decode('utf8'), text)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(server(loop))
    print("servered")
    loop.run_until_complete(client())
observer_async.py 文件源码
项目:Software-Architecture-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def callback(self, channel, data):
    """ Callback method: start fetching the URL carried in ``data``.

    The object returned by ``aiohttp.request`` is recorded in
    ``self.futures`` and returned immediately without being awaited.
    """
    # The data is a URL
    print('Fetching URL', data, '...')
    pending = aiohttp.request('GET', data)
    self.futures.append(pending)
    return pending
def fetch(url):
    """GET ``url``, print its status and reason, and return the body bytes."""
    response = yield from aiohttp.request('GET', url)
    print('%s %s' % (response.status, response.reason), end=" ")
    body = yield from response.read()
    return body
def fetch(url):
    """Request ``url`` with GET, print "<status> <reason> " and return the body."""
    resp = yield from aiohttp.request('GET', url)
    status_line = '%s %s' % (resp.status, resp.reason)
    print(status_line, end=" ")
    return (yield from resp.read())
def get_flag(base_url, cc):  # <2>
    """Fetch the flag GIF for ``cc`` from ``base_url`` and return its bytes.

    ``cc`` is lower-cased to build ``<base_url>/<cc>/<cc>.gif``. Raises
    ``web.HTTPNotFound`` on 404 and ``aiohttp.HttpProcessingError`` on any
    other non-200 status.
    """
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status == 200:
            flag_bytes = yield from resp.read()
            return flag_bytes
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)
async def get_flag(base_url, cc):  # <2>
    """Fetch the flag GIF for ``cc`` from ``base_url`` and return its bytes.

    ``cc`` is lower-cased to build ``<base_url>/<cc>/<cc>.gif``. Raises
    ``web.HTTPNotFound`` on 404 and ``aiohttp.HttpProcessingError`` on any
    other non-200 status. The response is closed when the block exits.
    """
    # The body uses ``await``, so this must be a coroutine function.
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    with closing(await aiohttp.request('GET', url)) as resp:
        if resp.status == 200:
            image = await resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
def get_flag(cc):
    """Return the bytes of ``<BASE_URL>/<cc>/<cc>.gif`` (``cc`` lower-cased).

    The response status is not checked.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=code)
    response = yield from aiohttp.request('GET', url)
    return (yield from response.read())
def download(url):
    """GET ``url``, print the response's key/value pairs and the body size."""
    response = yield from aiohttp.request('GET', url)
    # NOTE(review): relies on the response object exposing ``items()``;
    # confirm this against the aiohttp version in use.
    for key, value in response.items():
        # Values are truncated to 80 characters for display.
        print('{}: {}'.format(key, value[:80]))
    body = yield from response.read()
    print('\nReceived {} bytes.\n'.format(len(body)))
def _request(self, url_, method_, params=None, data=None, **kwargs):
    """Perform request via `aiohttp.request`_.

    The method name is lower-cased and extra keyword arguments are
    normalised with ``utils.norm_aiohttp_kwargs`` before being forwarded.
    Returns a dict with the response status under ``'code'``, the response
    text under ``'body'`` and ``'error'`` set to None.

    .. _aiohttp.request: http://aiohttp.readthedocs.org/en/v0.9.2/client.html#make-a-request
    """
    verb = method_.lower()
    extra = utils.norm_aiohttp_kwargs(**kwargs)
    response = yield from aiohttp.request(verb, url_,
                                          params=params, data=data, loop=self._loop,
                                          **extra)
    status = response.status
    text = yield from response.text()
    return {
        'code': status,
        'body': text,
        'error': None,  # TODO: how errors statues are described in aiohttp?
    }
def _handle_forwarding(bot, event, command):
    """Handle message forwarding

    Does nothing unless ``forwarding_enabled`` is set for the conversation
    and ``forward_to`` lists at least one target. For every target the text
    is sent on its own when the event has no attachments; otherwise each
    attachment is downloaded, uploaded as an image and sent with the text.
    If the upload path raises AttributeError the link is appended to the
    text instead.
    """
    # Test if message forwarding is enabled
    if not bot.get_config_suboption(event.conv_id, 'forwarding_enabled'):
        return

    targets = bot.get_config_suboption(event.conv_id, 'forward_to')
    if not targets:
        return
    logger.debug("{}".format(targets))

    for _conv_id in targets:
        html_identity = "<b><a href='https://plus.google.com/u/0/{}/about'>{}</a></b><b>:</b> ".format(event.user_id.chat_id, event.user.full_name)
        html_message = event.text

        if not event.conv_event.attachments:
            yield from bot.coro_send_message(_conv_id,
                                             html_identity + html_message)

        for link in event.conv_event.attachments:
            filename = os.path.basename(link)
            download = yield from aiohttp.request('get', link)
            raw = yield from download.read()
            image_data = io.BytesIO(raw)

            try:
                image_id = yield from bot._client.upload_image(image_data, filename=filename)
                # An empty text gets a placeholder; it then stays in effect
                # for the remaining attachments of this target.
                if not html_message:
                    html_message = "(sent an image)"
                yield from bot.coro_send_message(_conv_id,
                                                 html_identity + html_message,
                                                 image_id=image_id)
            except AttributeError:
                yield from bot.coro_send_message(_conv_id,
                                                 html_identity + html_message + " " + link)
def image_upload_single(image_uri, bot):
    """Download ``image_uri`` and upload it through ``bot._client``.

    The upload is named after the last path component of the URI. Returns
    the image id produced by ``upload_image``.
    """
    logger.info("getting {}".format(image_uri))
    filename = os.path.basename(image_uri)
    response = yield from aiohttp.request('get', image_uri)
    payload = io.BytesIO((yield from response.read()))
    return (yield from bot._client.upload_image(payload, filename=filename))
async def handle(request):
    """Probe every URL in REQEUST_URLS concurrently and report which answered 200.

    Returns a JSON ``web.Response`` mapping each URL to True when its
    request completed with status 200 and to False when it raised or
    returned another status.
    """
    # The body uses ``await``, so this must be a coroutine function.
    coroutines = [aiohttp.request('get', url) for url in REQEUST_URLS]
    # return_exceptions=True keeps one failing URL from aborting the rest.
    results = await asyncio.gather(*coroutines, return_exceptions=True)
    response_data = {
        url: not isinstance(result, Exception) and result.status == 200
        for url, result in zip(REQEUST_URLS, results)
    }
    # Only the status is needed; close the responses so their connections
    # are not left open.
    for result in results:
        if not isinstance(result, Exception):
            result.close()
    body = json.dumps(response_data).encode('utf-8')
    return web.Response(body=body, content_type="application/json")