def on_command_error(self, error, ctx):
    """Write a CRITICAL log record for a command failure that is not routine.

    Exceptions carrying an ``original`` attribute are unwrapped first. Errors
    whose type is listed in ``routine`` below are dropped without logging.
    Anything else is logged through ``self.bot.logs['discord']`` together with
    the command's qualified name, the current UTC time, the invoking author,
    the channel (plus the server when the message has one) and the formatted
    traceback.

    NOTE(review): no ``await`` appears in this handler, so it is kept as a
    plain function; confirm whether the framework expects a coroutine here.
    """
    error = getattr(error, 'original', error)
    routine = (
        commands.NoPrivateMessage, commands.DisabledCommand, commands.CheckFailure,
        commands.CommandNotFound, commands.UserInputError, discord.HTTPException,
    )
    if isinstance(error, routine):
        return

    source = ctx.message
    # Messages without a server only get the channel line.
    template = 'Channel: {0} (ID: {0.id})'
    if source.server:
        template = 'Channel: {0} (ID: {0.id})\nGuild: {1} (ID: {1.id})'

    # chain=False: only the unwrapped error itself is rendered.
    trace_lines = traceback.format_exception(type(error), error, error.__traceback__, chain=False)
    message = '{0} at {1}: Called by: {2} in {3}. More info: {4}'.format(
        ctx.command.qualified_name,
        datetime.datetime.utcnow(),
        '{0} (ID: {0.id})'.format(source.author),
        template.format(source.channel, source.server),
        '```py\n%s\n```' % ''.join(trace_lines),
    )
    self.bot.logs['discord'].critical(message)
# python类UserInputError()的实例源码  (page heading: "Example source code for the Python class UserInputError()")
async def on_command_error(ctx, error):
    """Tell the invoking channel why a command failed.

    * ctx - The context of the command invocation that failed.
    * error - The exception that was raised for it.

    ``commands.CommandNotFound`` is ignored silently. A fixed reply is sent
    for the error types in ``replies``; a ``commands.UserInputError`` gets the
    command signature; anything else is echoed back with the command name and
    the invoking author.

    Declared ``async`` because the body awaits ``ctx.send``; as a plain
    ``def`` this function is a SyntaxError.
    """
    if isinstance(error, commands.CommandNotFound):
        return

    # Checked in insertion order with isinstance(), so a more specific class
    # must stay ahead of any base class it derives from.
    # NOTE(review): presumably the two permission errors derive from
    # CheckFailure, which is why CheckFailure is listed last -- confirm.
    replies = {
        commands.DisabledCommand: 'Command has been disabled.',
        commands.MissingPermissions: 'Invoker is missing permissions to run this command.',
        commands.BotMissingPermissions: 'Bot is missing permissions to run this command.',
        commands.CheckFailure: 'You are not allowed to run this command.',
    }
    for error_type, text in replies.items():
        if isinstance(error, error_type):
            return await ctx.send(text)

    # argument error
    if isinstance(error, commands.UserInputError):
        # The module-level bot's formatter is pointed at this invocation so it
        # can render the signature of the command that was called.
        bot.formatter.context = ctx
        bot.formatter.command = ctx.command
        return await ctx.send(f'Invalid argument(s) provided.\n```{bot.formatter.get_command_signature()}```')

    await ctx.send(f'An error occured in `{ctx.command.name}` invoked by {ctx.message.author}:\n```{error}```')
#traceback.print_exception(type(exception), exception, exception.__traceback__, file=sys.stderr)
# blacklist check
# check if the user is blacklisted, or if it's a bot
async def mine(self, ctx, *, user: discord.Member):
    """Set a tripmine for someone. Tripmines go off at random.
    * user - The person for which the mine will go off.
    """
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError. The docstring above is left as written since it may be
    # shown to users as help text.
    if user.id == ctx.bot.user.id:
        # The bot refuses to have a mine set on itself.
        await ctx.send("Nope. :3")
        return

    # Tripmines are tracked per channel id in self.tripmines.
    channel_mines = self.tripmines.get(ctx.channel.id)
    if channel_mines is not None and channel_mines.has_member(user.id):
        raise commands.UserInputError(f"A tripmine is already set for {user.display_name}.")

    # Create the per-channel container on first use, then register the user.
    self.tripmines.setdefault(ctx.channel.id, TripmineChannelArray()).add_member(user.id)

    # The confirmation is removed again after three seconds.
    message = await ctx.send(f"Tripmine set for {user.display_name}! :3")
    await asyncio.sleep(3)
    await message.delete()
