async def spank(self, ctx, user : discord.Member = None):
    """Spank"""
    # Downloads the spank GIF to a local file, announces who spanked whom
    # (or "someone" when no ``user`` is given) and then uploads the file.
    gif_url = "https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif"
    gif_path = "data/commands/Images/imgres.gif"
    author = ctx.message.author

    # ClientSession has to be entered with ``async with``; the plain ``with``
    # form is not supported by current aiohttp releases.
    async with aiohttp.ClientSession() as session:
        async with session.get(gif_url) as resp:
            payload = await resp.read()
    with open(gif_path, "wb") as f:
        f.write(payload)

    # Only the announcement differs between the two cases, so the download
    # above is shared instead of being duplicated per branch.
    if not user:
        announcement = "{} spanked someone! :scream:".format(author.mention)
    else:
        announcement = "{} spanked {}! :scream:".format(author.mention, user.mention)
    await self.bot.say(announcement)
    await self.bot.upload(gif_path)
# Example source code for the Python class ext() (python类ext()的实例源码)
def __init__(self, *args, **kwargs):
    """Set up bot state and load the core extensions.

    All positional and keyword arguments are forwarded to the parent
    constructor together with a ``DogbotHelpFormatter`` instance. The
    ``cfg`` keyword argument (default: empty dict) is kept as ``self.cfg``
    and must contain ``cfg['db']['postgres']['database']``.
    """
    super().__init__(*args, **kwargs, formatter=DogbotHelpFormatter())

    # configuration dict, taken from the keyword arguments
    config = kwargs.get('cfg', {})
    self.cfg = config

    # shared aiohttp session for fetching data, bound to the bot's loop
    self.session = aiohttp.ClientSession(loop=self.loop)

    # moment of construction, kept for uptime calculations
    self.boot_time = datetime.datetime.utcnow()

    # aioredis connection; not established here
    self.redis = None

    # asyncpg: database name from the config; the pool is not created here
    self.database = config['db']['postgres']['database']
    self.pgpool = None

    # core extensions are loaded immediately
    self._exts_to_load = []
    self.load_extensions('dog/core/ext', 'Core recursive load')
def load_extensions(self, directory: str, prefix: str = 'Recursive load'):
    """Loads extensions from a directory recursively.

    Every entry directly inside ``directory`` (other than the ignored names
    and compiled ``.pyc`` files) is loaded through ``self.load_extension``
    using the dotted form of ``directory`` as the package prefix.

    :param directory: Slash-separated path of the extension directory,
        e.g. ``'dog/core/ext'``; also converted to the dotted import prefix.
    :param prefix: Label written in front of each "loading" log line.
    """
    IGNORE = {'__init__.py', '__pycache__', '.DS_Store'}
    base = directory.replace('/', '.')

    # build list of extension stems
    # NOTE: ``Path.suffix`` includes the leading dot, so the comparison has
    # to be against '.pyc' (comparing against 'pyc' never filters anything).
    extension_stems = [
        path.stem for path in Path(directory).resolve().iterdir()
        if path.name not in IGNORE and path.suffix != '.pyc'
    ]

    logger.debug('Extensions to load: %s', extension_stems)

    for ext in extension_stems:
        load_path = base + '.' + ext
        logger.info('%s: %s', prefix, load_path)
        self.load_extension(load_path)

    # keep track of a list of extensions to load
    self._exts_to_load = list(self.extensions.keys())
async def update_statistics(pg: asyncpg.connection.Connection, ctx: commands.Context):
    """
    Updates command statistics for a specific `discord.ext.commands.Context`.

    If no record was found for a command, it is created. Otherwise, the ``times_used`` and
    ``last_used`` fields are updated.

    :param pg: Connection used for the lookup and for the INSERT/UPDATE.
    :param ctx: Context whose ``command`` (converted with ``str``) names the row.
    """
    # The same name and timestamp are used by the lookup and by whichever
    # write follows, so compute them once.
    command_name = str(ctx.command)
    now = datetime.datetime.utcnow()

    row = await get_statistics(pg, command_name)

    if row is None:
        # first time command is being used, insert it into the database
        insert = 'INSERT INTO command_statistics VALUES ($1, 1, $2)'
        await pg.execute(insert, command_name, now)
        logger.info('First command usage for %s', ctx.command)
    else:
        # command was used before, increment time_used and update last_used
        update = ('UPDATE command_statistics SET times_used = times_used + 1, last_used = $2 '
                  'WHERE command_name = $1')
        await pg.execute(update, command_name, now)
