async def slash(self, ctx, user: discord.Member = None):
    """Slash a fool!"""
    # Must be a coroutine (``async def``): the body awaits the HTTP download
    # and the bot calls.
    #
    # ctx:  invocation context; only ``ctx.message.author`` is read.
    # user: optional member being slashed; selects which message is sent.
    image_url = "https://cdn.discordapp.com/attachments/175246808967151616/217342324919894017/7e469f8443630de4f8cedb1c87b161d3.jpg"
    image_path = "data/commands/Images/imgres.png"

    author = ctx.message.author
    if user:
        message = "{} just slashed {}!\nFOOL!".format(author.mention, user.mention)
    else:
        message = "{} just slashed someone. Who knows, he forgot the darn mention.".format(author.mention)

    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as resp:
            image = await resp.read()

    # The image is cached on disk because the upload call takes a file path.
    with open(image_path, "wb") as f:
        f.write(image)

    await self.bot.say(message)
    await self.bot.upload(image_path)
# Example source code using Python's ClientSession() class
async def suckmydonut(self, ctx, user : discord.Member = None):
    """Suck My Donuts Beeyatch!"""
    # Must be a coroutine (``async def``): the body awaits the HTTP download
    # and the bot calls.
    #
    # ctx:  invocation context; only ``ctx.message.author`` is read.
    # user: optional member being addressed; selects which message is sent.
    image_url = "http://owned.com/media/_cache/adjusted/postblock/image/4/6/7/2/4672.jpg.png"
    image_path = "data/commands/Images/imgres.png"

    author = ctx.message.author
    if user:
        message = "{} tells {} to:".format(author.mention, user.mention)
    else:
        message = "{} says to:".format(author.mention)

    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as resp:
            image = await resp.read()

    # The image is cached on disk because the upload call takes a file path.
    with open(image_path, "wb") as f:
        f.write(image)

    await self.bot.say(message)
    await self.bot.upload(image_path)
async def spank(self, ctx, user : discord.Member = None):
    """Spank"""
    # Must be a coroutine (``async def``): the body awaits the HTTP download
    # and the bot calls.
    #
    # ctx:  invocation context; only ``ctx.message.author`` is read.
    # user: optional member being spanked; selects which message is sent.
    image_url = "https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif"
    image_path = "data/commands/Images/imgres.gif"

    author = ctx.message.author
    if user:
        message = "{} spanked {}! :scream:".format(author.mention, user.mention)
    else:
        message = "{} spanked someone! :scream:".format(author.mention)

    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as resp:
            image = await resp.read()

    # The image is cached on disk because the upload call takes a file path.
    with open(image_path, "wb") as f:
        f.write(image)

    await self.bot.say(message)
    await self.bot.upload(image_path)
def _listen(self):
    """Read frames from the WebSocket at ``self.rtm['url']`` and queue them.

    The connected socket is stored on ``self.ws``. Every text frame is
    JSON-decoded and put on ``self.queue``; a close frame ends the loop.
    The socket is closed on the way out, whether the loop ended normally
    or with an exception.
    """
    with ClientSession() as session:
        socket = yield from session._ws_connect(self.rtm['url'])
        self.ws = socket
        try:
            frame = yield from socket.receive()
            while frame.tp != MsgType.close:
                # Anything other than a text frame is unexpected here.
                assert frame.tp == MsgType.text
                yield from self.queue.put(json.loads(frame.data))
                frame = yield from socket.receive()
        finally:
            yield from socket.close()