async def unmine(self, ctx, *, user: discord.Member=None):
    """Remove a tripmine from yourself, or from someone else.
    * user - The person whose tripmine is removed. Defaults to yourself.
    """
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError. ``user`` now defaults to None: it used to be required,
    # which made the fallback to the invoking author below unreachable.
    if not user:
        user = ctx.author

    if user.id == ctx.bot.user.id:
        await ctx.send("Nope. :3")
        return

    # Tripmines are tracked per channel id in self.tripmines.
    channel_mines = self.tripmines.get(ctx.channel.id)
    if channel_mines is None or not channel_mines.has_member(user.id):
        raise commands.UserInputError(f"No tripmine is currently set for {user.display_name}.")

    channel_mines.remove_member(user.id)
    await ctx.send(f"Removed tripmine for {user.display_name}! :3")
async def ignore(self, ctx, *channels_or_members: Plonkable):
    """Ignores text channels or members from using the bot.
    If no channel or member is specified, the current channel is ignored.
    """
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError.
    targets = channels_or_members or [ctx.channel]

    if len(targets) == 1:
        thing = one(targets)
        try:
            await ctx.session.add(Plonks(guild_id=ctx.guild.id, entity_id=thing.id))
        except asyncpg.UniqueViolationError:
            # The row already exists: tell the user, then abort the command
            # so the summary below is not shown.
            await ctx.send(f"I'm already ignoring {thing}...")
            raise commands.UserInputError
    else:
        await self._bulk_ignore_entries(ctx, targets)

    await self._display_plonked(ctx, targets, plonk=True)
async def _peek(self, user_id, *, start=1, playlist_name=None):
    """Whisper a listing of playlist songs beginning at position ``start``.

    * user_id - Passed through to ``self._db.show`` as the first argument.
    * start - 1-based position of the first song to list; must be positive.
    * playlist_name - Playlist to query; ``self._db.show`` returns the name
      that is actually displayed.

    Raises ``dec.UserInputError`` when ``start`` is not positive. Returns
    nothing; all output goes through ``self._bot.whisper``.

    Declared ``async`` because the body awaits; as a plain ``def`` it is a
    SyntaxError.
    """
    if start <= 0:
        raise dec.UserInputError('Start must be a positive number')

    # ``start`` is 1-based, the database offset is start - 1.
    # NOTE(review): the literal 20 is presumably the page size -- confirm
    # against the signature of ``_db.show``.
    items, playlist_name, total = await self._db.show(user_id, start - 1, 20, playlist_name)

    if not items:
        if start == 1 or total == 0:
            await self._bot.whisper('**Playlist** {} **is empty**'.format(playlist_name))
        else:
            await self._bot.whisper(
                '**There are no songs in the playlist** {} **, starting from the** {} **song**'
                .format(playlist_name, self._ordinal(start)))
        return

    header = '**{} song(s) (out of {}) from playlist** {}**, starting from the **{}**:**\n **>** ' \
        .format(len(items), total, playlist_name, self._ordinal(start))
    listing = '\n **>** '.join('[{}] {}'.format(*item) for item in items)
    await self._bot.whisper(header + listing)
async def __error(self, ctx, error):
    """Local error handler: answer known failures in the invoking channel.

    * ``UnavailableTagCommand`` / ``UnableToUseBox`` - the error itself is
      sent as the reply.
    * ``commands.UserInputError`` - for the ``tag`` command the help is shown
      (after ``ctx.acquire()``); for any other command the error is sent.

    Every other error type is left untouched by this handler.

    Declared ``async`` because the body awaits; as a plain ``def`` it is a
    SyntaxError.
    """
    if isinstance(error, (UnavailableTagCommand, UnableToUseBox)):
        await ctx.send(error)
        return

    if not isinstance(error, commands.UserInputError):
        return

    if ctx.command.qualified_name == 'tag':
        # NOTE(review): acquire() presumably obtains a resource that
        # show_help() needs -- confirm against the Context implementation.
        await ctx.acquire()
        await ctx.show_help()
    else:
        await ctx.send(error)
