def getServerStat(self, server, stat):
    """Look up a single stored stat for a server.

    ``self.checkServer(server)`` is called first, then
    ``self.serverDict["Servers"]`` is scanned for the entry whose ``"ID"``
    equals ``server.id``.

    Returns the value stored under ``stat`` in that entry, or ``None`` when
    the entry has no such key or no entry matches the server.
    """
    self.checkServer(server)
    for entry in self.serverDict["Servers"]:
        if entry["ID"] != server.id:
            continue
        # The first matching entry decides the result.
        return entry[stat] if stat in entry else None
    # No entry carried this server's ID.
    return None
# Set the provided stat
# Example source code using the Python class Server()
def new_channel(self, channel, guild=None):
    """Creates a new Dwarf `Channel` object.

    Parameters
    ----------
    channel
        Can be a Discord `Channel` object or a channel ID.
    guild : Optional
        Can be a Discord `Server` object or a guild ID.
        Is not an optional parameter if `channel` is not a Discord `Channel` object.

    Raises
    ------
    ValueError
        If `channel` is not a Discord `Channel` object and `guild` is None.
    """
    if isinstance(channel, discord.Channel):
        return Channel(id=channel.id, guild=channel.server.id)

    if guild is None:
        raise ValueError("Either a Channel object or both channel ID "
                         "and guild ID must be given as argument(s).")

    # The documented contract allows a Discord `Server` object here; reduce it
    # to its ID so both branches hand `Channel` the same kind of value.
    guild_id = guild.id if isinstance(guild, discord.Server) else guild
    return Channel(id=channel, guild=guild_id)
def new_role(self, role, guild=None):
    """Creates a new Dwarf `Role` object.

    Parameters
    ----------
    role
        Can be a Discord `Role` object or a role ID.
    guild : Optional
        Can be a Discord `Server` object or a guild ID.
        Is not an optional parameter if `role` is not a Discord `Role` object.

    Raises
    ------
    ValueError
        If `role` is not a Discord `Role` object and `guild` is None.
    """
    if not isinstance(role, discord.Role):
        if guild is None:
            raise ValueError("Either a Role object or both role ID "
                             "and guild ID must be given as argument(s)")
        # `guild` is only validated here; it is not passed on to `Role`.
        return Role(id=role)
    return Role(id=role.id)
def get_member(self, member=None, user=None, guild=None):
    """Retrieves a Dwarf `Member` object via `Member.objects.get`.

    Either `member` or both `user` and `guild` must be given as arguments.

    Parameters
    ----------
    member : Optional
        Has to be a Discord `Member` object.
    user : Optional
        Can be a Discord `User` object or a user ID.
    guild : Optional
        Can be a Discord `Server` object or a guild ID.

    Raises
    ------
    ValueError
        If `member` is not a Discord `Member` object and `user` or `guild`
        is None.
    """
    if isinstance(member, discord.Member):
        # A Member object already carries both identifiers.
        user_id, guild_id = member.id, member.server.id
        return Member.objects.get(user=user_id, guild=guild_id)

    if user is None or guild is None:
        raise ValueError("Either a Member object or both user ID "
                         "and guild ID must be given as argument(s).")

    # Accept either the Discord objects or their raw IDs.
    user_id = user.id if isinstance(user, discord.User) else user
    guild_id = guild.id if isinstance(guild, discord.Server) else guild
    return Member.objects.get(user=user_id, guild=guild_id)
async def on_server_update(self, before, after):
    """Log changes to a server's owner, region, name and icon.

    Compares the ``owner``, ``region``, ``name`` and ``icon_url`` attributes
    of ``before`` and ``after`` and awaits ``self.log(before, text)`` once per
    changed attribute, in that order.

    Declared ``async`` because the body awaits ``self.log``; ``await`` inside
    a plain ``def`` is a syntax error.
    """
    entries = []
    if before.owner != after.owner:
        entries.append('Server owner changed from {0} (id {0.id}) to {1} '
                       '(id {1.id})'.format(before.owner, after.owner))
    if before.region != after.region:
        entries.append('Server region changed from %s to %s' %
                       (before.region, after.region))
    if before.name != after.name:
        entries.append('Server name changed from %s to %s' %
                       (before.name, after.name))
    if before.icon_url != after.icon_url:
        entries.append('Server icon changed from %s to %s' %
                       (before.icon_url, after.icon_url))

    # Every entry is logged against the pre-update server object.
    for entry in entries:
        await self.log(before, entry)
