async def _delete_msg(self, args):
    """
    Deletes the given message.
    Syntax: delete_msg (guild_index.channel_index) (id)
    """
    # args[0]: indices string -- guild index, one separator character, then
    #          the text-channel index (both are positions in the client's
    #          lists, not IDs).
    # args[1]: ID of the message to delete; must be an integer >= 1.
    # Every failure is reported as ConsoleCommandSyntaxError.
    try:
        chanstr = args[0]
    except IndexError:
        raise ConsoleCommandSyntaxError("No arguments provided.")

    try:
        msg_id = int(args[1])
        if msg_id < 1:
            raise ValueError
    except ValueError:
        raise ConsoleCommandSyntaxError("Invalid message ID.")
    except IndexError:
        raise ConsoleCommandSyntaxError("Missing message ID argument.")

    try:
        guild, chan = re.match(r"(\d+).(\d+)", chanstr).group(1, 2)
    except AttributeError:
        # re.match() returned None, i.e. the string did not match at all.
        raise ConsoleCommandSyntaxError("Invalid indices string.")

    try:
        guild = self.client.guilds[int(guild)]
    except IndexError:
        raise ConsoleCommandSyntaxError("Invalid guild index.")

    try:
        chan = guild.text_channels[int(chan)]
    except IndexError:
        raise ConsoleCommandSyntaxError("Invalid channel index.")

    try:
        # The lookup has to be awaited: un-awaited it never raises NotFound
        # here and `msg` would not be a message object.
        # NOTE(review): assumes `get_message` is a coroutine, as in the
        # discord.py rewrite API this code appears to target -- confirm.
        msg = await chan.get_message(msg_id)
    except NotFound:
        raise ConsoleCommandSyntaxError(f"Message with ID {msg_id} not found.")

    try:
        await msg.delete()
    except Forbidden:
        raise ConsoleCommandSyntaxError("Cannot delete message; no permissions.")
# Source code examples that use the Python class NotFound()
async def command_note_delete(self, data, data_string, message):
    """Delete a stored note and, when it can still be fetched, its posted message.

    ``data[0]`` is used as the note number; ``data_string`` is not used by
    this handler.  A usage hint is sent when ``data`` is empty and an
    "Unknown note ID" reply is sent when no such note exists.  Authors
    without permission only produce a debug log entry.
    """
    if not self.has_permission(message.author):
        return log.debug("Permission denied")  # No perms

    if not data:
        return await self.send_message(
            message.channel, "{} Usage: `note-delete \"<note number>\"`".format(message.author.mention)
        )

    index = data[0]
    note = self.data_manager.get_note(message.server, index)

    if not note:
        return await self.send_message(
            message.channel, "{} Unknown note ID: {}".format(message.author.mention, index)
        )

    self.data_manager.delete_note(message.server, index)
    self.data_manager.save_server(message.server)

    channel = self.data_manager.get_notes_channel(message.server)

    if channel:
        # The message recorded for the note may already be gone; that is
        # not an error, there is simply nothing left to delete.
        try:
            note_message = await self.get_message(self.get_channel(channel), note["message_id"])
        except discord.NotFound:
            note_message = None

        if note_message:
            await self.delete_message(note_message)

    await self.send_message(
        message.channel,
        "{} Note deleted: `{}`".format(
            message.author.mention, index
        )
    )
# Aliases
async def base_purge(self, ctx: DogbotContext, limit: int, check=None, **kwargs):
    """Purge messages from ``ctx.channel`` and briefly report the result.

    Requests above 5,000 messages are refused with a reply.  ``check`` and
    any extra keyword arguments are forwarded to ``ctx.channel.purge``.
    """
    # check if it's too much
    if limit > 5000:
        await ctx.send('Too many messages to purge. 5,000 is the maximum.')
        return

    try:
        # limit + 1: purge the actual command message too
        msgs = await ctx.channel.purge(limit=limit + 1, check=check, **kwargs)
        await ctx.send(f'Purge complete. Removed {len(msgs)} message(s).', delete_after=2.5)
    except discord.NotFound:
        pass  # ignore not found errors