async def emote(self, ctx, emote: str):
    """Get a Twitch, FrankerFaceZ, BetterTTV, or Discord emote.
    Usage: emote [name of emote]"""
    # Returns False when no provider knows the emote, otherwise None after
    # the image has been sent as ``emote.<detected type>``.
    emote = emote.replace(':', '')
    emotes = self.bot.emotes

    # One URL builder per provider, tried in order. A KeyError from a builder
    # means that provider does not know the emote, so the next one is tried.
    url_builders = (
        lambda: 'https://static-cdn.jtvnw.net/emoticons/v1/' + str(emotes['twitch'][emote]['image_id']) + '/1.0',
        lambda: 'https://cdn.frankerfacez.com/emoticon/' + str(emotes['ffz'][emote]) + '/1',
        lambda: emotes['bttv'][emote],
    )

    # async_timeout's context manager must be entered with ``async with``;
    # the synchronous ``with`` form is deprecated/removed in newer releases.
    async with async_timeout.timeout(13):
        for build_url in url_builders:
            try:
                async with self.bot.cog_http.get(build_url()) as resp:
                    emote_img = await resp.read()
                break
            except KeyError:
                continue
        else:
            # every provider raised KeyError
            await ctx.send('**No such emote!** I can fetch from Twitch, FrankerFaceZ, BetterTTV, or Discord (soon).')
            return False

        img_bytes = io.BytesIO(emote_img)
        # imghdr sniffs the image type from the leading bytes of the buffer
        ext = imghdr.what(img_bytes)
        await ctx.send(file=discord.File(img_bytes, f'emote.{ext}'))
async def rtfs(self, ctx, *, thing: str):
    """tries to show the source file of a thing
    thing may be a non-builtin module, class, method, function, traceback, frame, or code object,
    or if surrounded by single or double quotes,
    a space separated discord.ext.commands call,
    or a period deliminated file/module path as used when importing"""
    msg = ctx.message

    # names made available to the evaluated expression
    variables = {
        'ctx': ctx,
        'bot': self.bot,
        'message': msg,
        'server': msg.server,
        'channel': msg.channel,
        'author': msg.author
    }

    def call(thing, variables):
        # SECURITY: ``thing`` comes straight from the command text and is
        # passed to eval(), i.e. arbitrary code execution for whoever may
        # invoke this command. NOTE(review): confirm the command is
        # restricted to the bot owner where it is registered.
        return self.repl_format_source(eval(thing, variables))

    closure = _close(call, thing, variables)
    await self.print_results(ctx, _call_catch_fmt(closure, 0))
def add_jose_cog(self, cls: 'class'):
    """Add a cog but load its requirements first.

    ``cls._cog_metadata['requires']`` (default: empty list) names the
    extensions the cog depends on; each is loaded as ``ext.<name>`` unless
    ``self.extensions`` already has a truthy entry for it.
    """
    requirements = cls._cog_metadata.get('requires', [])
    log.debug('requirements for %s: %r', cls, requirements)

    for name in requirements:
        module_path = f'ext.{name}'
        if self.extensions.get(module_path):
            log.debug('%s already loaded', module_path)
            continue

        log.debug('loading %r from requirements', module_path)
        self.load_extension(module_path)

    # The cog is instantiated only now, after its requirements are loaded:
    # instantiating inside the old add_cog is exactly what caused the problem.
    instance = cls(self)
    super().add_cog(instance)
def charge_ext(self):
    """Load the extension item list for the current system into ``self.charge``.

    Reads ``data/biomes/ext/<self.system["EXT"]>.txt`` (ISO-8859-1). For each
    line the first three characters are the tag and everything from the
    fifth character on is the item; items are grouped per tag. Afterwards
    ``self.save("charge")`` is called.

    :return: ``True`` when the file existed and was loaded, ``False`` otherwise.
    """
    source_path = "data/biomes/ext/{}.txt".format(self.system["EXT"])
    if not os.path.isfile(source_path):
        return False

    with open(source_path, "r", encoding="ISO-8859-1") as handle:
        raw_lines = handle.readlines()

    self.charge = {}
    for raw in raw_lines:
        entry = raw.replace("\n", "")
        tag, item = entry[:3], entry[4:]
        # create the bucket for a tag the first time it is seen
        bucket = self.charge.setdefault(tag, {"TAG" : tag, "ITEMS" : []})
        bucket["ITEMS"].append(item)

    self.save("charge")
    return True