# Declared ``async`` because the body awaits ``client.say``; ``await`` inside a
# plain ``def`` is a syntax error.
# NOTE(review): the docstring text is left unchanged because it looks like
# user-facing command help — confirm before rewording it.
async def scalefactor(message: discord.Message, factor: float=default_scale_factor):
    """ Set the image scaling factor for your server. If no factor is given, the default is set. /
    **This command requires the `Manage Server` permission.**"""
    # NOTE(review): the asserts are kept as-is; presumably the command framework
    # reports assertion messages to the user — confirm against the dispatcher.
    assert not factor == 0, "If you wish to disable images, remove the `Attach Files` permission from this bot."
    assert factor <= max_scale_factor, "The factor **{}** is too high **(max={})**.".format(factor, max_scale_factor)
    assert min_scale_factor <= factor, "The factor **{}** is too low **(min={})**.".format(factor, min_scale_factor)

    server_id = message.server.id
    if server_id not in pokedex_config.data:
        pokedex_config.data[server_id] = {}
    server_data = pokedex_config.data[server_id]

    # Handle specific scenarios
    if factor == default_scale_factor:
        # The default is represented by the absence of a stored value.
        if "scale-factor" in server_data:
            del server_data["scale-factor"]
            reply = "Pokédex image scale factor reset to default: **{factor}**."
        else:
            reply = "Pokédex image scale factor is **{factor}** (default)."
    else:
        server_data["scale-factor"] = factor
        reply = "Pokédex image scale factor set to **{factor}**."

    pokedex_config.save()
    await client.say(message, reply.format(factor=factor))
def format_usage(cmd: Command, server: discord.Server):
    """ Build the usage text for a command and, recursively, its sub commands.

    Each sub command's usage goes on its own line. The server's command
    prefix is substituted for the ``{pre}`` placeholder in the joined text.

    :param cmd: Type Command.
    :param server: The server to generate the usage in.
    :return: str: formatted usage, or None for a hidden command that has a parent.
    """
    if cmd.hidden and cmd.parent is not None:
        return

    command_prefix = config.server_command_prefix(server)
    usage = [cmd.usage(server)]

    # Recurse into sub commands, keeping only non-empty results.
    sub_usages = (format_usage(sub_command, server) for sub_command in cmd.sub_commands)
    usage.extend(text for text in sub_usages if text)

    if not usage:
        return None
    lines = [line for line in usage if line is not None]
    return "\n".join(lines).format(pre=command_prefix)
async def insert(self, name: str, command: str, server: Server):
    """ Inserts a new command into the ``commands`` table.

    Declared ``async`` because the sqlite path awaits ``self._insert_lite``;
    ``await`` inside a plain ``def`` is a syntax error.

    :param name: command name; an empty value makes the call return False.
    :param command: command text; an empty value makes the call return False.
    :param server: object whose ``id`` is stored with the row.
    :return: the result of ``self._insert_lite`` for sqlite; otherwise True
        when the cursor reports at least one affected row, else False.
    """
    if not name or not command:  # pragma: no cover
        # TODO: raise some exception
        return False
    if self.sql_type is SQLType.sqlite:
        return await self._insert_lite(name, command, server)
    else:  # pragma: no cover
        self.cursor.execute(
            self.query("INSERT INTO commands VALUES (%(name)s, %(command)s, %(server)s) ON CONFLICT DO NOTHING"),
            {"name": name, "command": command, "server": server.id})
        self.connection.commit()
        # A conflicting row is skipped by the statement, leaving rowcount at 0.
        return self.cursor.rowcount > 0
def _get_all(self, server: Server, is_primary: int = None) -> List:
    """ Internal helper that fetches (role, alias) pairs for a server.

    ``is_primary`` equal to 1 or 0 restricts the query to that value of the
    ``is_primary`` column; any other value selects both.

    Returns a list of ``(str(role), alias)`` tuples.
    """
    # Only these three fixed literals are ever interpolated into the SQL text.
    if is_primary == 0:
        primary_in = "(0)"
    elif is_primary == 1:
        primary_in = "(1)"
    else:
        primary_in = "(0,1)"

    statement = self.query(
        "SELECT role, alias FROM roles WHERE is_primary IN {0} AND server_id = %(server)s".format(primary_in))
    self.cursor.execute(statement, {"server": server.id})

    return [(str(role), alias) for role, alias in self.cursor.fetchall()]