async def clean(self, ctx: DogbotContext):
    """Removes recent messages posted by the bot."""
    # Look at the last 50 messages of the channel and purge only those
    # authored by the bot's own user.
    try:
        await ctx.channel.purge(limit=50, check=lambda m: m.author == ctx.bot.user)
    except discord.NotFound:
        # Ignored on purpose: a "not found" during the purge is not
        # reported to the user.
        pass
def rps_error(self, _ctx, error):
    """Mark "message not found" errors as suppressible.

    Sets ``error.should_suppress = True`` when the failure is a
    ``discord.NotFound``; every other error is left untouched.
    """
    # The exception may arrive wrapped, with the real cause stored on
    # `.original`; fall back to the error itself when there is no wrapper.
    # NOTE(review): assumes discord.py's CommandInvokeError wrapping -- confirm.
    cause = getattr(error, 'original', error)

    # The previous condition was negated, which suppressed every error
    # *except* NotFound -- the opposite of what the log message describes.
    if isinstance(cause, discord.NotFound):
        log.debug('RPS message deleted, ignoring!')
        error.should_suppress = True
    # NOTE(review): if this is registered as a discord.py error handler it
    # probably has to be `async def`; no `await` is visible here, so the
    # definition is left as found.
async def assign_roles(self, autorole_type: str, member: Member):
    """Add the stored autoroles of one type to ``member``.

    Reads the ``autoroles`` row for the member's guild and
    ``autorole_type``, resolves the stored role IDs against the guild's
    roles (IDs that no longer resolve are dropped) and adds the rest.

    Returns:
        The list of roles that were added.  An empty list is returned when
        there is no autorole row, when none of the stored roles resolve, or
        when adding the roles failed (the failure is logged).
    """
    async with self.bot.pgpool.acquire() as pg:
        # fetch autoroles for that user
        record = await pg.fetchrow('SELECT * FROM autoroles WHERE guild_id = $1 AND type = $2', member.guild.id,
                                   autorole_type)
    # Only the query needs the connection, so it is handed back to the pool
    # before the role update below.

    if not record:
        return []

    # get the role ids we need
    role_ids = record['roles']

    # collect roles to add, filtering out dead roles
    candidates = (discord.utils.get(member.guild.roles, id=role_id) for role_id in role_ids)
    roles_to_add = [role for role in candidates if role is not None]

    if not roles_to_add:
        # no roles to add
        return []

    log.debug('Adding {} roles to {}.'.format(len(roles_to_add), member))

    try:
        # add roles
        await member.add_roles(*roles_to_add)
    except discord.Forbidden:
        log.warning('Failed to autorole %s. Forbidden!', member)
    except discord.NotFound:
        log.warning('Failed to autorole %s, not found?', member)
    else:
        return roles_to_add

    # Nothing was added; keep the return type a list on every path.
    return []
async def __auto_join_channels(self):
    """Accept every invite URL listed under ``settings['auto join']['invite urls']``.

    Failures are logged per URL and do not stop the remaining invites from
    being tried.  Always returns an empty tuple.
    """
    for url in self.settings['auto join']['invite urls']:
        log('Auto-joining {}'.format(url))
        try:
            await self.client.accept_invite(url)
        except discord.NotFound:
            log('Invite has expired')
        except discord.Forbidden:
            # Must come before HTTPException: Forbidden is one of its
            # subclasses, so listed after it this handler could never run.
            log('Forbidden')
        except discord.HTTPException as e:
            log('Got an HTTP exception: {}'.format(e))
    return ()