async def on_command_error(ctx, error):
    """Handle an error raised while processing a command.

    Unknown commands are ignored; permission, missing-argument and cooldown
    errors get a short reply in the invoking channel. Any other error is
    printed with its traceback and forwarded to ``bot.err_logs_channel``.

    :param ctx: Context of the failed invocation.
    :param error: The exception that was raised.
    """
    if isinstance(error, discord.ext.commands.errors.CommandNotFound):
        pass  # ...don't need to know if commands don't exist
    elif isinstance(error, discord.ext.commands.errors.CheckFailure):
        await ctx.send("You don't have permission to use this command.")
    elif isinstance(error, discord.ext.commands.errors.MissingRequiredArgument):
        formatter = commands.formatter.HelpFormatter()
        await ctx.send("You are missing required arguments.\n{}".format(formatter.format_help_for(ctx, ctx.command)[0]))
    elif isinstance(error, discord.ext.commands.errors.CommandOnCooldown):
        await ctx.message.delete()
        await ctx.send("This command is on cooldown, don't you dare try it again.", delete_after=10)
    else:
        if ctx.command:
            await ctx.send("An error occurred while processing the `{}` command.".format(ctx.command.name))
        print('Ignoring exception in command {0.command} in {0.message.channel}'.format(ctx))
        tb = traceback.format_exception(type(error), error, error.__traceback__)
        error_trace = "".join(tb)
        print(error_trace)
        embed = discord.Embed(description=error_trace.translate(bot.escape_trans))
        # ctx.command may be None here (it is tested above), so do not
        # dereference ``.name`` unconditionally when building the log line.
        command_name = ctx.command.name if ctx.command else None
        await bot.err_logs_channel.send("An error occurred while processing the `{}` command in channel `{}`.".format(command_name, ctx.message.channel), embed=embed)
async def imgur_remove(self,albumName,imageLink):
    """removes image from album, pass direct image"""
    # The album is matched case-insensitively by title among the albums of
    # the 'me' account; the image id is the path of ``imageLink`` without
    # its extension and without the leading slash.
    albumName = albumName.lower()
    for album in imgurClient.get_account_albums('me'):
        if albumName == album.title.lower():
            images = imgurClient.get_album_images(album.id)
            # splitext drops the file extension; only the id part is needed
            imageID = splitext(urlparse(imageLink).path)[0]
            imageID = imageID[1::]  # remove / at start
            for image in images:
                if imageID == image.id:
                    imgurClient.album_remove_images(album.id,imageID)
                    await self.bot.say("removed {} from album".format(image.id))
            # album handled: stop searching so the for/else below is skipped
            break
    else:
        #album not found
        await self.bot.say("Album: " + albumName + " not found")
def get_ext(url):
    """Return the extension of the file named in ``url`` (no leading dot), or ''.

    Only the path component of the URL is inspected, so query strings and
    fragments do not affect the result.
    """
    url_path = urlparse(url).path
    _, suffix = splitext(url_path)
    # drop the leading '.'; an empty suffix stays empty
    return suffix[1:]
def get_extensions(self, path, excl):
    """Collect the distinct file extensions found under ``path``.

    Walks ``path`` with ``os.walk`` and takes, for every file, the text after
    the last '.' (the whole file name when it contains no '.'). An extension
    is recorded once, and only if it is contained in ``excl``.

    :param path: Root directory to walk.
    :param excl: Container of extensions; only members of it are collected.
    :return: List of collected extensions in first-seen order.
    """
    found = []
    for _root, _dirs, filenames in os.walk(path):
        for filename in fnmatch.filter(filenames, "*"):
            # rfind returns -1 when there is no '.', giving the full name
            suffix = filename[filename.rfind(".") + 1:]
            if suffix not in found and suffix in excl:
                found.append(suffix)
    return found
async def on_command_error(error, ctx):
    """Handle an error raised while processing a command.

    Unknown commands are ignored, permission and missing-argument errors are
    answered in the invoking channel, and anything else is reported to the
    channel (when a command is known) and printed to stderr with a traceback.

    :param error: The exception that was raised.
    :param ctx: Context of the failed invocation.
    """
    if isinstance(error, discord.ext.commands.errors.CommandNotFound):
        pass  # ...don't need to know if commands don't exist
    # This must be ``elif``: with a separate ``if`` an unknown command fell
    # through to the final ``else`` and was logged with a traceback.
    elif isinstance(error, discord.ext.commands.errors.CheckFailure):
        await bot.send_message(ctx.message.channel, "{} You don't have permission to use this command.".format(ctx.message.author.mention))
    elif isinstance(error, discord.ext.commands.errors.MissingRequiredArgument):
        formatter = commands.formatter.HelpFormatter()
        await bot.send_message(ctx.message.channel, "{} You are missing required arguments.\n{}".format(ctx.message.author.mention, formatter.format_help_for(ctx, ctx.command)[0]))
    else:
        if ctx.command:
            await bot.send_message(ctx.message.channel, "An error occured while processing the `{}` command.".format(ctx.command.name))
        print('Ignoring exception in command {}'.format(ctx.command), file=sys.stderr)
        traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