async def blacklist_add(self, ctx, value:str, *, obj:MultiMention):
    """Add a blacklist entry with data ``value`` for a server, channel, role or member.

    Declared ``async`` because the body awaits ``self.bot.say``; ``await``
    inside a plain ``def`` is a syntax error.

    Reports via ``self.bot.say`` whether the entry already existed or was
    added. Raises ``TypeError`` when ``obj`` is none of the supported Discord
    types (previously an ``UnboundLocalError`` on ``kwargs``).
    """
    # Pick the filter column matching the kind of object given.
    if isinstance(obj, discord.Server):
        kwargs = dict(server_id=int(obj.id))
    elif isinstance(obj, discord.Channel):
        kwargs = dict(channel_id=int(obj.id))
    elif isinstance(obj, discord.Role):
        kwargs = dict(role_id=int(obj.id))
    elif isinstance(obj, discord.Member):
        kwargs = dict(user_id=int(obj.id))
    else:
        raise TypeError(f"Cannot blacklist object of type {obj.__class__.__name__}")

    # NOTE(review): the messages are sent while the db scope is still open;
    # the original nesting was not recoverable from the source — confirm.
    with self.bot.db_scope() as session:
        blacklist_obj = session.query(sql.Blacklist).filter_by(**kwargs, data=value).first()
        if blacklist_obj is not None:
            await self.bot.say(f"{obj.__class__.__name__} **{str(obj)}** has already been blacklisted for `{value}`")
            return
        blacklist_obj = sql.Blacklist(**kwargs, data=value)
        session.add(blacklist_obj)
        await self.bot.say(f"Blacklisted {obj.__class__.__name__} **{str(obj)}** for `{value}`")
async def blacklist_remove(self, ctx, value:str, *, obj:MultiMention):
    """Remove the blacklist entry with data ``value`` for a server, channel, role or member.

    Declared ``async`` because the body awaits ``self.bot.say``; ``await``
    inside a plain ``def`` is a syntax error.

    Reports via ``self.bot.say`` whether the entry was missing or removed.
    Raises ``TypeError`` when ``obj`` is none of the supported Discord types
    (previously an ``UnboundLocalError`` on ``kwargs``).
    """
    # Pick the filter column matching the kind of object given.
    if isinstance(obj, discord.Server):
        kwargs = dict(server_id=int(obj.id))
    elif isinstance(obj, discord.Channel):
        kwargs = dict(channel_id=int(obj.id))
    elif isinstance(obj, discord.Role):
        kwargs = dict(role_id=int(obj.id))
    elif isinstance(obj, discord.Member):
        kwargs = dict(user_id=int(obj.id))
    else:
        raise TypeError(f"Cannot un-blacklist object of type {obj.__class__.__name__}")

    # NOTE(review): the messages are sent while the db scope is still open;
    # the original nesting was not recoverable from the source — confirm.
    with self.bot.db_scope() as session:
        blacklist_obj = session.query(sql.Blacklist).filter_by(**kwargs, data=value).first()
        if blacklist_obj is None:
            await self.bot.say(f"{obj.__class__.__name__} **{str(obj)}** is not blacklisted for `{value}`")
            return
        session.delete(blacklist_obj)
        await self.bot.say(f"Removed {obj.__class__.__name__} **{str(obj)}** from blacklist for `{value}`")