# @cache.cache()
# async def get_tag_config(self, guild_id, *, connection=None):
# # tag config is stored as a special server-wide tag, 'config'
# # this 'config' value is serialised as JSON in the content
# query = """SELECT content FROM tags WHERE name = 'config' AND location_id = $1;"""
# con = connection if connection else self.bot.pool
# record = await con.fetchrow(query, guild_id)
# if record is None:
# return TagConfig({})
# return TagConfig(json.loads(record['content']))
async def on_command_error(self, ctx, error):
    """Post an embed describing an unexpected command failure to the log channel.

    * ctx - The context of the command invocation that failed.
    * error - The raised exception; unwrapped through ``.original`` if present.

    Errors whose type is listed in ``ignored`` are dropped. For anything else
    an embed with the command name, author, channel (and guild, when there is
    one) and the formatted traceback is sent to the channel looked up via
    ``LOGGING_CHANNEL``.

    Declared ``async`` because the body awaits ``ch.send``; as a plain ``def``
    this function is a SyntaxError.
    """
    ignored = (commands.NoPrivateMessage, commands.DisabledCommand, commands.CheckFailure,
               commands.CommandNotFound, commands.UserInputError, discord.Forbidden)
    error = getattr(error, 'original', error)
    if isinstance(error, ignored):
        return

    e = discord.Embed(title='Command Error', colour=0xcc3366)
    e.add_field(name='Name', value=ctx.command.qualified_name)
    e.add_field(name='Author', value=f'{ctx.author} (ID: {ctx.author.id})')

    fmt = f'Channel: {ctx.channel} (ID: {ctx.channel.id})'
    if ctx.guild:
        fmt = f'{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})'
    e.add_field(name='Location', value=fmt, inline=False)

    # chain=False: only the unwrapped error itself is rendered.
    exc = ''.join(traceback.format_exception(type(error), error, error.__traceback__, chain=False))
    e.description = f'```py\n{exc}\n```'
    e.timestamp = datetime.datetime.utcnow()

    # NOTE(review): get_channel() may return None if the channel is not
    # available; that case is not handled here -- confirm it cannot happen.
    ch = self.bot.get_channel(LOGGING_CHANNEL)
    await ch.send(embed=e)
async def input_number(ctx: commands.Context,
                       message: str="Please enter a number within 10 seconds.",
                       *, timeout: int=10, min_value: int=None, max_value: int=None):
    """Input number helper. Ask for a number and wait for the author's reply.

    ctx - The context in which the question is being asked.
    message - Optional message that the question should ask.
    timeout - Timeout, in seconds, before automatically failing.
    min_value - Minimum accepted value for the input (None means no minimum).
    max_value - Maximum accepted value for the input (None means no maximum).

    Returns the accepted reply converted to int. Raises
    commands.UserInputError when no acceptable reply arrives in time.
    """
    await ctx.send(message)

    def check(candidate):
        """Accept only decimal replies from the asking author that are in range."""
        if candidate.author != ctx.message.author or not candidate.clean_content.isdecimal():
            return False
        number = int(candidate.clean_content)
        # Compare against None explicitly: a bound of 0 is a real bound and
        # must not be skipped just because it is falsy.
        if min_value is not None and number < min_value:
            return False
        if max_value is not None and number > max_value:
            return False
        return True

    try:
        reply = await ctx.bot.wait_for("message", timeout=timeout, check=check)
    except asyncio.TimeoutError:
        raise commands.UserInputError("Timed out waiting.")
    return int(reply.clean_content)