def get_server(context: Context) -> discord.Server:
    """Return the server a command was sent to.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :return: The ``server`` attribute of the context's message.
    """
    message = context.message
    return message.server
def get_server_id(context: Context) -> str:
    """Return the ID of the server a command was sent to.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :return: The server's ID, or ``None`` when ``get_server`` returns ``None``.
    """
    server = get_server(context)
    if server is None:
        return None
    return server.id
def get_channel(context: Context) -> discord.Channel:
    """Return the channel a command was sent to.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :return: The ``channel`` attribute of the context's message.
    """
    message = context.message
    return message.channel
def get_channel_id(context: Context) -> str:
    """Return the ID of the channel a command was sent to.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :return: The ``id`` of the channel found by ``get_channel``.
    """
    channel = get_channel(context)
    return channel.id
def get_channel_name(context: Context) -> str:
    """Return the name of the channel a command was sent to.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :return: The ``name`` of the channel found by ``get_channel``.
    """
    channel = get_channel(context)
    return channel.name
def get_author_name(context: Context, nick=False) -> str:
    """Return the name of the author of a command.

    The lookup is delegated to ``get_name`` with the message author.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :param nick: Whether the member's nick (if available) should be given
                 instead of the real user name. Defaults to False
    :type nick: bool
    :return: The author's name
    """
    author = context.message.author
    return get_name(author, nick)
def author_has_role(context: commands.Context, role_id: str) -> bool:
    """Tell whether the author of a command has the role with the given ID.

    The check is delegated to ``has_role`` with the message author.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :param role_id: The ID of a role to check for.
    :type role_id: str
    :return: True if the author has a role with the given ID, false otherwise.
    """
    author = context.message.author
    return has_role(author, role_id)
def to_boolean(value) -> bool:
    """Parse a command argument into a boolean.

    :param value: The value to evaluate. Booleans are returned unchanged;
        anything else is converted with ``str`` and lower-cased first.
    :return: The parsed value.
    :raise: TypeError: If the value cannot be parsed.

    .. note::
        Accepted spellings (case-insensitive, so 'TruE' is valid):
        '1', 'true', 'on', 'yes', 'y' for True and
        '0', 'false', 'off', 'no', 'n' for False.
    """
    if isinstance(value, bool):
        return value

    truthy = ('1', 'true', 'on', 'yes', 'y')
    falsy = ('0', 'false', 'off', 'no', 'n')

    value = str(value).lower()
    if value in truthy:
        return True
    if value in falsy:
        return False
    raise TypeError("Could not parse {} to boolean".format(value))
async def on_command_error(self, exception, context):
    """
    Override bot.on_command_error function.
    Error handling function.

    Picks a reply depending on the exception type and sends it to the
    channel the failing message came from. Invocation errors and unknown
    exception types are additionally printed.

    :param exception: The exception raised while handling the command.
    :param context: Context of the failed invocation.
    """
    errors = discord.ext.commands.errors

    if isinstance(exception, errors.CommandNotFound):
        msg = ("Sorry, this command is unknown to me... :japanese_ogre:"
               "\nDo you need help? "
               "\nIf so, just type ***!help*** :sunglasses:")
    elif isinstance(exception, errors.DisabledCommand):
        msg = ":sleeping:"
    elif isinstance(exception, errors.CheckFailure):
        msg = (":octagonal_sign: you are not allowed to do this."
               "\nOnly an :alien: can wake me up...")
    elif isinstance(exception, errors.MissingRequiredArgument):
        msg = (":thought_balloon: You forgot parameters."
               "\nType !help [command] to get help :interrobang:")
    elif isinstance(exception, APIconnectionError):
        msg = ("It seems we can't contact the API...:frowning2: "
               "\nTake a rest and retry later.:play_pause: ")
    elif isinstance(exception, errors.CommandInvokeError):
        msg = "oups... " + str(exception)
        print(msg)
    else:
        # unknown error type: include the type so it can be identified
        msg = "oups inconnu... " + str(type(exception)) + str(exception)
        print(msg)

    await self.send_message(context.message.channel, msg)