async def _loop(self):
    """Run the game until the board is solved, a mine is hit or input times out.

    Each accepted message is parsed into ``(x, y, thing)``, deleted, and
    applied to the board.  The board message is refreshed after every move.

    Returns:
        Seconds elapsed (``time.perf_counter``) once the board is solved.

    Raises:
        asyncio.TimeoutError: no valid message arrived within 120 seconds.
        HitMine: the move hit a mine (re-raised after the board is revealed).
        Exception: any other error from applying the move, re-raised after
            it has been reported in the channel.
    """
    start = time.perf_counter()
    while True:
        # Reset per move; the `finally` below redraws with whatever the
        # move handlers set (None/None when nothing special happened).
        colour = header = None
        try:
            message = await self.ctx.bot.wait_for('message', timeout=120, check=self.check_message)
        except asyncio.TimeoutError:
            await self.edit_board(0, header='Took too long...')
            raise

        parsed = self.parse_message(message.content)
        if parsed is None:  # garbage input, ignore.
            continue

        x, y, thing = parsed
        # The player's message may already have been deleted.
        with contextlib.suppress(discord.NotFound):
            await message.delete()

        try:
            if thing.value:
                # `thing.value` names the board method to call for this move.
                getattr(self.board, thing.value)(x, y)
            else:
                self.board.show(x, y)
        except HitMine:
            self.board.explode(x, y)
            await self.edit_board(0xFFFF00, header='BOOM!')
            await asyncio.sleep(random.uniform(0.5, 1))
            self.board.reveal_mines()
            colour, header = 0xFF0000, 'Game Over!'
            raise
        except Exception as e:
            # "```py" opens a Python code fence; the previous "\y" was an
            # invalid escape sequence that left the fence without a language.
            await self.ctx.send(f'An error happened.\n```py\n{type(e).__name__}: {e}```')
            raise
        else:
            if self.board.is_solved():
                colour, header = 0x00FF00, "A winner is you!"
                self.board.reveal_mines(success=True)
                return time.perf_counter() - start
        finally:
            # Runs on every path, including the re-raises and the return.
            await self.edit_board(colour, header=header)
async def _loop(self):
    """Post the screen, add the buttons, then apply player moves forever.

    Each accepted message is parsed into ``(x, y, number)`` and written to
    ``self.board[x, y]``.  Messages that fail to parse, or that the board
    rejects, are ignored and left in the channel.

    Raises:
        asyncio.TimeoutError: no message arrived within 120 seconds while
            ``self._state`` is ``State.default``.
    """
    self.edit_screen()
    self._message = await self.ctx.send(embed=self._screen)
    await self.add_buttons()

    while True:
        try:
            message = await self.ctx.bot.wait_for('message', timeout=120, check=self.check_message)
        except asyncio.TimeoutError:
            # Only time out in the default state; otherwise keep waiting.
            if self._state == State.default:
                raise
            continue

        try:
            x, y, number = self.parse_message(message.content)
        except ValueError:
            continue

        try:
            self.board[x, y] = number
        except (IndexError, ValueError):
            continue

        # The move was accepted: remove the player's message (it may
        # already be gone) and redraw.
        with suppress(discord.NotFound):
            await message.delete()

        self.edit_screen()
        await self._message.edit(embed=self._screen)
async def reason(self, ctx, num: CaseNumber, *, reason):
    """Sets the reason for a particular case.
    You must own this case in order to edit the reason.
    Negative numbers are allowed. They count starting from
    the most recent case. e.g. -1 will show the newest case,
    and -10 will show the 10th newest case.
    """
    # Each failed precondition below replies in the channel and stops.
    case = await self._get_case(ctx.session, ctx.guild.id, num)
    if case is None:
        return await ctx.send(f"Case #{num} doesn't exist.")

    if case.mod_id != ctx.author.id:
        return await ctx.send("This case is not yours.")

    channel = ctx.guild.get_channel(case.channel_id)
    if not channel:
        return await ctx.send('This channel no longer exists... :frowning:')

    message = await _get_message(channel, case.message_id)
    if not message:
        return await ctx.send('Somehow this message was deleted...')

    # The reason is stored in the last field of the message's first embed;
    # replace its value and keep the field's existing name.
    embed = message.embeds[0]
    reason_field = embed.fields[-1]
    embed.set_field_at(-1, name=reason_field.name, value=reason, inline=False)

    try:
        await message.edit(embed=embed)
    except discord.NotFound:
        # In case the message was cached, and the message was deleted
        # While it was still in the cache.
        return await ctx.send('Somehow this message was deleted...')

    # Persist the new reason only after the message edit succeeded.
    case.reason = reason
    await ctx.session.add(case)
    await ctx.send('\N{OK HAND SIGN}')
async def safe_delete_message(self, message, *, quiet=False):
    """Delete ``message`` without raising on missing permission or message.

    Returns whatever ``self.delete_message`` returns on success and ``None``
    when the deletion was refused or the message no longer exists.  A
    warning is printed for those two cases unless ``quiet`` is true.
    """
    try:
        return await self.delete_message(message)
    except discord.Forbidden:
        if not quiet:
            self.safe_print("Warning: Cannot delete message \"%s\", no permission" % message.clean_content)
    except discord.NotFound:
        if not quiet:
            self.safe_print("Warning: Cannot delete message \"%s\", message not found" % message.clean_content)