async def input_number(ctx: commands.Context,
                       message: str="Please enter a number within 10 seconds.",
                       *, timeout: int=10, min_value: int=None, max_value: int=None):
    """Number input helper, with timeout.

    * ctx - The context in which the question is being asked.
    * message - Optional message that the question should ask.
    * timeout - Timeout, in seconds, before automatically failing. Defaults to 10.
    * min_value - Minimum accepted value for the input. Defaults to None (no minimum).
    * max_value - Maximum accepted value for the input. Defaults to None (no maximum).

    Returns the accepted reply as an int; raises commands.UserInputError if
    the wait times out.
    """
    await ctx.send(message)

    def check(incoming):
        """The check function used in bot.wait_for()."""
        if incoming.author != ctx.message.author:
            return False
        text = incoming.clean_content
        if not text.isdecimal():
            return False
        value = int(text)
        # ``is not None`` rather than truthiness, so that a bound of 0 is
        # still enforced.
        below = min_value is not None and value < min_value
        above = max_value is not None and value > max_value
        return not (below or above)

    try:
        answer = await ctx.bot.wait_for("message", timeout=timeout, check=check)
    except asyncio.TimeoutError:
        raise commands.UserInputError("Timed out waiting.")
    return int(answer.clean_content)
async def censor(self, ctx, times: int=1):
    """Delete the bot's previous message(s). Bot owner only.
    * times - Number of message to delete. Defaults to 1.
    """
    # Declared ``async`` because the body awaits and uses ``async for``; as a
    # plain ``def`` it is a SyntaxError.
    if times < 1:
        # The exception used to be *returned*, which silently discarded it;
        # it has to be raised to reach the error handler.
        raise commands.UserInputError("Can't delete less than 1 message.")

    deleted = 0
    async for message in ctx.channel.history():
        if deleted == times:
            break
        # Only messages written by the bot itself are removed.
        if message.author.id == ctx.bot.user.id:
            await message.delete()
            deleted += 1
async def update(self, ctx, args='default'):
    """Updates bot."""
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError.
    #
    # ``args`` names the branch to check out; 'default' selects the branch
    # configured in self.config.git_branch.
    branch = self.config.git_branch if args == 'default' else args

    try:
        # Retrieving latest code from upstream.
        # The commands are passed as argument lists without a shell, so a
        # user-supplied branch name cannot inject shell syntax.
        # NOTE(review): these calls block until git finishes; consider moving
        # them off the event loop.
        subprocess.check_output(["git", "fetch"], stderr=subprocess.STDOUT)
        subprocess.check_output(["git", "checkout", "origin/{}".format(branch)],
                                stderr=subprocess.STDOUT)
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        await ctx.author.send("```py\nError while git pulling\n```")
        # Keep the original failure attached as the cause.
        raise commands.UserInputError(ctx) from e

    await ctx.author.send("```Git pull from '{}' success```".format(branch))
async def branches(self, ctx, args='default'):
    """Lists branches."""
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError. ``args`` is accepted but not used by this command.
    try:
        from git import Repo
        repo = Repo(os.path.join(os.getcwd(), ".git"))
        # Only the refs of the first configured remote are listed, one
        # direct message per ref.
        for ref in repo.remotes[0].refs:
            await ctx.author.send(ref.remote_head)
    except Exception as e:
        await ctx.author.send("```py\nError listing git branches\n```")
        # Keep the original failure attached as the cause.
        raise commands.UserInputError(ctx) from e
async def say(self, ctx, channel_name: str, *msg):
    """Says something as Nya."""
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError.
    #
    # When several channels share the name, the last one in
    # ctx.guild.channels is used, as before.
    # NOTE(review): every guild channel is considered; confirm that all of
    # them support ``send``.
    matches = [chan for chan in ctx.guild.channels if chan.name == channel_name]
    if not matches:
        await ctx.author.send("```py\n'{}' channel has not been found\n```".format(channel_name))
        raise commands.UserInputError(ctx, 'Channel not found')

    await matches[-1].send(" ".join(str(x) for x in msg))
