def check_last(self, ctx: commands.Context, name):
""" Shows you how long other users' last session lasted.
:param name: The name (not the nick) of the person to check.
Must use the name#discriminator format.
"""
time_str = printable_time(db_manager.get_user_last_session(name))
if time_str is None:
time_str = "None found."
lib.log("{} queried for {}'s last session time. Result: {}"
.format(lib.get_author_name(ctx, True),
"their" if name == str(ctx.message.author)
else (name + "'s"), time_str))
await self.bot.say("```{}```".format(time_str),
delete_after=self.bot.ans_lifespan * 3)
python类Context()的实例源码
def total(self, ctx: commands.Context, name=None):
""" Shows you the total time a user has used the timer for.
:param name: The name (not the nick) of the person to check.
Must use the name#discriminator format. If none is provided, it will
check your own record.
"""
if name is None:
name = ctx.message.author
time_str = printable_time(db_manager.get_user_total(name))
if time_str is None:
time_str = "None found."
name = str(name)
lib.log("{} queried for {}'s last session time. Result: {}"
.format(lib.get_author_name(ctx, True),
"their" if name == str(ctx.message.author)
else (name + "'s"), time_str))
await self.bot.say("```{}```".format(time_str),
delete_after=self.bot.ans_lifespan * 3)
def toggle_countdown(self, ctx: commands.Context, toggle=None):
""" Turns the timer's countdown setting on or off. If no state is
specified, it will toggle it
:param toggle: True, yes or on to turn the setting on. False, no or off
to turn the setting off. If not specified, or None is given,
it will toggle the setting.
"""
author = ctx.message.author
channel = self.bot.spoof(author, lib.get_channel(ctx))
interface = self.bot.get_interface(channel)
timer = interface.timer
toggle = lib.to_boolean(toggle)
if timer.countdown == toggle:
return # No need to edit it if it's the same.
timer.toggle_countdown(toggle)
await self.bot.edit_message(interface.time_message, timer.time())
await self.bot.say("Successfully toggled the countdown setting {}!"
.format("on" if timer.countdown else "off"),
delete_after=self.bot.ans_lifespan)
def timer_stop(self, ctx: commands.Context):
""" Stops the timer, if it's running.
Resets the current period and time, but keeps the setup.
"""
channel = self.bot.spoof(ctx.message.author, lib.get_channel(ctx))
interface = self.bot.get_interface(channel)
if interface.timer.stop():
send = "Timer will stop soon."
await self.bot.say(send, delete_after=interface.timer.step)
else:
await self.bot.remove_messages(channel)
send = "Timer has stopped."
await self.bot.say(send, tts=interface.tts)
lib.log(send, channel_id=channel.id)
def timer_reset(self, ctx: commands.Context):
""" Resets the timer setup.
"""
channel = self.bot.spoof(ctx.message.author, lib.get_channel(ctx))
interface = self.bot.get_interface(channel)
if interface.timer.get_state() == State.STOPPED:
interface.timer.set_state(None)
interface.timer = None
interface.time_message = None
interface.list_message = None
log = lib.get_author_name(ctx) + " reset the timer."
send = "Successfully reset session configuration."
else:
log = (lib.get_author_name(ctx) + " tried resetting a timer that "
"was running or paused.")
send = "Cannot do that while the timer is not stopped."
lib.log(log, channel_id=channel.id)
await self.bot.say(send, delete_after=self.bot.ans_lifespan)
def timer_superreset(self, ctx: commands.Context):
""" Ignores all conditions and resets the channel's timer.
Requires elevated permissions.
"""
channel = self.bot.spoof(ctx.message.author, lib.get_channel(ctx))
interface = self.bot.get_interface(channel)
if interface.timer.get_state() == State.RUNNING:
self.bot.timers_running -= 1
await self.bot.update_status()
await self.bot.remove_messages(channel)
interface.timer = None
lib.log("Successfully forced a reset on this channel's timer.",
channel_id=channel.id)
await self.bot.say("Timer has been force-reset",
delete_after=self.bot.ans_lifespan)
def admin_shutdown(self, ctx: commands.Context):
""" Exits the program. Administrator only!
"""
lib.log("Shutting down...")
await self.bot.say("Hope I did well, bye!")
for channel, timer in self.bot.valid_timers().items():
if timer.get_state() != State.STOPPED:
timer.stop()
if lib.get_channel(ctx) != channel:
await self.bot.safe_send(
channel,
"I'm sorry, I have to go. See you later!"
)
await self.bot.remove_messages(channel)
self.bot.unsub_all()
await self.bot.logout()
def has_permission(ctx: commands.Context) -> bool:
""" Checks if a user is an administrator or if has the role
that grants elevated permissions.
:param ctx: The context to check the command in.
:type ctx: commands.Context
:return: True if the command succeeds, else raises an exception.
:raises: commands.CheckFailure: If the check fails.
message : "no permissions"
"""
if isinstance(ctx.bot, PomodoroBot) and \
ctx.bot.has_permission(ctx.message.author):
return True
raise commands.CheckFailure(message="no permissions")
def is_admin(ctx: commands.Context) -> bool:
""" Checks if the author of the command is the administrator / owner
of the bot.
:param ctx: The context to check the command in.
:type ctx: commands.Context
:return: True if the command succeeds, else raises an exception.
:raises: commands.CheckFailure: If the check fails.
message : "not admin"
"""
if isinstance(ctx.bot, PomodoroBot) and \
ctx.bot.is_admin(ctx.message.author):
return True
raise commands.CheckFailure(message="not admin")
def channel_has_timer(ctx: commands.Context) -> bool:
""" Checks if a channel has a valid timer set.
:param ctx: The context to check the command in
:type ctx: commands.Context
:return: True if the command succeeds, else raises an exception.
:raises: commands.CheckFailure: If the check fails.
message : "timer not found"
"""
if isinstance(ctx.bot, PomodoroBot):
channel = ctx.bot.spoof(ctx.message.author, lib.get_channel(ctx))
if ctx.bot.get_interface(channel).timer is not None:
return True
raise commands.CheckFailure(message="timer not found")
def unlocked_or_allowed(ctx: commands.Context) -> bool:
""" Checks if a timer is unlocked, or if the author of the command
has permissions to execute such command on a locked timer.
:param ctx: The context to check the command in
:type ctx: commands.Context
:return: True if the command succeeds, else raises an exception.
:raises: commands.CheckFailure: If the check fails.
message : "timer locked"
"""
if isinstance(ctx.bot, PomodoroBot) and \
ctx.bot.is_locked(lib.get_channel(ctx)) and \
not ctx.bot.has_permission(ctx.message.author):
raise commands.CheckFailure(message="timer locked")
return True
def users(self, ctx: commands.Context, count=10):
"""
Show users with the most messages
"""
count = count
# Show at least 1 user and 20 at most
count = max(1, count)
count = min(20, count)
try:
server = self.orm.Server.get(discord_id=ctx.message.server.id)
except DoesNotExist as e:
# If there's no server in the db an exception will be raised
self.config.logger.error(e)
else:
users = await self.orm.query.user_top_list(count, server)
embed = discord.Embed(color=discord.Color(self.config.constants.embed_color),
timestamp=datetime.datetime.now())
for user in users:
# the user might not have a name if s/he hasn't sent a message already
# so in that case use the id instead
name = user.name if user.name != '' else user.discord_id
embed.add_field(name=name, value='Total messages: {}'.format(user.count), inline=False)
await self.bot.say(content='Top active users:', embed=embed)
def telljoke(self, ctx: commands.Context) -> None:
"""
Responds with a random joke from theoatmeal.com
"""
# TODO: get more joke formats (or a better source)
# Send typing as the request can take some time.
await self.bot.send_typing(ctx.message.channel)
# Build a new request object
req: grequests.AsyncRequest = grequests.get('http://theoatmeal.com/djtaf/', timeout=1)
# Send new request
res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)
# Since we only have one request we can just fetch the first one.
# res[0].content is the html in bytes
soup = BeautifulSoup(res[0].content.decode(res[0].encoding), 'html.parser')
joke_ul = soup.find(id='joke_0')
joke = joke_ul.find('h2', {'class': 'part1'}).text.lstrip() + '\n' + joke_ul.find(id='part2_0').text
await self.bot.say(joke)
def randomfact(self, ctx: commands.Context) -> None:
"""
Responds with a random fact scraped from unkno.com
"""
# Send typing as the request can take some time.
await self.bot.send_typing(ctx.message.channel)
# Build a new request object
req: grequests.AsyncRequest = grequests.get('http://www.unkno.com/', timeout=1)
# Send new request
res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)
# Since we only have one request we can just fetch the first one.
# res[0].content is the html in bytes
soup = BeautifulSoup(res[0].content.decode(res[0].encoding), 'html.parser')
await self.bot.say(soup.find(id='content').text)
def clear(self, ctx: commands.Context, number: int, member: discord.Member = None) -> None:
"""
Purges messages from the channel
:param ctx: The message context
:param number: The number of messages to purge
:param member: The member whose messages will be cleared
"""
if number < 1:
await command_error(ctx, "You must attempt to purge at least 1 message!")
return
def predicate(msg: discord.Message) -> bool:
return msg == ctx.message or member is None or msg.author == member
if number <= 100:
# Add 1 to limit to include command message, subtract 1 from the return to not count it.
msgs = await self.bot.purge_from(ctx.message.channel, limit=number + 1, check=predicate)
send(self.bot, '{} message{} cleared.'.format(len(msgs) - 1, "s" if len(msgs) - 1 != 1 else ""),
ctx.message.channel, True)
else:
await command_error(ctx, 'Cannot delete more than 100 messages at a time.')
def yes_no(ctx: commands.Context,
message: str="Are you sure? Type **yes** within 10 seconds to confirm. o.o"):
"""Yes no helper. Ask a confirmation message with a timeout of 10 seconds.
ctx - The context in which the question is being asked.
message - Optional messsage that the question should ask.
"""
await ctx.send(message)
try:
message = await ctx.bot.wait_for("message", timeout=10,
check=lambda message: message.author == ctx.message.author)
except asyncio.TimeoutError:
await ctx.send("Timed out waiting. :<")
return False
if message.clean_content.lower() not in ["yes", "y"]:
await ctx.send("Command cancelled. :<")
return False
return True
def format_command_error(ex: Exception, context: Context) -> tuple:
"""
Format a command error to display and log.
:param ex: the exception raised.
:param context: the context.
:return: a message to be displayed and logged, and triggered message
"""
triggered = context.message.content
four_space = ' ' * 4
ex_type = type(ex).__name__
msg = (
f'{four_space}Triggered message: {triggered}\n'
f'{four_space}Type: {ex_type}\n'
f'{four_space}Exception: {ex}'
)
return msg, triggered
def convert(self, ctx: commands.Context, argument):
def finder(entry):
try:
user_id = int(argument)
except ValueError:
user_id = None
return (str(entry.user) == argument or # username#discriminator
entry.user.name == argument or # username
entry.user.id == user_id) # id
try:
entry = discord.utils.find(finder, await ctx.guild.bans())
if entry is None:
raise commands.BadArgument(
'Banned user not found. You can specify by ID, username, or username#discriminator.'
)
return entry.user
except discord.Forbidden:
raise commands.BadArgument("I can't view the bans for this server.")
def update_statistics(pg: asyncpg.connection.Connection, ctx: commands.Context):
"""
Updates command statistics for a specific `discord.ext.commands.Context`.
If no record was found for a command, it is created. Otherwise, the ``times_used`` and
``last_used`` fields are updated.
"""
row = await get_statistics(pg, str(ctx.command))
if row is None:
# first time command is being used, insert it into the database
insert = 'INSERT INTO command_statistics VALUES ($1, 1, $2)'
await pg.execute(insert, str(ctx.command), datetime.datetime.utcnow())
logger.info('First command usage for %s', ctx.command)
else:
# command was used before, increment time_used and update last_used
update = ('UPDATE command_statistics SET times_used = times_used + 1, last_used = $2 '
'WHERE command_name = $1')
await pg.execute(update, str(ctx.command), datetime.datetime.utcnow())
def _leaderboard(self, ctx: commands.Context, league_id: str, matchday: str=None):
"""Gets league leaderboard"""
headers = [' ', 'ID', 'Team', 'Points', 'P', 'G', 'GA', 'GD']
data = await self._get_league_leaderboard(ctx.message.server.id, league_id, matchday)
pretty_data = []
# await self.bot.say('```diff\n+ ' + data['leagueCaption'] + '\n- Matchday: ' + str(data['matchday']) + '\n```')
await self.bot.say('```diff\n+ {}\n- Matchday: {}\n```'.format(data['leagueCaption'], data['matchday']))
if 'standing' in data:
for team in data['standing']:
pretty_data.append([team['rank'], team['teamId'], team['team'], team['points'], team['playedGames'], team['goals'], team['goalsAgainst'], team['goalDifference']])
await self.bot.say(box(tabulate(pretty_data, headers=headers)))
elif 'standings' in data:
for group, v in data['standings'].items():
asyncio.sleep(1)
await self.bot.say('```diff\n+ Group ' + group + '```')
pretty_data = []
for team in v:
pretty_data.append([team['rank'], team['team'], team['points'], team['playedGames'], team['goals'], team['goalsAgainst'], team['goalDifference']])
await self.bot.say(box(tabulate(pretty_data, headers=headers)))
def _nextfixtures(self, ctx: commands.Context, league_id: str):
"""Gets last matchday fixtures"""
headers = ['ID', 'Home', ' ', 'Away', 'Date']
data = await self._get_league_fixtures_timeframe(ctx.message.server.id, league_id, 'n7')
await self.bot.say('```diff\n+ Next fixtures```')
pretty_data = []
for fixture in data['fixtures']:
pretty_data.append([
fixture['id'],
'[{}] {}'.format(fixture['homeTeamId'], fixture['homeTeamName']),
' - ',
'[{}] {}'.format(fixture['awayTeamId'], fixture['awayTeamName']),
fixture['date']
])
await self.bot.say(box(tabulate(pretty_data, headers=headers)))
def _matchdayfixtures(self, ctx: commands.Context, league_id: str, matchday: str='1'):
"""Gets specific matchday fixtures
Defaults to matchday 1"""
headers = ['ID', 'Home', ' ', ' ', 'Away']
data = await self._get_league_fixtures_matchday(ctx.message.server.id, league_id, matchday)
await self.bot.say('```diff\n+ Matchday ' + matchday + ' fixtures```')
pretty_data = []
for fixture in data['fixtures']:
pretty_data.append([
fixture['id'],
'[{}] {}'.format(fixture['homeTeamId'], fixture['homeTeamName']),
fixture['result']['goalsHomeTeam'],
fixture['result']['goalsAwayTeam'],
'[{}] {}'.format(fixture['awayTeamId'], fixture['awayTeamName'])
])
await self.bot.say(box(tabulate(pretty_data, headers=headers)))
def _parse_roles(self, ctx: Context, roles: str, is_primary: int = 0) -> List[Tuple]:
roles = roles.rstrip(", \t\n\r")
roles_arr = roles.split(",")
alias = None
rows = []
for r in roles_arr:
if "=" in r:
role, alias = r.split("=")
role = role.strip(" \t\n\r\"'")
alias = alias.strip(" \t\n\r\"'")
else:
role = r.strip(" \t\n\r\"'")
try:
role_conv = RoleConverter(ctx, role).convert()
except BadArgument as e:
# Unable to convert this role
msg = e.args[0]
print(msg)
await self.bot.say("Couldn't find role `{}` on this server".format(role))
continue
rows.append((role_conv, alias, is_primary))
return rows
def listwords(self, ctx: commands.Context, channel: discord.Channel=None):
"""List all words for the specified channel
Optional parameters:
- channel: the channel to show words for (defaults to the current channel)"""
author = ctx.message.author
if not channel:
channel = ctx.message.channel
if author.id not in self.settings or\
"words" not in self.settings[author.id] or\
channel.id not in self.settings[author.id]["words"] or\
not self.settings[author.id]["words"][channel.id]:
await self.bot.say("You haven't set any words to be tracked!")
else:
head = "Tracked words for {}#{} in #{}".format(author.name, author.discriminator, channel.name)
msg = ""
for word in self.settings[author.id]["words"][channel.id]:
msg += "{}\n".format(word)
await self.bot.say(box(msg, lang=head))
def get(self, ctx: commands.Context, subreddit, num_posts=5, category='hot'):
"""Base command for returning data from a subreddit.
Keyword arguments:
posts -- Number of posts to return (default 5)
category -- Category to look at [hot, new, rising, controversial, top] (default hot)
"""
if num_posts > 25:
await self.bot.say('Number of posts must be no greater than 25.')
return
if subreddit.strip():
if category in self.categories:
result = await self.get_subreddit_top(
session=self.session, subreddit=subreddit, num_posts=num_posts, category=category)
await self.bot.say('\n\n'.join(result))
else:
await self.bot.say('Valid categories are {}: '.format(', '.join(self.categories)))
else:
await self.bot.pm_help(ctx)
def removesheet(self, ctx: commands.Context, name: str):
"""Remove a sheet which has been added.
Arguments:
- <name> The name of the sheet to remove"""
scopes = (ctx.message.channel.id,
ctx.message.server.id,
GLOBAL)
for scope in scopes:
try:
self.sheets[scope].pop(name)
except:
pass
else:
dataIO.save_json(SHEETS_PATH, self.sheets)
await self.bot.say("The sheet has been removed.")
return
await self.bot.say("Couldn't find a sheet with that name in your scope.")
def logerrors(self, ctx: commands.Context):
"""Toggle error logging in this channel."""
channel = ctx.message.channel
task = ENABLE
if channel.id in self.log_channels:
task = DISABLE
await self.bot.say("This will {} error logging in this channel. Are you sure about this? Type `yes` to agree".format(task))
message = await self.bot.wait_for_message(author=ctx.message.author)
if message is not None and message.content == 'yes':
if task == ENABLE:
self.log_channels.append(channel.id)
elif task == DISABLE:
self.log_channels.remove(channel.id)
dataIO.save_json(SETTINGS_PATH, self.log_channels)
await self.bot.say("Error logging {}d.".format(task))
else:
await self.bot.say("The operation was cancelled.")
def regedit(self, ctx: commands.Context):
"""Manages valid register roles."""
if ctx.invoked_subcommand is None:
# Send server's current settings
server = ctx.message.server
await self.bot.send_cmd_help(ctx)
if server.id not in self.settings:
msg = box("Register is not enabled in this server. "
"Use [p]regedit addrole to enable it.")
else:
valid_roles = [r.name for r in server.roles if r.id in self.settings[server.id]["roles"]]
delete_after = self.settings[server.id]["delete_after"]
quiet_status = None
if delete_after is None:
quiet_status = "Quiet mode is disabled."
else:
quiet_status = "Register commands are cleaned up after {} seconds".format(delete_after)
msg = box("{}\n"
"Valid register roles:\n"
"{}"
"".format(quiet_status, ", ".join(sorted(valid_roles)) if valid_roles else None)
)
await self.bot.say(msg)
def trigger_set_text(self, ctx: commands.Context, *,
text: str):
"""Trigger if a message contains a word or phrase.
This text is not case sensitive and strips the message of leading
or trailing whitespace.
"""
text = text.strip().lower()
emojis = await self._get_trigger_emojis(ctx)
if emojis:
self.triggers["text_triggers"][text] = emojis
_save(self.triggers)
emojis_str = " ".join(str(self._lookup_emoji(emoji)) for emoji in emojis)
await self.bot.say("Done - I will now react to messages containing `{text}` with"
" {emojis}.".format(text=text,
emojis=emojis_str))
elif text in self.triggers['text_triggers']:
del self.triggers['text_triggers'][text]
_save(self.triggers)
await self.bot.say("Done - I will no longer react to messages containing `{text}`."
"".format(text=text))
else:
await self.bot.say("Done - no triggers were changed.")
def trigger_set_user(self, ctx: commands.Context,
user: discord.User):
"""Trigger if a message is from some user."""
emojis = await self._get_trigger_emojis(ctx)
if emojis:
self.triggers["user_triggers"][user.id] = emojis
_save(self.triggers)
emojis_str = " ".join(str(self._lookup_emoji(emoji)) for emoji in emojis)
await self.bot.say("Done - I will now react to messages from `{user}` with"
" {emojis}.".format(user=str(user),
emojis=emojis_str))
elif user.id in self.triggers['user_triggers']:
del self.triggers['user_triggers'][user.id]
_save(self.triggers)
await self.bot.say("Done - I will no longer react to messages from `{user}`."
"".format(user=str(user)))
else:
await self.bot.say("Done - no triggers were changed.")