async def safe_edit_message(self, message, new, *, send_if_fail=False, quiet=False):
    """Edit ``message`` to ``new``, optionally sending ``new`` if it is gone.

    Returns the result of ``self.edit_message`` on success.  When the
    message no longer exists and ``send_if_fail`` is true, ``new`` is sent
    to the message's channel via ``self.safe_send_message`` and that result
    is returned; otherwise ``None`` is returned.  Warnings are printed
    unless ``quiet`` is true.
    """
    try:
        return await self.edit_message(message, new)
    except discord.NotFound:
        if not quiet:
            self.safe_print("Warning: Cannot edit message \"%s\", message not found" % message.clean_content)
        if send_if_fail:
            if not quiet:
                # Routed through safe_print like the warning above.
                self.safe_print("Sending instead")
            return await self.safe_send_message(message.channel, new)
async def on_message(self, message):
    """Message listener"""
    # Pins a server message that contains the server's configured trigger
    # text.  Private channels, servers without a (non-empty) trigger, and
    # messages mentioning "pintrigger" are skipped.
    if message.channel.is_private:
        return

    server_id = message.server.id
    if server_id not in self.settings or not self.settings[server_id]:
        return

    this_trigger = self.settings[server_id]
    if this_trigger not in message.content or "pintrigger" in message.content:
        return

    try:
        await self.bot.pin_message(message)
    except discord.Forbidden:
        print("No permissions to do that!")
    except discord.NotFound:
        print("That channel or message doesn't exist!")
    except discord.HTTPException:
        print("Something went wrong. Maybe check the number of pinned messages?")
async def show_page(self, page, *, first=False):
    """Render ``page`` into the embed and send or edit the paginator message.

    * Not paginating: the page is sent as a new message, which is returned.
    * Paginating, ``first`` false: the existing message is edited in place.
    * Paginating, ``first`` true: the page (plus a help hint) is sent,
      stored on ``self.message``, and the reaction buttons are added.

    Raises:
        CannotPaginate: on the first paginated page when the bot lacks the
            add-reactions or read-message-history permission.
    """
    self.current_page = page
    entries = self.get_page(page)
    # Number entries continuously across pages.
    first_number = 1 + ((page - 1) * self.per_page)
    p = ['%s. %s' % t for t in enumerate(entries, first_number)]

    self.embed.set_footer(text='Page %s/%s (%s entries)' % (page, self.maximum_pages, len(self.entries)))

    if not self.paginating:
        self.embed.description = '\n'.join(p)
        return await self.bot.send_message(self.message.channel, embed=self.embed)

    if not first:
        self.embed.description = '\n'.join(p)
        await self.bot.edit_message(self.message, embed=self.embed)
        return

    # verify we can actually use the pagination session
    if not self.permissions.add_reactions:
        raise CannotPaginate('Bot does not have add reactions permission.')

    if not self.permissions.read_message_history:
        raise CannotPaginate('Bot does not have Read Message History permission.')

    p.append('')
    p.append('Confused? React with \N{INFORMATION SOURCE} for more info.')
    self.embed.description = '\n'.join(p)
    self.message = await self.bot.send_message(self.message.channel, embed=self.embed)

    for (reaction, _) in self.reaction_emojis:
        if self.maximum_pages == 2 and reaction in ('\u23ed', '\u23ee'):
            # no |<< or >>| buttons if we only have two pages
            # we can't forbid it if someone ends up using it but remove
            # it from the default set
            continue
        try:
            await self.bot.add_reaction(self.message, reaction)
        except discord.NotFound:
            # If the message isn't found, we don't care about clearing anything
            return
async def xban(self, ctx, user_id: int):
    """
    Allows the banning of a user not in the guild via ID
    """
    # Bans through the raw HTTP client using only the numeric user ID, then
    # reports the outcome in the invoking channel.
    try:
        await ctx.bot.http.ban(user_id, ctx.message.guild.id, 0)
    except discord.Forbidden:
        await ctx.channel.send(":x: 403 FORBIDDEN")
    except discord.NotFound:
        await ctx.channel.send(":x: User not found.")
    else:
        await ctx.channel.send(f":negative_squared_cross_mark: Banned user {user_id} - <@{user_id}>.")