async def blacklist_search(self, ctx, *, obj:MultiMention):
    """List every blacklist entry recorded for a server, channel, role or member.

    Declared ``async`` because the body awaits ``self.bot.say``; ``await``
    inside a plain ``def`` is a syntax error.

    Sends one markdown code block via ``self.bot.say`` that lists the ``data``
    of each matching entry, or states that there are none. Raises
    ``TypeError`` when ``obj`` is none of the supported Discord types
    (previously an ``UnboundLocalError`` on ``kwargs``).
    """
    # Pick the filter column matching the kind of object given.
    if isinstance(obj, discord.Server):
        kwargs = dict(server_id=int(obj.id))
    elif isinstance(obj, discord.Channel):
        kwargs = dict(channel_id=int(obj.id))
    elif isinstance(obj, discord.Role):
        kwargs = dict(role_id=int(obj.id))
    elif isinstance(obj, discord.Member):
        kwargs = dict(user_id=int(obj.id))
    else:
        raise TypeError(f"Cannot search blacklist for object of type {obj.__class__.__name__}")

    # NOTE(review): the message is sent while the db scope is still open;
    # the original nesting was not recoverable from the source — confirm.
    with self.bot.db_scope() as session:
        blacklist_objs = session.query(sql.Blacklist).filter_by(**kwargs).all()
        if blacklist_objs:
            result_text = f"```md\n# {obj.__class__.__name__} {str(obj)} is blacklisted for\n" + "\n".join(f"- {b_obj.data}" for b_obj in blacklist_objs) + "\n```"
        else:
            result_text = f"```md\n# {obj.__class__.__name__} {str(obj)} is not blacklisted\n```"
        await self.bot.say(result_text)
async def send_log(self, server: discord.Server, log: Log):
    """Sends an embed corresponding to the log in the log channel of the server.

    Declared ``async`` because the body awaits ``self.bot.send_message``;
    ``await`` inside a plain ``def`` is a syntax error.

    Does nothing when the server has no entry in
    ``self.servers_config["servers"]`` or that entry has no ``"log channel"``.
    On ``discord.Forbidden`` or ``discord.NotFound`` the server owner is
    messaged instead; ``discord.HTTPException`` and
    ``discord.InvalidArgument`` are ignored.
    """
    servers = self.servers_config["servers"]
    if server.id not in servers or "log channel" not in servers[server.id]:
        return

    embed = log.get_embed(self.bot)
    channel = servers[server.id]["log channel"]
    try:
        await self.bot.send_message(destination=channel, embed=embed)
    except discord.Forbidden:
        await self.bot.send_message(
            destination=server.owner,
            content="I'm not allowed to send embeds in the log channel (#" +
                    channel.name + "). Please change my permissions.")
    except discord.NotFound:
        await self.bot.send_message(
            destination=server.owner,
            content="I'm not allowed to send embeds in the log channel because "
                    "it doesn't exists anymore. Please set another log channel "
                    "using the `[p]set_log_channel` command.")
    except (discord.HTTPException, discord.InvalidArgument):
        # Best effort: other delivery failures are deliberately ignored.
        pass
def checkServer(self, server):
    """Make sure ``self.serverDict`` has a fully-populated entry for ``server``.

    Creates the ``"Servers"`` list if it is absent. Every existing entry whose
    ``"ID"`` equals ``server.id`` gets any key missing from
    ``self.defaultServer`` filled in. When no entry matches, a new one is
    appended with ``"Name"``, ``"ID"`` and every default key.

    Default values are materialised as follows: an exact ``dict`` becomes a
    new empty dict, an exact ``list`` is deep-copied, anything else is stored
    as the same object.
    """
    def _initial_value(default):
        # Exact type checks on purpose: subclasses take the shared-reference path.
        if type(default) == dict:
            return {}
        if type(default) == list:
            return copy.deepcopy(default)
        return default

    if "Servers" not in self.serverDict:
        # Start with an empty placeholder list.
        self.serverDict["Servers"] = []

    matched = False
    for entry in self.serverDict["Servers"]:
        if entry["ID"] != server.id:
            continue
        matched = True
        # Backfill only the keys this entry does not have yet.
        for key in self.defaultServer:
            if key not in entry:
                entry[key] = _initial_value(self.defaultServer[key])

    if matched:
        return

    # Unknown server: build its entry from name, id and every default.
    fresh_entry = {"Name": server.name, "ID": server.id}
    for key in self.defaultServer:
        fresh_entry[key] = _initial_value(self.defaultServer[key])
    self.serverDict["Servers"].append(fresh_entry)