def song(self, subcommand: str, *arguments: str):
    """Reject an unknown *song* subcommand.

    Always raises ``dec.UserInputError`` naming ``subcommand`` and pointing at
    the help command, prefixed with the configured delimiter. ``arguments``
    are accepted but not used.
    """
    delimiter = self._bot.config['ddmbot']['delimiter']
    template = ('Command *song* has no subcommand named {}. Please use `{}help song` to list all '
                'the available subcommands.')
    raise dec.UserInputError(template.format(subcommand, delimiter))
def failed(self):
    """Always raise ``dec.UserInputError``: *song failed* needs a subcommand."""
    reason = 'You need to provide a subcommand to the *song failed* command'
    raise dec.UserInputError(reason)
def failed_clear(self, song_id: int = None):
    """Always raise ``dec.UserInputError`` asking for a *song failed* subcommand.

    ``song_id`` is accepted but not used.
    """
    reason = 'You need to provide a subcommand to the *song failed* command'
    raise dec.UserInputError(reason)
def bot(self, subcommand: str, *arguments: str):
    """Reject an unknown *bot* subcommand.

    Always raises ``dec.UserInputError`` naming ``subcommand`` and pointing at
    the help command, prefixed with the configured delimiter. ``arguments``
    are accepted but not used.
    """
    delimiter = self._bot.config['ddmbot']['delimiter']
    template = ('Command *bot* has no subcommand named {}. Please use `{}help bot` to list all the '
                'available subcommands.')
    raise dec.UserInputError(template.format(subcommand, delimiter))
async def join(self, ctx):
    """Add the invoking author to the queue via ``self._bot.users.join_queue``.

    Raises ``dec.UserInputError`` when the player is streaming or stopped.

    Declared ``async`` because the body awaits; as a plain ``def`` it is a
    SyntaxError.
    """
    player = self._bot.player
    if player.streaming or player.stopped:
        raise dec.UserInputError('Player is not in the DJ mode')
    # The author id is converted to int before it is handed over.
    await self._bot.users.join_queue(int(ctx.message.author.id))
def user(self, subcommand: str, *arguments: str):
    """Reject an unknown *user* subcommand.

    Always raises ``dec.UserInputError`` naming ``subcommand`` and pointing at
    the help command, prefixed with the configured delimiter. ``arguments``
    are accepted but not used.
    """
    delimiter = self._bot.config['ddmbot']['delimiter']
    template = ('Command *user* has no subcommand named {}. Please use `{}help user` to list all '
                'the available subcommands.')
    raise dec.UserInputError(template.format(subcommand, delimiter))
async def ignore(self, user: discord.User):
    """Add ``user`` to the ignore list and announce it.

    Raises ``dec.UserInputError`` when ``self._bot.is_operator(user)`` is
    true; operators cannot be ignored.

    Declared ``async`` because the body awaits; as a plain ``def`` it is a
    SyntaxError.
    """
    if self._bot.is_operator(user):
        raise dec.UserInputError('User {} is an operator and cannot be ignored'.format(user))
    # The user id is converted to int before it is stored.
    await self._db.ignore(int(user.id))
    await self._bot.message('User {} has been added to the ignore list'.format(user))
async def move(self, user: discord.User, position: int):
    """Move ``user`` to ``position`` in the DJ queue and announce the result.

    ``self._bot.users.move_listener`` reports whether the user was newly
    inserted and the position that was actually used; the announcement is
    worded accordingly. Raises ``dec.UserInputError`` when the player is
    streaming or stopped.

    Declared ``async`` because the body awaits; as a plain ``def`` it is a
    SyntaxError.
    """
    player = self._bot.player
    if player.streaming or player.stopped:
        raise dec.UserInputError('Player is not in the DJ mode')

    inserted, position = await self._bot.users.move_listener(int(user.id), position)
    if inserted:
        template = 'User {} was added to the DJ queue to the {} position'
    else:
        template = 'User {} was moved to the {} position in the DJ queue'
    await self._bot.message(template.format(user, self._ordinal(position)))