async def guild(self, ctx, guild_id: int):
    """
    Searches for a guild via ID
    """
    found = self.bot.get_guild(guild_id)
    if found is None:
        # Reply directly instead of raising discord.NotFound: that
        # exception is built from an HTTP response and a message, so a bare
        # `raise discord.NotFound` fails with TypeError and the "not found"
        # reply was never sent.
        await ctx.send("`No guild found under this ID`")
        return

    try:
        with ctx.channel.typing():
            # NOTE(review): now() is local time; confirm the embed
            # timestamp is not expected to be UTC.
            embed = discord.Embed(colour=discord.Colour(0x30f9c7), description=f"ID: {found.id}",
                                  timestamp=datetime.datetime.now())
            embed.set_thumbnail(
                url=found.icon_url)
            embed.set_author(name=found.name)
            embed.set_footer(text="Who Is: Guild")
            embed.add_field(name="Members", value=str(len(found.members)))
            embed.add_field(name="Roles", value=str(len(found.roles) - 1))
            embed.add_field(name="Channels", value=str(len(found.channels)))
            embed.add_field(name="AFK Channel", value=found.afk_channel)
            # afk_timeout is divided by 60 for display.
            embed.add_field(name="AFK Timeout", value=str(found.afk_timeout / 60))
            embed.add_field(name="Owner", value=found.owner)
            embed.add_field(name="Creation Date", value=found.created_at)
            embed.add_field(name="Region", value=found.region)
            embed.add_field(name="Verification Level", value=found.verification_level)
            await ctx.send(embed=embed)
    except discord.NotFound:
        await ctx.send("`No guild found under this ID`")
    except discord.HTTPException:
        await ctx.send("`Error collecting guild information`")
async def edit_invite_blocker(ev, before, after):
    """Remove an edited message that now contains a valid invite link.

    Acts only on guild messages from members without the manage-guild
    permission, and only when the guild's 'BlockInvites' setting is truthy.
    The message is deleted, the author is notified by DM (best effort) and
    the removal is logged.  ``before`` is not used.
    """
    if not after.guild:
        return
    if not isinstance(after.author, discord.Member):
        return
    if after.author.permissions_in(after.channel).manage_guild:
        return
    if not ev.db.get_guild_settings(after.guild.id, 'BlockInvites'):
        return

    triggers = ('.gg', '.com', 'http')
    invite_found = False
    for arg in after.content.split(' '):
        if not any(trigger in arg for trigger in triggers):
            continue
        # One lookup per candidate word, and stop at the first real invite.
        # Previously the `break` only left the trigger loop, so a word
        # matching several triggers was looked up repeatedly and the scan
        # went on after a hit.
        try:
            invite_found = await ev.bot.get_invite(arg)
        except discord.NotFound:
            continue
        if invite_found:
            break

    if not invite_found:
        return

    # NOTE(review): the leading '?' looks like a lost emoji -- confirm.
    title = '? Invite links are not allowed on this server.'
    response = discord.Embed(color=0xF9F9F9, title=title)
    await after.delete()
    try:
        await after.author.send(embed=response)
    except discord.Forbidden:
        # The author cannot be messaged; the removal is still logged.
        pass

    log_embed = discord.Embed(color=0xF9F9F9)
    author = f'{after.author.name}#{after.author.discriminator}'
    log_embed.set_author(name=f'I removed {author}\'s invite link.',
                         icon_url=user_avatar(after.author))
    log_embed.set_footer(
        text=f'Posted In: #{after.channel.name} | Leads To: {invite_found.guild.name}')
    await log_event(ev.db, after.guild, log_embed)