async def make_request(self, method, path, params=None, data=None,
                       authenticated=True, auth_type='Basic', headers={},
                       token=testing.ADMIN_TOKEN, accept='application/json'):
    """Send one HTTP request to ``self.server`` and return its reply.

    Args:
        method: HTTP verb; its lower-cased name is looked up as a method on
            the aiohttp session (``get``, ``post``, ...).
        path: Path handed to ``self.server.make_url``.
        params: Query parameters forwarded to the request.
        data: Request body forwarded to the request.
        authenticated: When true (and ``token`` is not None) an
            ``AUTHORIZATION`` header of the form ``"<auth_type> <token>"``
            is added.
        auth_type: Scheme used in the ``AUTHORIZATION`` header.
        headers: Extra request headers; copied, never mutated.
        token: Credential used in the ``AUTHORIZATION`` header.
        accept: Value for the ``ACCEPT`` header, or None to omit it.

    Returns:
        A ``(value, status, headers)`` tuple where ``value`` is the decoded
        JSON body, or the raw bytes when JSON decoding fails.
    """
    # Copy so that neither the caller's dict nor the shared ``{}`` default
    # is modified by the header assignments below.
    headers = headers.copy()
    if accept is not None:
        headers['ACCEPT'] = accept
    if authenticated and token is not None:
        headers['AUTHORIZATION'] = '{} {}'.format(auth_type, token)
    settings = {'headers': headers, 'params': params, 'data': data}

    async with aiohttp.ClientSession(loop=self.loop) as session:
        # An unknown verb yields None here and fails on the call below.
        operation = getattr(session, method.lower(), None)
        async with operation(self.server.make_url(path), **settings) as resp:
            try:
                value = await resp.json()
            except Exception:
                # Not JSON: fall back to the raw body. ``Exception`` (rather
                # than a bare ``except``) lets task cancellation and
                # KeyboardInterrupt propagate.
                value = await resp.read()
            return value, resp.status, resp.headers
async def get_packages_info(max_pkgs=MAX_PKGS, start_time=None):
    """Gather info on the top ``max_pkgs`` packages and return it.

    Tasks from ``create_tasks`` are awaited in blocks of 200. As soon as the
    first full block has been collected, an early ``index.html`` is written
    from those packages.

    Args:
        max_pkgs: Number of packages to request from ``create_tasks``.
        start_time: Epoch seconds used for the elapsed-time message;
            defaults to "now".

    Returns:
        ``(enhance_packages(packages), datetime.datetime.utcnow())``.
    """
    block_size = 200
    await asyncio.sleep(1)  # ensure the server is highly responsive on bootup
    fmt = 'Gathering Python 3 support info on the top {:,} PyPI packages...'
    print(fmt.format(max_pkgs))
    start_time = start_time or time.time()
    packages = []
    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        tasks = create_tasks(session, max_pkgs)
        while tasks:
            current_block, tasks = tasks[:block_size], tasks[block_size:]
            packages += await asyncio.gather(*current_block)
            # True only right after the first full block has been gathered.
            if len(packages) == block_size:
                html = create_index(packages).splitlines()
                # Drop blank lines and trailing whitespace from the page.
                html = '\n'.join(line.rstrip() for line in html if line.strip())
                with open('index.html', 'w') as out_file:
                    out_file.write(html)
                print('index.html written with {:,} packages after {:.2f} '
                      'seconds.'.format(len(packages),
                                        time.time() - start_time))
    return enhance_packages(packages), datetime.datetime.utcnow()
