def _get_league_fixtures_timeframe(self, server_id: str, league_id: str, timeframe: str):
"""Retrieves specific league matchday fixtures from API
Optional timeframe parameter:
The value of the timeFrame argument must start with either p(ast) or n(ext), representing a timeframe either in the past or future. It is followed by a number in the range 1..99. It defaults to n7 in the fixture resource and is unset for fixture as a subresource.
For instance: p6 would return all fixtures in the last 6 days, whereas n23 would result in returning all fixtures in the next 23 days."""
params = {'timeFrame': timeframe}
url = self.api_url + 'competitions/{}/fixtures'.format(league_id)
return await self._make_request(url, params, server_id)
python类ext()的实例源码
def on_message(message):
msgchan = message.channel
if await command(message, "help", True):
cmd = message.content[len(prefix + "help "):]
help_cmds = ""
if cmd == "":
for ext in cmds.keys():
help_cmds += "\n**{}**:\n".format(ext)
for cmda in cmds[ext].keys():
if len(cmda + cmds[ext][cmda]['help']) > 70:
help_cmds += "\t- `{}`: {}...\n".format(cmda, cmds[ext][cmda]['help'][:70])
else:
help_cmds += "\t- `{}`: {}\n".format(cmda, cmds[ext][cmda]['help'])
if len(help_cmds) > 1750:
await say(msgchan, help_cmds)
help_cmds = ""
await say(msgchan, help_cmds)
help_cmds = ""
await say(msgchan, "To get information of a specific command type {}help <command>".format(prefix))
else:
error = 0
for ext in cmds.keys():
try:
temp = cmds[ext][cmd]['help']
await say(msgchan, "`{}` ({}):\n{}\n\nUsage:\n`{}`".format(cmd, ext, cmds[ext][cmd]['help'], prefix + cmds[ext][cmd]['usage']))
temp = None
except:
temp = None
error += 1
if error == len(cmds.keys()):
await say(msgchan, "The command you entered ({}) could not be found.".format(cmd))
def get_source(self, thing):
"""returns a source object of a thing
thing may be a non-builtin module, class, method, function, traceback, frame, or code object,
or a space separated discord.ext.commands call,
or a period deliminated file/module path as used when importing
"""
if isinstance(thing, str):
if '.' in thing: # import
modules = thing.split('.')
def get_last_attr(prev, attrs):
try:
return prev.callback
except AttributeError:
if not attrs:
return prev
return get_last_attr(getattr(prev, attrs.pop(0)),
attrs)
thing = get_last_attr(__import__(modules.pop(0)), modules)
else: # space delimited command call
names = thing.split()
thing = self.bot.commands[names.pop(0)]
for name in names:
thing = thing.commands[name]
thing = thing.callback
return Source(thing)
def handle_message_mention(self, origin, message):
perms = origin.permissions_in(message.channel)
if not perms.read_messages:
# if you can't read the messages, then you shouldn't get PM'd.
return
messages = []
async for msg in self.bot.logs_from(message.channel, limit=3, before=message):
messages.append(msg)
messages.reverse()
messages.append(message)
if origin.status != discord.Status.online:
# wait 30 seconds for context if it's not always on
await asyncio.sleep(30)
# get an updated reference, server references don't change
member = message.server.get_member(origin.id)
if member is None or member.status == discord.Status.online:
# they've come online, so they might have checked the mention
return
# get the messages after this one
ext = []
async for msg in self.bot.logs_from(message.channel, limit=3, after=message):
ext.append(msg)
ext.reverse()
messages.extend(ext)
try:
fmt = 'You were mentioned in {0.channel.mention}:\n{1}'
fmt = fmt.format(message, '\n'.join(map(self.format_message, messages)))
await self.bot.send_message(origin, fmt)
except:
# silent failure best failure
pass
def audioplayer(self, ctx, error_on_none=True):
# TODO: ACCOUNT FOR WHEN THIS MESSAGE IS A PM
if isinstance(ctx, discord.ext.commands.Context):
if ctx.message.guild is None: # This is a private channel, so give it user
ctx = ctx.message.author
else:
ctx = ctx.message.guild
if isinstance(ctx, discord.User):
author = ctx
for audioplayer in self.audioplayers:
member = audioplayer.guild.get_member(author.id)
if member and member.voice and audioplayer.voice and audioplayer.voice.channel.id == member.voice.channel.id:
if botdata.guildinfo(audioplayer.guild).is_banned(member):
raise AudioPlayerNotFoundError("Nice try, but you're banned in the voice channel that I'm in")
return audioplayer
if error_on_none:
raise AudioPlayerNotFoundError("You're not in any voice channels that I'm in")
else:
return None
elif isinstance(ctx, discord.Guild):
guild = ctx
elif isinstance(ctx, discord.abc.GuildChannel):
guild = ctx.guild
else:
raise ValueError(f"Incorrect type '{type(ctx)}' given to audioplayer function")
for audioplayer in self.audioplayers:
if audioplayer.guild == guild:
return audioplayer
if error_on_none:
raise AudioPlayerNotFoundError(f"I'm not in a voice channel on this server/guild. Have an admin do `{self.bot.command_prefix}summon` to put me in one.")
else:
return None
# Connects an audioplayer for the correct guild to the indicated channel
def load_all(self):
"""Load all extensions in the extensions list,
but make sure all dependencies are matched for every cog."""
for extension in extensions:
try:
self.load_extension(f'ext.{extension}')
except Exception as err:
log.error(f'Failed to load {extension}', exc_info=True)
sys.exit(1)
def midi(self, ctx: commands.Context,
tempo: int=120, *, data: str=None):
"""
Convert text to MIDI. Multiple channels can be used by splitting text with |.
Letters are converted to their pitch values using a mapping.
Full documentation about it is not provided, read the code at
https://github.com/lnmds/jose/blob/master/ext/midi.py
To give longer input than a discord message allows you may upload a .txt file of up to 20 KiB.
"""
if data is None:
try:
data = await self.download_data(ctx.message)
except Exception as err:
log.exception('error downloading file at midi')
raise self.SayException('We had an error while downloading '
'the file, are you sure it is text?')
before = time.monotonic()
midi_file = await self.make_midi(tempo, data)
duration = (time.monotonic() - before) * 1000
if midi_file is None:
return await ctx.send('Failed to generate a MIDI file!')
file = io.BytesIO()
await self.loop.run_in_executor(None, midi_file.writeFile, file)
file.seek(0)
wrapped = discord.File(file, filename='boop.midi')
await ctx.send(f'Took {duration:.3f}ms!', file=wrapped)
def __init__(self, bot):
self.bot = bot
self.player = dataIO.load_json("data/biomes/player.json")
self.system = dataIO.load_json("data/biomes/system.json")
self.charge = dataIO.load_json("data/biomes/charge.json")
self.sponsor = dataIO.load_json("data/biomes/sponsor.json")
defpar = {"SPONSOR_NB": 0, "STOP" : False, "EXT" : None,"OPEN" : False, "PLAYING" : False,"PLAYERS" : 0,"MAX_PLAYERS" : 12, "SAISON" : 1}
extdiv = "data/biomes/ext/{}.txt"
def ext(self, ctx, val:str):
"""Change l'extension de la session."""
nom = val
val += ".txt"
exts = os.listdir("data/biomes/ext/")
if val in exts:
self.system["EXT"] = nom
self.save("system")
await self.bot.say("Extension {} selectionnée ".format(nom.title()))
else:
await self.bot.say("Cette extension n'existe pas.")
def check_folders():
if not os.path.exists("data/biomes"):
print("Creation du fichier Biomes...")
os.makedirs("data/biomes")
if not os.path.exists("data/biomes/ext/"):
print("Creation du fichier Extensions Biomes...")
os.makedirs("data/biomes/ext/")
def build_rtfm_lookup_table(self):
cache = {}
page_types = {
'rewrite': (
'http://discordpy.rtfd.io/en/rewrite/api.html',
'http://discordpy.rtfd.io/en/rewrite/ext/commands/api.html'
),
'latest': (
'http://discordpy.rtfd.io/en/latest/api.html',
)
}
for key, pages in page_types.items():
sub = cache[key] = {}
for page in pages:
async with self.bot.session.get(page) as resp:
if resp.status != 200:
raise RuntimeError('Cannot build rtfm lookup table, try again later.')
text = await resp.text(encoding='utf-8')
root = etree.fromstring(text, etree.HTMLParser())
nodes = root.findall(".//dt/a[@class='headerlink']")
for node in nodes:
href = node.get('href', '')
as_key = href.replace('#discord.', '').replace('ext.commands.', '')
sub[as_key] = page + href
self._rtfm_cache = cache
def steam_error(self, error, ctx):
if isinstance(error, commands.errors.CommandOnCooldown):
seconds = str(error)[34:]
await self.bot.say(f':alarm_clock: Cooldown! Versuche es in {seconds} erneut')
if isinstance(error, discord.ext.commands.errors.CommandInvokeError):
await self.bot.say(':x: Konnte keine Verbindung zum Steam API aufbauen')
def init_extensions(self):
for ext in os.listdir('extensions'):
if ext.endswith('.py') and not ext.startswith('core'):
try:
self.bot.load_extension(f'extensions.{ext[:-3]}')
self.settings['extensions'].append(f'extensions.{ext[:-3]}')
except:
pass
def download(url, ext : str = "jpg", sizeLimit : int = 8000000, ua : str = 'CorpNewt DeepThoughtBot'):
"""Download the passed URL and return the file path."""
# Set up a temp directory
dirpath = tempfile.mkdtemp()
tempFileName = url.rsplit('/', 1)[-1]
# Strip question mark
tempFileName = tempFileName.split('?')[0]
imagePath = dirpath + "/" + tempFileName
try:
rImage = requests.get(url, stream = True, headers = {'User-agent': ua})
except:
remove(dirpath)
return None
with open(imagePath, 'wb') as f:
for chunk in rImage.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
# Check if the file exists
if not os.path.exists(imagePath):
remove(dirpath)
return None
# Let's make sure it's less than the passed limit
imageSize = os.stat(imagePath)
while int(imageSize.st_size) > sizeLimit:
try:
# Image is too big - resize
myimage = Image.open(imagePath)
xsize, ysize = myimage.size
ratio = sizeLimit/int(imageSize.st_size)
xsize *= ratio
ysize *= ratio
myimage = myimage.resize((int(xsize), int(ysize)), Image.ANTIALIAS)
myimage.save(imagePath)
imageSize = os.stat(imagePath)
except Exception:
# Image too big and can't be opened
remove(dirpath)
return None
try:
# Try to get the extension
img = Image.open(imagePath)
ext = img.format
img.close()
except Exception:
# Not something we understand - error out
remove(dirpath)
return None
if ext:
os.rename(imagePath, '{}.{}'.format(imagePath, ext))
return '{}.{}'.format(imagePath, ext)
else:
return imagePath
def load_cogs(bot):
defaults = ("alias", "audio", "customcom", "downloader", "economy",
"general", "image", "mod", "streams", "trivia")
try:
registry = dataIO.load_json("data/red/cogs.json")
except:
registry = {}
bot.load_extension('cogs.owner')
owner_cog = bot.get_cog('Owner')
if owner_cog is None:
print("The owner cog is missing. It contains core functions without "
"which Red cannot function. Reinstall.")
exit(1)
if bot.settings._no_cogs:
bot.logger.debug("Skipping initial cogs loading (--no-cogs)")
if not os.path.isfile("data/red/cogs.json"):
dataIO.save_json("data/red/cogs.json", {})
return
failed = []
extensions = owner_cog._list_cogs()
if not registry: # All default cogs enabled by default
for ext in defaults:
registry["cogs." + ext] = True
for extension in extensions:
if extension.lower() == "cogs.owner":
continue
to_load = registry.get(extension, False)
if to_load:
try:
owner_cog._load_cog(extension)
except Exception as e:
print("{}: {}".format(e.__class__.__name__, str(e)))
bot.logger.exception(e)
failed.append(extension)
registry[extension] = False
dataIO.save_json("data/red/cogs.json", registry)
if failed:
print("\nFailed to load: {}\n".format(" ".join(failed)))
def __init__(self, ctx, bot: commands.Bot, idx_emoji, idx_embed):
"""
Create a new PagedEmbed.
Arguments:
ctx : discord.ext.commands.Context
The context in which the command
to send the PagedEmbed was invoked.
bot : discord.ext.commands.Bot
The bot that the application is
logged in with. In Cogs, this is
usually saved as an instance
attribute `bot`.
idx_emoji : str
The Emoji which is used to navigate
to the initial page, constructed
through arguments and keyword
arguments passed to this constructor.
Example:
embed = PagedEmbed(ctx, bot, "??",
title="Index Page",
description="This page gets shown initially."
)
embed.add_page("??", discord.Embed(
title="Second Page",
description="This page gets shown when you click the ?? emoji."
))
"""
self._ctx = ctx
self._bot = bot
self._msg = None
# Dictionary to contain embed Pages in the following format:
# {
# "idx_emoji": discord.Embed,
# "emoji": discord.Embed,
# "emoji": discord.Embed,
# ...
# }
self._pages = {
idx_emoji: idx_embed
}
self.current_page = self._pages[idx_emoji]
# Attach our event listener to the bot
self._bot.add_listener(self.on_reaction_add)
def _addimage(self, ctx: commands.Context, category: str,
image_url: str=None):
"""Adds a new image to the specified category."""
await self.bot.type()
server = ctx.message.server
if server.id not in os.listdir(self.base):
self.settings[server.id] = default_settings
dataIO.save_json(self.settings_path, self.settings)
os.makedirs(os.path.join(self.base, server.id))
if category not in self._list_image_dirs(server.id):
await self.bot.reply(cf.error("Category not found."))
return
attach = ctx.message.attachments
if len(attach) > 1 or (attach and image_url):
await self.bot.reply(cf.error("Please only provide one file."))
return
url = ""
filename = ""
if attach:
a = attach[0]
url = a["url"]
filename = a["filename"]
elif image_url:
url = image_url
filename = os.path.basename(
"_".join(url.split()).replace("%20", "_"))
else:
await self.bot.reply(cf.error(
"You must provide either a Discord attachment"
" or a direct link to an image."))
return
new_id = str(self.settings[server.id]["next_ids"][category])
self.settings[server.id]["next_ids"][category] += 1
dataIO.save_json(self.settings_path, self.settings)
ext = os.path.splitext(filename)[1]
filepath = os.path.join(
self.base, server.id, category, "{}_{}.{}".format(
category, new_id, ext))
async with aiohttp.get(url) as new_sound:
f = open(filepath, "wb")
f.write(await new_sound.read())
f.close()
await self.bot.reply(cf.info("Image added."))