# Let's make sure the user is in the specified server
def get_guild(self, guild):
    """Retrieves a Dwarf `Guild` object via `Guild.objects.get`.

    Parameters
    ----------
    guild
        Can be a Discord `Server` object or a guild ID.
    """
    # Reduce a Discord `Server` to its ID; anything else is used as the ID.
    guild_id = guild.id if isinstance(guild, discord.Server) else guild
    return Guild.objects.get(id=guild_id)
def new_guild(self, guild):
    """Creates a new Dwarf `Guild` object.

    Parameters
    ----------
    guild
        Can be a Discord `Server` object or a guild ID.
    """
    # Reduce a Discord `Server` to its ID; anything else is used as the ID.
    guild_id = guild.id if isinstance(guild, discord.Server) else guild
    return Guild(id=guild_id)
def get_server(context: Context) -> discord.Server:
    """ Return the server of the message attached to a command context.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context
    :return: ``context.message.server``
    """
    message = context.message
    return message.server
async def kick_user(user, mod, server, bot, reason):
    """
    Kicks a user and then reports it to the configured mod-log channel.

    Declared ``async`` because the body awaits ``bot.kick``; ``await`` inside
    a plain ``def`` is a syntax error.

    :param user: Member object of the user who needs to be kicked
    :param mod: Member object of the responsible moderator
    :param server: Server object of the server
    :param bot: Bot instance to kick and log
    :param reason: Reason why user is being kicked
    """
    config = bot.cogs['Config']
    channel = get_channel_by_name(server, config['aryas']['mod_log_channel_name'])
    try:
        await bot.kick(user)
    except Exception as e:
        # Any failure to kick is logged and reported in the same channel.
        config.logger.error(e)
        # NOTE(review): `send` is called without `await`, as in the original —
        # confirm it is not a coroutine function.
        send(bot, 'Failed to kick {} for {}'.format(user.mention, reason), channel, False)
        return

    msg = '{} was kicked by {}. Reason: {}'.format(user.name, mod.mention, reason)
    try:
        send(bot, msg, channel, False)
    except Exception as e:
        # Kept from the original flow: a failure while reporting a successful
        # kick is handled the same way as a failed kick.
        config.logger.error(e)
        send(bot, 'Failed to kick {} for {}'.format(user.mention, reason), channel, False)
def get_channel_by_name(server: Server, name: str) -> Union[Channel, None]:
    """
    Look up a channel of a server by its name.

    :param server: the server whose ``channels`` are searched
    :param name: the channel name to match exactly
    :return: the first channel whose ``name`` equals ``name``, otherwise None
    """
    matches = (candidate for candidate in server.channels if candidate.name == name)
    return next(matches, None)
def get_all_players(self, server: discord.Server):
    """Return ``self.get_player(member)`` for every member of ``server``, in order."""
    players = []
    for member in server.members:
        players.append(self.get_player(member))
    return players
def _re_present(self, obj):
    """Determines if any patterns are set for a server or channel.

    ``obj`` may be a ``discord.Server``, a ``discord.Channel`` or a channel ID
    string (resolved through ``self.bot.get_channel``).

    Returns True when a non-empty pattern collection is stored in
    ``self.regexen`` for the server (any channel) or for the given channel.
    Returns False otherwise, including for a server without an entry, an
    unresolvable channel ID, or an unsupported ``obj`` type.
    """
    if type(obj) is discord.Server:
        if obj.id not in self.regexen:
            return False
        # Any channel with a non-empty collection counts.
        return any(bool(relist) for relist in self.regexen[obj.id].values())

    if type(obj) is discord.Channel:
        channel = obj
    elif type(obj) is str:  # won't work with ALL_CHANNELS
        channel = self.bot.get_channel(obj)
        if channel is None:
            # Unknown channel ID: nothing can be configured for it.
            return False
    else:
        return False

    # Same membership guard as the server branch: a server with no entry has
    # no patterns, rather than raising KeyError.
    server_id = channel.server.id
    if server_id not in self.regexen:
        return False
    per_channel = self.regexen[server_id]
    if channel.id not in per_channel:
        return False
    return bool(per_channel[channel.id])