async def get_remote_data(self, url):
    """Fetch ``url`` and return its decoded JSON body.

    Args:
        url: Address to GET.

    Returns:
        The decoded JSON value, or ``''`` when that value is falsy.

    Raises:
        web.HTTPBadGateway: When the response status is not 200 or the body
            cannot be decoded as JSON.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                raise web.HTTPBadGateway
            # The session is closed by the ``async with`` above; it must stay
            # open until the body has been read, so there is no explicit
            # ``session.close()`` here.
            try:
                return await resp.json() or ''
            except Exception as err:
                raise web.HTTPBadGateway from err
async def create_session(self):
    """Create an aiohttp session and store it on ``self.session``.

    The session uses ``self.cookie_jar`` and sends a ``User-Agent`` built
    from ``pyplanet_version`` plus an ``X-ManiaPlanet-ServerLogin`` header
    set to ``self.server_login``.
    """
    session = aiohttp.ClientSession(
        cookie_jar=self.cookie_jar,
        headers={
            'User-Agent': 'PyPlanet/{}'.format(pyplanet_version),
            'X-ManiaPlanet-ServerLogin': self.server_login
        }
    )
    # Entered by hand instead of via ``async with`` because the session is
    # kept on the instance after this coroutine returns.
    # NOTE(review): nothing here closes it — confirm it is closed elsewhere.
    self.session = await session.__aenter__()
def test_aiodownload_init_client():
    """AioDownload built with a ``client`` exposes a ClientSession there."""
    supplied_session = ClientSession()
    downloader = AioDownload(client=supplied_session)
    assert isinstance(downloader.client, ClientSession)
def __init__(self, username, secret, uri=EMARSYS_URI):
    """Initialise the parent class and create this instance's HTTP session.

    ``username``, ``secret`` and ``uri`` are passed straight to the parent
    initialiser.
    """
    super().__init__(username, secret, uri)
    http_session = aiohttp.ClientSession()
    self.session = http_session
async def get(self, resource):
    """GET ``self._uri + resource`` and return the decoded JSON body.

    The response is passed to ``self._validate`` before its body is read.

    Args:
        resource: Path appended to ``self._uri``.

    Returns:
        The decoded JSON payload.
    """
    url = self._uri + resource
    logger.debug("REST.Get> {}".format(url))
    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        # NOTE(review): ``headers`` is not defined in this function;
        # presumably a module-level name — confirm.
        async with session.get(url, headers=headers) as response:
            self._validate(url, response)
            payload = await response.json()
    logger.debug("REST.Resp> Response: {}".format(payload))
    return payload
async def post(self, resource, params=None):
    """POST ``params`` to ``self._uri + resource`` and return the JSON body.

    When ``self._need_auth`` is true, the parameters from
    ``self._auth.get_params()`` are added to the posted data. The response
    is passed to ``self._validate`` before its body is read.

    Args:
        resource: Path appended to ``self._uri``.
        params: Form fields to send; an empty mapping when omitted. The
            mapping passed in is not modified.

    Returns:
        The decoded JSON payload.
    """
    url = self._uri + resource
    logger.debug("REST.Post> {}".format(url))
    # Work on a copy: updating a shared default (or the caller's dict) in
    # place would carry auth parameters over into later calls.
    form = dict(params) if params else {}
    if self._need_auth:
        form.update(self._auth.get_params())
    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=form) as response:
            self._validate(url, response)
            payload = await response.json()
    logger.debug("REST.Resp> {}".format(payload))
    return payload
async def shorten(self, ctx, url):
    """Shorten url"""
    # Must be a coroutine (``async def``): the body awaits the HTTP call and
    # the bot reply.
    #
    # url: the long URL, sent as ``longUrl`` in the JSON request body.
    # The reply's ``id`` field is posted back to the channel.
    key = self.loadapi["ApiKey"]
    endpoint = 'https://www.googleapis.com/urlshortener/v1/url'
    payload = {"longUrl": url}
    headers = {'content-type': 'application/json'}
    async with aiohttp.ClientSession() as session:
        # The key goes through ``params`` so it is encoded into the query
        # string instead of being concatenated by hand.
        async with session.post(endpoint, params={'key': key},
                                data=json.dumps(payload),
                                headers=headers) as resp:
            print(resp.status)
            result = await resp.json()
    await self.bot.say(result['id'])
async def expand(self, ctx, url):
    """Expand goo.gl url"""
    # Must be a coroutine (``async def``): the body awaits the HTTP call and
    # the bot reply.
    #
    # url: the short URL to look up. The reply's ``longUrl`` field is posted
    # back to the channel.
    key = self.loadapi["ApiKey"]
    endpoint = 'https://www.googleapis.com/urlshortener/v1/url'
    # ``url`` comes from the user; passing it through ``params`` encodes it,
    # so characters such as ``&`` or ``#`` cannot break the query string.
    query = {'key': key, 'shortUrl': url}
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint, params=query) as resp:
            print(resp.status)
            result = await resp.json()
    await self.bot.say(result['longUrl'])
async def analytics(self, ctx, url):
    """Analytics for url"""
    # Must be a coroutine (``async def``): the body awaits the HTTP call and
    # the bot reply.
    #
    # url: the short URL to look up. Selected fields of the reply are posted
    # back to the channel as an embed.
    key = self.loadapi["ApiKey"]
    endpoint = 'https://www.googleapis.com/urlshortener/v1/url'
    # ``url`` comes from the user; passing it through ``params`` encodes it,
    # so characters such as ``&`` or ``#`` cannot break the query string.
    query = {'key': key, 'shortUrl': url, 'projection': 'FULL'}
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint, params=query) as resp:
            print(resp.status)
            result = await resp.json()

    embed = discord.Embed(colour=discord.Colour.blue())
    embed.add_field(name="**Shortened Url:**", value=result['id'])
    embed.add_field(name="**Long Url:**", value=result['longUrl'])
    embed.add_field(name="**Date Created:**", value=result['created'])
    embed.add_field(name="**Clicks:**", value=result['analytics']['allTime']['shortUrlClicks'])
    embed.set_image(url="https://www.ostraining.com/cdn/images/coding/google-url-shortener-tool.jpg")
    await self.bot.say(embed=embed)
def __init__(self, bot):
    """Keep a reference to ``bot`` and create an HTTP session on its loop."""
    self.session = aiohttp.ClientSession(loop=bot.loop)
    self.bot = bot
async def alias(self, ctx, alias):
    """Create D.io link"""
    # Must be a coroutine (``async def``): the body awaits the invite
    # creation, the HTTP call and the bot reply.
    #
    # alias: requested custom name, sent as the ``custom`` query parameter.
    key = self.loadapi["ApiKey"]
    inv = await self.bot.create_invite(ctx.message.server, unique=False)
    # ``alias`` comes from the user; passing everything through ``params``
    # encodes the values instead of splicing them into the URL by hand.
    query = {'api': key, 'url': str(inv), 'custom': alias, 'format': 'text'}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://discord.io/api', params=query) as resp:
            print(resp.status)
            try:
                await self.bot.say(await resp.text())
            except discord.errors.HTTPException:
                await self.bot.say("Link already exists, or the alias is reserved for Pro users.")
async def fetch_user_mal(self, name, url, cmd):
    """Download a user's list as XML and summarise it.

    Args:
        name: Value substituted into ``url`` via ``url.format(name)``.
        url: URL template with one ``{}`` placeholder.
        cmd: Tag name of the list entries; ``"anime"`` selects the
            ``user_watching`` counter, anything else ``user_reading``.

    Returns:
        ``(collection, info)`` where ``collection`` is the set of
        ``series_title`` texts and ``info`` is a list of five counters read
        from the ``myinfo`` element. ``('', '')`` is returned when the
        document cannot be parsed or its root has no children.
    """
    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        async with session.get(url.format(name)) as response:
            data = await response.text()

    try:
        root = ET.fromstring(data)
    except ET.ParseError:
        return '', ''
    if len(root) == 0:
        return '', ''

    collection = {x.find('series_title').text for x in root.findall(cmd)}
    entry = root.find('myinfo')
    # Only the first counter differs between the two kinds of list.
    in_progress = 'user_watching' if cmd == "anime" else 'user_reading'
    fields = [in_progress, 'user_completed', 'user_onhold', 'user_dropped',
              'user_days_spent_watching']
    info = [entry.find(x).text for x in fields]
    return collection, info
async def get_xml(self, nature, name):
    """Run an authenticated search request and return the response text.

    Args:
        nature: Path segment placed in the search URL.
        name: Search term; spaces are replaced with underscores before it
            is sent as the ``q`` query parameter.

    Returns:
        The response body as text.
    """
    auth = aiohttp.BasicAuth(login=self.credentials["Username"],
                             password=self.credentials["Password"])
    url = 'https://myanimelist.net/api/{}/search.xml'.format(nature)
    # Sent through ``params`` so the search term is encoded into the query
    # string rather than spliced into the URL by hand.
    query = {'q': name.replace(" ", "_")}
    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession(auth=auth) as session:
        async with session.get(url, params=query) as response:
            return await response.text()
async def _location_pokemon(self, *, poke: str):
    """Get a Pokémon's catch location.
    Example !pokedex location voltorb
    """
    # Must be a coroutine (``async def``): the body awaits the page download
    # and the bot reply.
    location_url = "http://pokemondb.net/pokedex/{}".format(poke)
    # ClientSession is an asynchronous context manager, so it is entered with
    # ``async with`` rather than a plain ``with``.
    async with aiohttp.ClientSession() as session:
        async with session.get(location_url) as response:
            soup = BeautifulSoup(await response.text(), "html.parser")

    div = soup.find('div', attrs={'class': 'col desk-span-7 lap-span-12'})
    table = None
    if div is not None:
        table = div.find('table', attrs={'class': 'vitals-table'})
    if table is None:
        # The page has no matching section or table; report it rather than
        # failing with AttributeError on a None lookup.
        await self.bot.say("Could not find location data for {}.".format(poke))
        return

    # Header cells are joined from their strings; data cells are stripped.
    locations = [ele.text.strip() for ele in table.find_all('td') if ele]
    versions = [', '.join(ele.strings) for ele in table.find_all('th') if ele]

    rows = list(zip(versions, locations))
    t = tabulate(rows, headers=["Game Version", "Location"])
    await self.bot.say("```{}```".format(t))