def cmd_info(self, message, author, server):
"""
Usage: {command_prefix}info
Sends a whole buncha info pertaining to the bot to the chat!
"""
return Response(
'I was coded by SexualRhinoceros and modified by MattBSG. I am currently on v{} ! \nFor documentation on my commands or info on how to get my in your'
' server, check out this link! {}'.format(VERSION, DOCUMENTATION_FOR_BOT), reply=True)
# async def cmd_donate(self, message, author, server):
# """
# Usage: {command_prefix}donate
# Sends a whole buncha info pertaining to rhino's patreon to the chat!
# """
# return Response('Thanks for considering donating! If you want to support me monthly, check out my'
# ' Patreon here\n\t{}\nor for one time, you can find my paypal here\n\t{}'
# ''.format(RHINO_PATREON, RHINO_STREAMTIP),
# reply=True)
python类get()的实例源码
def cmd_ignore(self, message, author, server, option, new_id, reason=None):
"""
Usage: {command_prefix}ignore [ + | - | add | remove ] <channel ID> ["reason"]
Adds or removes the channel ID to the list of ignored channels when outputting to the server log
"""
if await self.has_roles(message.channel, author, server, command='ignore'):
if option not in ['+', '-', 'add', 'remove']:
raise CommandError('Invalid option "%s" specified, use +, -, add, or remove' % option)
try:
channel = discord.utils.get(server.channels, id=new_id)
if not channel:
int('this')
except:
raise CommandError('Invalid Channel: {}'.format(new_id))
if option in ['+', 'add']:
self.server_index[server.id][12].append(channel.id)
else:
try:
self.server_index[server.id][12].remove(channel.id)
except ValueError:
raise CommandError('No such channel in ignore list : {}'.format(new_id))
await self.write_to_modlog(message, author, server, reason)
def cmd_dropdeadbeats(self, message, author, server):
"""
Usage: {command_prefix}dropdeadbeats
Removes the bot from all dead beat servers who never register
"""
if author.id == self.config.master_id:
server_leave_array = []
for server in self.servers:
if server.id not in self.server_index:
rh1 = discord.utils.get(server.members, id=self.user.id)
if datetime.utcnow() - timedelta(hours=24) > rh1.joined_at:
server_leave_array.append(server)
servers_list = []
if server_leave_array:
for dbserver in server_leave_array:
print('Leaving Deadbeat Server : {}'.format(dbserver.name))
servers_list.append(dbserver.name)
await self.leave_server(dbserver)
return Response('Dropped servers: ```%s```' % ', '.join(servers_list), reply=True)
return
def cmd_blserver(self, author, key):
"""
Usage: {command_prefix}lurk
Force the bot to lurk in a server rather than send shit to it or leave
after the time is up
"""
if author.id in [self.config.master_id]:
try:
if discord.utils.get(self.servers, name=key):
await self.leave_server(discord.utils.get(self.servers, name=key))
self.globalbans.add(discord.utils.get(self.servers, name=key).id)
elif discord.utils.get(self.servers, id=key):
await self.leave_server(discord.utils.get(self.servers, id=key))
self.globalbans.add(discord.utils.get(self.servers, id=key).id)
else:
print('I did fuck all')
print('Server %s was blacklisted' % key)
sban = open('config/globalbans.txt', 'a')
self.globalbans = str(self.globalbans)[5:0] + key + '\n'
sban.write(self.globalbans)
sban.close()
return Response(':thumbsup:', reply=True)
except:
return Response(':thumbsdown:', reply=True)
return
def _fetch_page(self, request):
try:
with aiohttp.Timeout(10):
async with aiohttp.get(request['url'], params=request['params'], headers=request['headers']) as response:
try:
assert response.status == 200
if request['type'] == 'json':
content = await response.json()
else:
content = await response.text(request['type'])
obj = {'order':request['order'], 'content':content}
redis_push(self.redis, self.content_key, obj)
except AssertionError:
logging.warning('{} {}'.format(response.status, url))
except: # kinds of error, not only asyncio.TimeoutError
#redis_push(self.redis, self.request_key, request)
pass
def twitch_online(self, stream):
session = aiohttp.ClientSession()
url = "https://api.twitch.tv/kraken/streams/" + stream
header = {'Client-ID': self.settings.get("TWITCH_TOKEN", "")}
try:
async with session.get(url, headers=header) as r:
data = await r.json(encoding='utf-8')
await session.close()
if r.status == 400:
return 400
elif r.status == 404:
return 404
elif data["stream"] is None:
return False
elif data["stream"]:
embed = self.twitch_embed(data)
return embed
except:
return "error"
return "error"
def beam_online(self, stream):
url = "https://beam.pro/api/v1/channels/" + stream
try:
async with aiohttp.get(url) as r:
data = await r.json(encoding='utf-8')
if "online" in data:
if data["online"] is True:
data = self.beam_embed(data)
return data
else:
return False
elif "error" in data:
return None
except:
return "error"
return "error"
def gif(self, *text):
"""Retrieves first search result from giphy
gif [keyword]"""
if len(text) > 0:
if len(text[0]) > 1 and len(text[0]) < 20:
try:
msg = "+".join(text)
search = "http://api.giphy.com/v1/gifs/search?q=" + msg + "&api_key=dc6zaTOxFJmzC"
async with aiohttp.get(search) as r:
result = await r.json()
if result["data"] != []:
url = result["data"][0]["url"]
await self.bot.say(url)
else:
await self.bot.say("Your search terms gave no results.")
except:
await self.bot.say("Error.")
else:
await self.bot.say("Invalid search.")
else:
await self.bot.say("gif [text]")
def emoji(self, ctx, emojiname: str):
'''Gibt eine vergrößerte Version eines angegebenen Emojis zurück
Beispiel:
-----------
:emoji Emilia
'''
emoji = discord.utils.find(lambda e: e.name.lower() == emojiname.lower(), self.bot.emojis)
if emoji:
tempEmojiFile = 'tempEmoji.png'
async with aiohttp.ClientSession() as cs:
async with cs.get(emoji.url) as img:
with open(tempEmojiFile, 'wb') as f:
f.write(await img.read())
f = discord.File(tempEmojiFile)
await ctx.send(file=f)
os.remove(tempEmojiFile)
else:
await ctx.send(':x: Konnte das angegebene Emoji leider nicht finden :(')
def nsfw(self, ctx):
'''Vergibt die Rolle um auf die NSFW Channel zugreifen zu können'''
if ctx.guild.id == loadconfig.__botserverid__:
if loadconfig.__selfassignrole__:
role = discord.utils.get(ctx.guild.roles, name=loadconfig.__selfassignrole__)
if role in ctx.author.roles:
try:
await ctx.author.remove_roles(role)
except:
pass
tmp = await ctx.send(f':x: Rolle **{role}** wurde entfernt')
else:
try:
await ctx.author.add_roles(role)
except:
pass
tmp = await ctx.send(f':white_check_mark: Rolle **{role}** wurde hinzugefügt')
else:
tmp = await ctx.send('**:no_entry:** Es wurde keine Rolle für den Bot eingestellt! Wende dich bitte an den Bot Admin')
else:
tmp = await ctx.send(f'**:no_entry:** This command don\'t work on this server!')
await asyncio.sleep(2 * 60)
await tmp.delete()
await ctx.message.delete()
def _fileCheck(msg):
url = msg.attachments[0]['url']
allowedExtension = ['.exe', '.zip', '.rar']
if url[-4:] in allowedExtension:
name = os.path.basename(url)
downloadPath = 'tmp\\' + name
async with aiohttp.get(url) as download:
with open(downloadPath, 'wb') as f:
f.write(await download.read())
stats = os.stat(downloadPath)
size = stats.st_size
KBSize = round(size / 1024, 3)
MBSize = round(size / 1024 / 1024, 3)
MD5 = _getHash(downloadPath, hashlib.md5())
SHA1 = _getHash(downloadPath, hashlib.sha1())
SHA256 = _getHash(downloadPath, hashlib.sha256())
SHA512 = _getHash(downloadPath, hashlib.sha512())
msg = f'**Name:** {name}\n'
msg += f'**Size:** {MBSize} MB ({size} Bytes)\n'
msg += f'**MD5:** `{MD5}`\n'
msg += f'**SHA1:** `{SHA1}`\n'
msg += f'**SHA256:** `{SHA256}`\n'
msg += f'**SHA512:** `{SHA512}`\n'
os.remove(downloadPath)
return msg
def isitdown(self, url):
"""Checks if a website is down or up."""
if url == "":
await self.bot.say("You haven't entered a website to check.")
return
if "http://" not in url or "https://" not in url:
url = "http://" + url
try:
with aiohttp.Timeout(15):
await self.bot.say("Testing " + url + "…")
try:
response = await aiohttp.get(url, headers = { 'user_agent': headers })
if response.status == 200:
await self.bot.say(url + " is up and running.")
else:
await self.bot.say(url + " is down.")
except:
await self.bot.say(url + " is down.")
except asyncio.TimeoutError:
await self.bot.say(url + " is down.")
def __init__(self, bot):
self.bot = bot
self.dota_settings = fileIO("data/dota/settings.json", "load")
# Check for key either in settings or in ENV
if "key" in self.dota_settings.keys() and self.dota_settings["key"] != "":
# If exists in setting and is set
api.set_api_key(self.dota_settings["key"])
self.key = True
elif os.environ.get("DOTA2_API_KEY") is not None:
# If exists in env vars and is set
api.set_api_key(os.environ.get("DOTA2_API_KEY"))
self.key = True
else:
self.key = False
def print_token(self, url, realm):
thumb_url = 'http://wowtokenprices.com/assets/wowtokeninterlaced.png'
try:
async with aiohttp.get(url, headers=self.header) as response:
soup = BeautifulSoup(await response.text(), "html.parser")
data = soup.find('div', class_=realm + '-region-div')
desc = data.div.a.h3.text
buy_price = data.div.find('p', class_='money-text').text
trend = data.div.find('span', class_='money-text-small').text
# Update time broken, returns --:-- -- when requested by bot
#updated = data.div.find('p', id=realm + '-datetime').text
embed = discord.Embed(title='WoW Token Info', description=desc, colour=0xFFD966)
embed.set_thumbnail(url=thumb_url)
embed.add_field(name='Buy Price', value=buy_price, inline=False)
embed.add_field(name='Change', value=trend, inline=False)
#embed.set_footer(text='Updated: ' + updated)
await self.bot.say(embed=embed)
except:
await self.bot.say("Error finding WoW token prices.")
def _steam_url_to_text_id(self, server_id: str,
vanityurl: str) -> str:
api_key = self.settings[server_id]["steam_api_key"]
url = "http://api.steampowered.com/ISteamUser/"
"ResolveVanityURL/v0001/?key={}&vanityurl={}".format(
api_key, vanityurl)
steam64_id = None
async with aiohttp.get(url) as res:
response = json.loads(await res.text())["response"]
if response["success"] != 1:
raise SteamUrlError(
"'{}' is not a Steam vanity URL.".format(vanityurl))
steam64_id = int(response["steamid"])
account_id = steam64_id & ((1 << 32) - 1)
universe = (steam64_id >> 56) & ((1 << 8) - 1)
I = universe
J = account_id & 1
K = (account_id >> 1) & ((1 << 31) - 1)
return "STEAM_{}:{}:{}".format(I, J, K)
def lyrics_from_path(path):
"""Gets the lyrics from a song path"""
with requests.get(path) as page:
html = BeautifulSoup(page.text, "html.parser")
[h.extract() for h in html('script')]
lyrics = html.find("div", class_="lyrics").get_text()
return lyrics
def on_ready(self):
self.emptycogLoaded = os.path.exists('data/cmdcommands/emptycog.py')
if not self.emptycogLoaded:
print('Emptycog.py was not found, trying to download')
try:
async with aiohttp.get("https://raw.githubusercontent.com/PlanetTeamSpeakk/PTSCogs-attributes/master/emptycog.py") as r:
emptycog = await r.content.read()
with open('data/cmdcommands/emptycog.py','wb') as f:
f.write(emptycog)
print('Succesfully downloaded emptycog.py')
except Exception as e:
print(e)
print("Error occured, did not download emptycog.py, go to https://raw.githubusercontent.com/PlanetTeamSpeakk/PTSCogs/master/emptycog.py press ctrl+s and save it in the data/cmdcommands/ folder.")
else:
pass
def botowner(self, ctx):
"""Shows you who's boss!"""
owner = discord.utils.get(self.bot.get_all_members(), id=self.bot.settings.owner)
if owner != None:
await self.bot.say("My owner is {}.".format(owner.mention))
else:
await self.bot.say("I don't know who my owner is ¯\_(?)_/¯.")
def qrcode(self, ctx, url):
"""Creates a qrcode from a link."""
shorten = Shortener('Bitly', bitly_token='dd800abec74d5b12906b754c630cdf1451aea9e0')
short_link = shorten.short(url)
async with aiohttp.get(shorten.qrcode(width=128, height=128)) as r:
file = await r.content.read()
number = random.randint(1000, 9999)
fileloc = "data/useful/qrcode{}.png".format(number)
with open(fileloc, 'wb') as f:
f.write(file)
file = None
f = None
await self.bot.send_file(ctx.message.channel, fp="data/useful/qrcode{}.png".format(number), filename="qrcode{}.png".format(number))
os.remove("data/useful/qrcode{}.png".format(number))
def time(self, ctx, *, place):
"""Get the time of a place somewhere on the earth
Example:
[p]time Los Angeles
[p]time Amsterdam, Netherlands"""
if "geocodingkey" not in self.settings or self.settings['geocodingkey'] == "key_here":
await self.bot.say("The geocoding key is not yet set if you're my owner you can set it with {}setgeocodingkey.".format(ctx.prefix))
elif "timezonekey" not in self.settings or self.settings['timezonekey'] == "key_here":
await self.bot.say("The timezone key is not yet set if you're my owner you can set it with {}settimezonekey.".format(ctx.prefix))
else:
message = await self.bot.say("Getting data...")
request = requests.get("https://maps.googleapis.com/maps/api/geocode/json?address={}&key={}".format(place, self.settings['geocodingkey'])).json()
if request['status'] == "ZERO_RESULTS":
await self.bot.edit_message(message, "Could not find any results for **{}**.".format(place))
elif request['status'] == "OK":
lng = request['results'][0]['geometry']['location']['lng']
lat = request['results'][0]['geometry']['location']['lat']
fulladdr = request['results'][0]['formatted_address']
timestamp = int(datetime.datetime.utcnow().timestamp())
request = requests.get("https://maps.googleapis.com/maps/api/timezone/json?location={},{}×tamp={}&key={}".format(lat, lng, timestamp, self.settings['timezonekey'])).json()
if request['status'] != "OK":
await self.bot.say("An unknown error occured while getting the time and timezone from the Google API.")
else:
timestamp += request['dstOffset'] + request['rawOffset']
time = datetime.datetime.fromtimestamp(timestamp)
await self.bot.edit_message(message, "**{}**\n\t{} ({})".format(time.strftime("%d %b %Y %H:%M:%S"), fulladdr, request['timeZoneName']))
else:
await self.bot.say("An unknown error occured while getting the longitude and latitude from the Google API.")