async def playlist(self, ctx, subcommand: str, *arguments: str):
    """Fallback for ``playlist <name> <subcommand> [args...]`` invocations.

    The first word is treated as a playlist name and the second as the real
    subcommand. The invocation is rewritten to
    ``playlist <subcommand>_explicit <name> [args...]`` and that command is
    invoked.

    Raises ``dec.UserInputError`` when no further arguments were given, when
    the invoking author has no playlist of that name, when the second word is
    not a subcommand, or when the subcommand has no ``_explicit`` variant.

    Declared ``async`` because the body awaits; as a plain ``def`` it is a
    SyntaxError.
    """
    # TODO: CHECK FOR THE PLAYLIST NAME
    if not arguments:
        raise dec.UserInputError('Command *playlist* has no subcommand named {}. Please use `{}help playlist` to '
                                 'list all the available subcommands.'.format(subcommand, ctx.prefix))

    # will try to treat subcommand as a playlist name from now on
    playlist_name = subcommand
    if not await self._db.exists(int(ctx.message.author.id), playlist_name):
        raise dec.UserInputError('Command *playlist* has no subcommand named {0}, nor is it a name of your '
                                 'playlist. Please use `{1}help playlist` to list all the available subcommands '
                                 'or `{1}playlist list` to list all your playlists.'
                                 .format(playlist_name, ctx.prefix))

    # now we try to execute a subcommand depending on the input
    #   arguments[0] == subcommand name without postfix
    #   playlist_name + arguments[1:] == subcommand arguments

    # two-step approach -- tackling aliases: resolve whatever the user typed
    # to a command first, then look up the explicit variant by that name
    requested = ctx.command.get_command(arguments[0])
    if requested is None:
        raise dec.UserInputError('Command *playlist* has no subcommand named {}. Please use `{}help playlist` to '
                                 'list all the available subcommands.'.format(arguments[0], ctx.prefix))

    # now try to call explicit version
    explicit = ctx.command.get_command('{}_explicit'.format(requested.name))
    if explicit is None:
        raise dec.UserInputError('The subcommand {} does not support optional playlist specification. Please use '
                                 '`{}help playlist` to list all the available subcommands and their arguments.'
                                 .format(arguments[0], self._bot.config['ddmbot']['delimiter']))

    # replace the string view with the swapped one and invoke the correct subcommand
    swapped_command = '{}{} {} {}'.format(ctx.prefix, ctx.invoked_with, explicit, playlist_name)
    if arguments[1:]:
        # NOTE(review): arguments are wrapped in double quotes without
        # escaping quotes they may already contain -- confirm this is safe.
        swapped_command += ' ' + ' '.join('"{}"'.format(arg) for arg in arguments[1:])
    ctx.view = decw.StringView(swapped_command)
    # Position the view right behind "<prefix><invoked_with> <subcommand>",
    # so that parsing resumes at the playlist name.
    ctx.view.index = len(ctx.prefix) + len(ctx.invoked_with) + len(str(explicit)) + 1
    ctx.view.previous = ctx.view.index
    ctx.invoked_with = arguments[0]

    # now invoke
    return await explicit.invoke(ctx)