def _ls_excl(self, server):
    """Return the channel IDs of a server that have at least one exclusive filter.

    ``server`` may be a ``discord.Server`` or a server ID. A channel is
    included when ``MODE_EXCLUSIVE`` appears among the values of its mapping
    in ``self.regexen[server_id]``. An unknown server yields an empty list.
    """
    if type(server) is discord.Server:
        server = server.id
    if server not in self.regexen:
        return []
    return [
        channel_id
        for channel_id, patterns in self.regexen[server].items()
        if MODE_EXCLUSIVE in patterns.values()
    ]
# Background cache regexen for speed
# Declared ``async`` because the body awaits ``self.bot.say``; ``await`` inside
# a plain ``def`` is a syntax error.
# NOTE(review): the docstring text is left unchanged because it looks like
# user-facing command help — confirm before rewording it.
async def set_default(self, on_off: bool = None):
    """Sets whether logging is on or off where unset.
    Server overrides, global override, and attachments don't use this."""
    # With no argument the stored value is only reported, not changed.
    if on_off is not None:
        self.settings['default'] = on_off

    if self.settings.get('default', False):
        await self.bot.say("Logging is enabled by default.")
    else:
        await self.bot.say("Logging is disabled by default.")
    self.save_json()
def should_log(self, location):
    """Decide whether activity at ``location`` should be logged.

    ``location`` may be a ``discord.Server``, ``discord.Channel`` or
    ``discord.PrivateChannel``; any other type is never logged.

    The global ``'everything'`` setting wins outright. Otherwise the
    per-server settings are consulted, and the ``'default'`` setting is used
    wherever nothing more specific is stored. That includes a server with no
    settings entry at all, which previously fell through and returned None.
    """
    if self.settings.get('everything', False):
        return True

    default = self.settings.get('default', False)

    if type(location) is discord.Server:
        if location.id not in self.settings:
            # Nothing configured for this server: fall back to the default.
            return default
        loc = self.settings[location.id]
        return loc.get('all', False) or loc.get('events', default)

    if type(location) is discord.Channel:
        if location.server.id not in self.settings:
            # Nothing configured for this channel's server: use the default.
            return default
        loc = self.settings[location.server.id]
        opts = [loc.get('all', False), loc.get(location.id, default)]
        if location.type is discord.ChannelType.voice:
            opts.append(loc.get('voice', False))
        return any(opts)

    if type(location) is discord.PrivateChannel:
        return self.settings.get('direct', default)

    # can't log other types
    return False
async def on_voice_state_update(self, before, after):
    """Log voice channel joins/leaves and deafen/mute changes of a member.

    Declared ``async`` because the body awaits ``self.log``; ``await`` inside
    a plain ``def`` is a syntax error.

    ``before`` and ``after`` are the member's states around the change. Leave
    messages are logged to the channel left and join messages to the channel
    joined. Deaf/mute changes are logged to ``before.voice_channel``.
    """
    if before.voice_channel != after.voice_channel:
        if before.voice_channel:
            msg = "Voice channel leave: {0} (id {0.id})"
            if after.voice_channel:
                msg += ' moving to {1.voice_channel}'
            await self.log(before.voice_channel, msg.format(before, after))

        if after.voice_channel:
            msg = "Voice channel join: {0} (id {0.id})"
            if before.voice_channel:
                msg += ', moved from {0.voice_channel}'
            flags = self.get_voice_flags(after)
            if flags:
                msg += ', flags: %s' % ','.join(flags)
            await self.log(after.voice_channel, msg.format(before, after))

    # (attribute, verb when set, verb when cleared, message template)
    state_changes = (
        ('deaf', 'deafen', 'undeafen', 'Server {0}: {1} (id {1.id})'),
        ('mute', 'mute', 'unmute', 'Server {0}: {1} (id {1.id})'),
        ('self_deaf', 'deafen', 'undeafen', 'Server self-{0}: {1} (id {1.id})'),
        ('self_mute', 'mute', 'unmute', 'Server self-{0}: {1} (id {1.id})'),
    )
    for attr, set_verb, cleared_verb, template in state_changes:
        current = getattr(after, attr)
        if getattr(before, attr) != current:
            verb = set_verb if current else cleared_verb
            await self.log(before.voice_channel, template.format(verb, before))