async def send_invite_blocker(ev, message):
    """Remove a newly sent message that contains a valid invite link.

    Acts only on guild messages from members without the manage-guild
    permission, and only when the guild's 'BlockInvites' setting is truthy.
    The message is deleted, the author is notified by DM (best effort) and
    the removal is logged.
    """
    if not message.guild:
        return
    if not isinstance(message.author, discord.Member):
        return
    if message.author.permissions_in(message.channel).manage_guild:
        return
    if not ev.db.get_guild_settings(message.guild.id, 'BlockInvites'):
        return

    triggers = ('.gg', '.com', 'http')
    invite_found = False
    for arg in message.content.split(' '):
        if not any(trigger in arg for trigger in triggers):
            continue
        # One lookup per candidate word, and stop at the first real invite.
        # Previously the `break` only left the trigger loop, so a word
        # matching several triggers was looked up repeatedly and the scan
        # went on after a hit.
        try:
            invite_found = await ev.bot.get_invite(arg)
        except discord.NotFound:
            continue
        if invite_found:
            break

    if not invite_found:
        return

    # NOTE(review): the leading '?' looks like a lost emoji -- confirm.
    title = '? Invite links are not allowed on this server.'
    response = discord.Embed(color=0xF9F9F9, title=title)
    await message.delete()
    try:
        await message.author.send(embed=response)
    except discord.Forbidden:
        # The author cannot be messaged; the removal is still logged.
        pass

    log_embed = discord.Embed(color=0xF9F9F9)
    author = f'{message.author.name}#{message.author.discriminator}'
    log_embed.set_author(name=f'I removed {author}\'s invite link.',
                         icon_url=user_avatar(message.author))
    log_embed.set_footer(
        text=f'Posted In: #{message.channel.name} | Leads To: {invite_found.guild.name}')
    await log_event(ev.db, message.guild, log_embed)
async def mute_checker(ev, message):
    """Delete guild messages written by users on the guild's 'MutedUsers' list.

    A missing setting is treated as an empty list.  Failing to delete the
    message (no permission, or it is already gone) is ignored.
    """
    if not message.guild:
        return

    mute_list = ev.db.get_guild_settings(message.guild.id, 'MutedUsers')
    if mute_list is None:
        mute_list = []

    if message.author.id in mute_list:
        try:
            await message.delete()
        except (discord.Forbidden, discord.NotFound):
            # Best effort: nothing else to do if the delete is refused or
            # the message no longer exists.
            pass
async def safe_send_file(self, dest, fp, *, filename=None, comment=None, tts=False, expire_in=0, also_delete=None,
                         quiet=False):
    """Send a file to ``dest`` without raising on the usual Discord send errors.

    ``dest`` is re-resolved by ID among the channels of ``self.servers``;
    nothing is sent (and ``None`` is returned) when no channel matches.

    Args:
        dest: Object whose ``id`` identifies the target channel.
        fp: File to upload, passed through to ``self.send_file``.
        filename: Optional file name, passed through.
        comment: Optional text, sent as the message content.
        tts: Passed through to ``self.send_file``.
        expire_in: When truthy, the sent message is scheduled for deletion
            via ``self._wait_delete_msg`` with this value.
        also_delete: A ``discord.Message`` to schedule for deletion as well.
        quiet: Suppress the printed warnings.

    Returns:
        The sent message, or ``None`` when nothing was sent.
    """
    final_dest = None
    for server in self.servers:
        this = discord.utils.get(server.channels, id=str(dest.id))
        if this:
            final_dest = this

    if not final_dest:
        return

    msg = None
    # Only the send itself is guarded.  The previous `return` inside a
    # `finally` block silently discarded any exception raised after a
    # successful send.
    try:
        msg = await self.send_file(final_dest, fp, filename=filename, content=comment, tts=tts)
    except discord.Forbidden:
        if not quiet:
            print(
                "Warning: Cannot send message to %s:%s, no permission" % (
                    final_dest.name, final_dest.server.name))
    except discord.NotFound:
        if not quiet:
            print("Warning: Cannot send message to %s:%s, invalid channel?" % (
                final_dest.name, final_dest.server.name))
    except discord.HTTPException:
        if not quiet:
            print("Warning: I'm being rate limited")
    else:
        self.action_dict['messages_sent'] += 1

        if msg and expire_in:
            asyncio.ensure_future(self._wait_delete_msg(msg, expire_in))

        if also_delete and isinstance(also_delete, discord.Message):
            asyncio.ensure_future(self._wait_delete_msg(also_delete, expire_in))

    return msg