async def color(self, ctx, *, color: str=None):
    """Display a color. Accepts CSS color names and hex input.
    * color - Either a CSS color or hex input.
    """
    # Declared ``async`` because the body awaits; as a plain ``def`` it is a
    # SyntaxError.

    # First try the input as a CSS color name; anything that is not a known
    # name (including None) falls through and is treated as hex below.
    try:
        color = webcolors.name_to_hex(color)
    except (ValueError, AttributeError):
        pass

    try:
        if color:
            color = color.lstrip("#")  # Remove the pound sign.
            color = int(f"0x{color}", 16)
            # Enforce the range the error message promises; larger values
            # used to be accepted without complaint.
            if not 0 <= color <= 0xFFFFFF:
                raise ValueError("color out of range")
        else:
            # No input: pick a random color.
            color = systemrandom.randint(0, 16777215)
        color = discord.Color(color)
    except ValueError:
        raise commands.UserInputError(("Not a valid color. "
                                       "Color must either be A) in hex format (e.g. `808080`)"
                                       " and between `FFFFFF` and `000000`, or B) A named CSS"
                                       " color (e.g. `red` or `purple`."))

    color_hex_value = "%0.2X%0.2X%0.2X" % (color.r, color.g, color.b)

    embed = discord.Embed()
    embed.colour = color
    image_url = BASE_URL_COLOR_API.format(color_hex_value)
    embed.set_thumbnail(url=image_url)

    color_as_rgb = color.to_rgb()
    color_as_rgba = color_as_rgb + (1.0,)
    embed.add_field(name="RGB", value=f"rgb{color_as_rgb}")
    embed.add_field(name="RGBA", value=f"rgba{color_as_rgba}")
    embed.add_field(name="HSV*", value=f"{rgb_to_hsv(*color_as_rgb)}")
    embed.add_field(name="HLS*", value=f"{rgb_to_hls(*color_as_rgb)}")
    embed.add_field(name="Hex code", value=f"#{color_hex_value}")
    embed.add_field(name="Images",
                    value=BASE_URL_TINEYE_MULTICOLR.format(color_hex_value.lower()))
    embed.add_field(name="Information",
                    value=BASE_URL_COLOR_HEX.format(color_hex_value.lower()))
    embed.add_field(name="Notes",
                    value="* These values may be slightly wrong due to floating point errors.",
                    inline=False)
    embed.set_footer(text="Thumbnail provided by AlexFlipnote's API")
    await ctx.send(embed=embed)
async def on_command_error(ctx, error):
    """The event triggered when an error is raised while invoking a command.

    ctx   : Context
    error : Exception

    Nothing is done when the command has its own ``on_error`` or its cog
    defines a private ``__error`` handler. ``CommandNotFound`` and
    ``UserInputError`` are ignored. Error types listed in ``handler`` get a
    fixed reply in the invoking channel; anything else is reported as an
    embed with the traceback to a hard-coded channel id.

    Declared ``async`` because the body awaits; as a plain ``def`` this
    function is a SyntaxError.
    """
    if hasattr(ctx.command, 'on_error'):
        return

    cog = ctx.cog
    if cog:
        # Name-mangled form of a ``__error`` method defined on the cog class.
        attr = f'_{cog.__class__.__name__}__error'
        if hasattr(cog, attr):
            return

    error = getattr(error, 'original', error)

    ignored = (commands.CommandNotFound, commands.UserInputError)
    if isinstance(error, ignored):
        return

    handler = {
        discord.Forbidden: '**I do not have the required permissions to run this command.**',
        commands.DisabledCommand: f'{ctx.command} has been disabled.',
        commands.NoPrivateMessage: f'{ctx.command} can not be used in Private Messages.',
        commands.CheckFailure: '**You aren\'t allowed to use this command!**',
        ExplicitCheckFailure: 'This command can only be used in a **NSFW** channel.',
        InvalidChannelCheck: f'{ctx.command} can only be used in a server',
        BotPermissionsCheck: 'For **any** of the moderation commands, the bot must be given\n'
                             'Manage Messages, Manage Nicknames, Kick Members and Ban Members'
    }

    # Walk the MRO so that the most specific registered class wins. An exact
    # type match behaves as before; a subclass of a registered type now gets
    # its nearest ancestor's reply instead of a traceback report.
    message = next((handler[cls] for cls in type(error).__mro__ if cls in handler), None)
    if message is not None:
        return await ctx.send(message)

    embed = discord.Embed(title='Command Exception', color=discord.Color.red())
    embed.set_footer(text='Occured on')
    embed.timestamp = datetime.datetime.utcnow()

    exc = ''.join(traceback.format_exception(type(error), error, error.__traceback__, chain=False))
    # A zero-width space before each backtick keeps the traceback from
    # closing the code fence early.
    exc = exc.replace('`', '\u200b`')
    embed.description = f'```py\n{exc}\n```'

    embed.add_field(name='Command', value=ctx.command.qualified_name)
    embed.add_field(name='Invoker', value=ctx.author)
    embed.add_field(name='Location', value=f'Guild: {ctx.guild}\nChannel: {ctx.channel}')
    embed.add_field(name='Message', value=ctx.message.content)
    await ctx.bot.get_channel(368477860387880960).send(embed=embed)