def format_objects(*objects, attr=None, dec: str="", sep: str=None):
    """ Return a formatted string of objects (User, Member, Channel or Server) using
    the given decorator and the given separator.

    When ``attr`` is not given it is chosen from the type of the first object:
    ``display_name`` for users, ``mention`` for channels and roles (which also
    forces a single-space separator), and ``name`` for everything else.
    Previously an unrecognised type left ``attr`` as None and raised TypeError.

    :param objects: Any object with attributes, preferably User, Member, Channel or Server.
    :param attr: The attribute to get from any object. Defaults to object names.
    :param dec: String to decorate around each object.
    :param sep: Separator between each argument. Defaults to ", ".
    :return: str: the formatted objects, or None when no objects are given.
    """
    if not objects:
        return

    first_object = objects[0]
    if attr is None:
        if isinstance(first_object, discord.User):
            attr = "display_name"
        elif isinstance(first_object, (discord.Channel, discord.Role)):
            attr = "mention"
            sep = " "
        else:
            # Servers and any other object fall back to their plain name.
            attr = "name"

    sep = sep if sep is not None else ", "
    return sep.join(dec + getattr(m, attr) + dec for m in objects)
def set_server_config(server: discord.Server, key: str, value):
    """ Set a server config value and save the config.

    :param server: The server whose entry in ``server_config.data`` is changed.
    :param key: The config key to set or remove.
    :param value: The new value; None removes the key. Removing a key that is
        not set is a no-op (previously it raised KeyError).
    """
    if server.id not in server_config.data:
        server_config.data[server.id] = {}
    entry = server_config.data[server.id]

    # Change the value or remove it from the entry if the value is None
    if value is None:
        entry.pop(key, None)
    else:
        entry[key] = value

    server_config.save()
def server_command_prefix(server: discord.Server):
    """ Return the command prefix configured for a server.

    Falls back to ``default_command_prefix`` when ``server`` is None, has no
    entry in ``server_config.data``, or its entry has no ``"command_prefix"``.
    """
    if server is None or server.id not in server_config.data:
        return default_command_prefix
    entry = server_config.data[server.id]
    return entry.get("command_prefix", default_command_prefix)
async def get_emote(emote_id: str, server: discord.Server):
    """ Return the image of a custom emote.

    Declared ``async`` because the body awaits ``utils.download_file``;
    ``await`` inside a plain ``def`` is a syntax error.

    :param emote_id: ID of the custom emote.
    :param server: Server passed to the ``discord.Emoji`` constructor.
    :return: the result of ``Image.open`` on the cached or freshly downloaded data.
    """
    emote = discord.Emoji(id=emote_id, server=server)

    # Download and cache the emote only the first time it is requested.
    if emote.id not in emote_cache:
        emote_cache[emote.id] = await utils.download_file(emote.url, bytesio=True)

    return Image.open(emote_cache[emote.id])
async def format_emoji(text: str, server: discord.Server):
    """ Creates a list supporting both emoji and custom emotes.

    Declared ``async`` because the body awaits ``get_emote``; ``await`` inside
    a plain ``def`` is a syntax error.

    :param text: The text to scan for custom emotes and emoji.
    :param server: The server passed on to ``get_emote``.
    :return: tuple of (list of emoji/emote images, whether any custom emote was found).
    """
    char_and_emotes = []

    # Loop through each character and compare with custom emotes.
    # Add characters to list, along with emote byte-strings
    text_iter = iter(enumerate(text))
    has_custom = False
    for i, c in text_iter:
        match = emote_regex.match(text[i:])
        if not match:
            char_and_emotes.append(c)
            continue

        char_and_emotes.append(await get_emote(match.group("id"), server))
        has_custom = True

        # Skip ahead in the iterator, past the rest of the matched emote text
        for _ in range(len(match.group(0)) - 1):
            next(text_iter)

    parsed_emoji = list(parse_emoji(char_and_emotes))

    # When the size of all emoji next to each other is greater than the max width,
    # divide the size to properly fit the max_width at all times
    size = default_size
    if has_custom:
        size = emote_size
    elif size * len(parsed_emoji) > max_width:
        scale = 1 / ((size * len(parsed_emoji) - 1) // max_width + 1)
        size *= scale

    # Return the list of emoji, and set the respective size (should be managed manually if needed)
    return [e if isinstance(e, Image.Image) else get_emoji(e, size=size) for e in parsed_emoji], has_custom