async def kys(self, ctx):
    """Kill Yourself"""
    # Downloads a fixed image to a local file and uploads that file.
    image_url = "https://images-ext-2.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5tZW1lLmFtL2luc3RhbmNlcy81MDB4LzY3NTY4NTg0LmpwZyJ9.oNix5m3kkZkmQK5V8uDlLD_fVe0.jpg"
    image_path = "data/commands/Images/imgres.png"

    # ClientSession has to be entered with ``async with``; the plain ``with``
    # form is not supported by current aiohttp releases.
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as resp:
            payload = await resp.read()
    with open(image_path, "wb") as f:
        f.write(payload)
    await self.bot.upload(image_path)
async def datboi(self, ctx):
    """Dait Boi"""
    # Picks one image URL at random, downloads it to a local file, posts the
    # caption and uploads the file.
    foo = [
        "http://i1.kym-cdn.com/photos/images/newsfeed/001/112/704/8a8.jpg",
        "http://2static.fjcdn.com/pictures/Origins_3c4898_5920950.jpg",
        "http://i3.kym-cdn.com/photos/images/original/001/116/633/8e9.jpg",
        "http://www.kappit.com/img/pics/201605_1023_iagcb_sm.jpg",
        "https://img0.etsystatic.com/125/1/12078849/il_340x270.969775168_rzq8.jpg",
        "https://i.ytimg.com/vi/fs5Sudmauks/maxresdefault.jpg",
        "http://i1.kym-cdn.com/photos/images/original/001/118/006/5b1.jpg",
        "http://i3.kym-cdn.com/photos/images/newsfeed/001/118/078/22b.jpeg",
        "http://i3.kym-cdn.com/photos/images/newsfeed/001/118/014/e78.png",
        "https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwOi8vY2RuLnNtb3NoLmNvbS9zaXRlcy9kZWZhdWx0L2ZpbGVzLzIwMTYvMDUvZGF0LWJvaS1tZW1lcy1ib3ktd2hvLWxpdmVkLmpwZyJ9.WBSEhlT69xiEWq0KRgR90j9YwDA.jpg?width=243&height=249",
        "http://i2.kym-cdn.com/photos/images/newsfeed/001/112/712/650.jpg",
    ]
    image_path = "data/commands/Images/imgres.png"

    # ClientSession has to be entered with ``async with``; the plain ``with``
    # form is not supported by current aiohttp releases.
    async with aiohttp.ClientSession() as session:
        async with session.get(random.choice(foo)) as resp:
            payload = await resp.read()
    with open(image_path, "wb") as f:
        f.write(payload)
    await self.bot.say("Here comes dat boi! o shit waddup!:\n")
    await self.bot.upload(image_path)
def reload_modules(self):
    """Reloads all Dogbot related modules."""

    def _should_reload(name):
        # the top-level 'dog' package and anything with 'ext' in its name
        # are left alone, as is everything belonging to 'datadog'
        if name == 'dog' or 'ext' in name:
            return False
        return 'dog' in name and 'datadog' not in name

    # snapshot the applicable modules before reloading any of them
    targets = [(name, module) for name, module in sys.modules.items() if _should_reload(name)]

    for name, module in targets:
        logger.info('Reloading bot module: %s', name)
        importlib.reload(module)

    logger.info('Finished reloading bot modules!')
async def get_statistics(pg: asyncpg.connection.Connection, command_name: str) -> \
        Union[asyncpg.Record, None]:
    """
    Fetches the statistics row for a specific command name.

    If no record was found, ``None`` is returned.

    :param pg: Connection the query is run on.
    :param command_name: Value matched against the ``command_name`` column.
    :return: The result of ``pg.fetchrow`` for that command.
    """
    # fetchrow yields a single row (not a list), hence the return annotation
    return await pg.fetchrow('SELECT * FROM command_statistics WHERE command_name = $1',
                             command_name)
def get_nitro_embed(self):
    """Build the embed that imitates a "Discord Nitro required" message.

    :return: A ``discord.Embed`` with the fixed description, thumbnail and
        author line set below.
    """
    thumbnail_url = ('https://images-ext-2.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3'
                     'JkYXBwLmNvbS9lbW9qaXMvMjY0Mjg3NTY5Njg3MjE2MTI5LnBuZyJ9.2'
                     '6ZJzd3ReEjyptc_N8jX-00oFGs')
    icon_url = ('https://images-ext-1.discordapp.net/eyJ1'
                'cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9lbW9qaXMvMjYz'
                'MDQzMDUxMzM5OTA3MDcyLnBuZyJ9.-pis3JTckm9LcASNN16DaKy9qlI')

    embed = discord.Embed(color=0x505e80, description='Discord Nitro is **required** to view this message.')
    embed.set_thumbnail(url=thumbnail_url)
    embed.set_author(name='Discord Nitro Message', icon_url=icon_url)
    return embed
def _reload(self, ctx, *, ext: str = None):
    """Reloads a module. Can only be used by the owner."""
    # With ``ext`` given only that extension is unloaded and loaded again;
    # otherwise every entry of ``bot.initial_extensions`` is cycled.
    bot = self.bot

    if not ext:
        for module_name in bot.initial_extensions:
            log.info('Reloading cog %s', module_name)
            bot.unload_extension(module_name)
            bot.load_extension(module_name)
        return

    bot.unload_extension(ext)
    bot.load_extension(ext)
def __init__(self):
    # Builds the bot: parent constructor, HTTP session, optional image-URL
    # data, counters, database interface + scheduler, then the configured
    # extensions. The statement order matters (the loop, the DB connection
    # and the scheduler are used by later steps), so it is left untouched.
    super().__init__(command_prefix=_callable_prefix,
                     formatter=_chiaki_formatter,
                     description=config.description,
                     pm_help=None)

    # loop is needed to prevent outside coro errors
    self.session = aiohttp.ClientSession(loop=self.loop)
    self.table_base = None

    # The image-URL file is optional; fall back to an empty mapping.
    try:
        with open('data/command_image_urls.json') as f:
            self.command_image_urls = __import__('json').load(f)
    except FileNotFoundError:
        self.command_image_urls = {}

    self.message_counter = 0
    self.command_counter = collections.Counter()
    self.custom_prefixes = JSONFile('customprefixes.json')
    self.cog_aliases = {}

    self.reset_requested = False

    # Connection URL assembled from the psql_* values in ``config``.
    psql = f'postgresql://{config.psql_user}:{config.psql_pass}@{config.psql_host}/{config.psql_db}'
    self.db = asyncqlio.DatabaseInterface(psql)
    # Runs the connection coroutine to completion before continuing.
    self.loop.run_until_complete(self._connect_to_db())

    self.db_scheduler = DatabaseScheduler(self.db, timefunc=datetime.utcnow)
    self.db_scheduler.add_callback(self._dispatch_from_scheduler)

    for ext in config.extensions:
        # Errors should never pass silently, if there's a bug in an extension,
        # better to know now before the bot logs in, because a restart
        # can become extremely expensive later on, especially with the
        # 1000 IDENTIFYs a day limit.
        self.load_extension(ext)

    # Schedules ``change_game`` on the loop and keeps the task handle.
    self._game_task = self.loop.create_task(self.change_game())
async def on_command_error(ctx, error):
    """Report a command error to the author by direct message.

    The text depends on the error type. Afterwards the invoking message is
    deleted, unless one of the parsed arguments contains an '@'.

    :param ctx: Context of the failed invocation.
    :param error: The exception that was raised.
    :return: ``False`` when the message is kept because of an '@' argument,
        otherwise ``None``.
    """
    errors = discord.ext.commands.errors
    mention = ctx.message.author.mention
    content = ctx.message.content

    if isinstance(error, errors.CommandNotFound):
        await ctx.author.send(
            "{}, this command does not exist!```{}```".format(mention, content))
    elif isinstance(error, errors.NotOwner):
        await ctx.author.send(
            "{}, only my Owner can ask me to do that, nya!```{}```".format(mention, content))
    elif isinstance(error, errors.UserInputError):
        await ctx.author.send(
            "{}, Input error```py\n{}: {}\n```".format(mention, type(error).__name__, str(error)))
    elif isinstance(error, errors.NoPrivateMessage):
        await ctx.author.send(
            "{}, this command cannot be send in a PM!```{}```".format(mention, content))
    elif isinstance(error, errors.CheckFailure):
        await ctx.author.send(
            "You don\'t have the permission to use this command, {}```{}```".format(mention, content))
    else:
        await ctx.author.send(
            "{}, error```py\n{}: {}\n```".format(mention, type(error).__name__, str(error)))

    # Keep the invoking message when any argument contains an '@'.
    if any('@' in str(arg) for arg in ctx.args):
        return False
    await ctx.message.delete()