async def nostalgia(self, ctx, date: date = None, *, channel: discord.TextChannel = None):
    """Pins an old message from a specific date.

    If a date is not given, then pins first message from the channel.
    If a channel is not given, then pins from the channel the
    command was ran on.

    The format of the date must be either YYYY-MM-DD or YYYY/MM/DD.
    """
    if channel is None:
        channel = ctx.channel
    if date is None:
        # Start at the channel's creation time so its first message is picked.
        date = channel.created_at

    # Read the history of the *requested* channel. Going through ``ctx``
    # would always search the invoking channel and ignore ``channel``.
    async for m in channel.history(after=date, limit=1):
        try:
            await m.pin()
        except discord.HTTPException:
            # Only pin failures reported by the API are handled here;
            # unrelated errors are no longer swallowed.
            await ctx.send('\N{THUMBS DOWN SIGN} Could not pin message.')
# Example source code for the Python class TextChannel()
async def dad_jokes(message, match):
    """Answer a matched message with "Hello **<name>**, I'm <bot name>!".

    The name comes from group 5 of ``match``, limited to its first four
    whitespace-separated words. If the triggering message is deleted within
    60 seconds, the reply is deleted as well.

    Args:
        message: The message that triggered the joke.
        match: Regex match object whose group 5 holds the name text.
    """
    # Break mass mentions with a zero-width space before echoing user text.
    raw = match.group(5)
    raw = raw.replace('@everyone', '@\N{ZERO WIDTH SPACE}everyone')
    raw = raw.replace('@here', '@\N{ZERO WIDTH SPACE}here')
    name = ' '.join(raw.split()[0:4])
    if name.startswith((',', '.', '!')):
        name = name[1:].lstrip()

    for mem in message.mentions:
        # Use the first candidate that does not itself contain mention
        # syntax; the escaped form at the end never does.
        candidates = [mem.display_name, mem.name, '<\\@%s>' % mem.id]
        safe = next(sub for sub in candidates if '<@' not in sub)
        name = name.replace(mem.mention, safe)
    for role in message.role_mentions:
        safe = role.name if '<@' not in role.name else '<\\@&%s>' % role.id
        name = name.replace(role.mention, safe)

    # In a guild text channel the bot's member object hangs off the guild;
    # otherwise it is read from the channel itself.
    if isinstance(message.channel, discord.TextChannel):
        me = message.guild.me
    else:
        me = message.channel.me
    response = await message.channel.send('Hello **%s**, I\'m %s!' % (name, me.display_name))

    try:
        await bot.wait_for('message_delete', check=lambda m: m == message, timeout=60)
    except asyncio.TimeoutError:
        pass
    else:
        # The trigger was deleted in time, so remove the reply too.
        await response.delete()
### LONGCAT
async def afk_send(ctx, message_key, *args, **kwargs):
    """Send the configured AFK messages for ``message_key``.

    Each entry of ``ctx.bot.config['afk_messages']`` that contains
    ``message_key`` has its template formatted with ``*args``/``**kwargs``
    and sent to the target named by the entry's ``'dest'`` ID. The function
    then waits up to 10 seconds for a message in the same channel and
    acknowledges it. Finally the invoking message is deleted.
    """
    global afk_targets
    if afk_targets is None:
        # Build the ID -> target lookup once: every text channel plus
        # every bot member.
        afk_targets = {
            channel.id: channel
            for channel in ctx.bot.get_all_channels()
            if isinstance(channel, discord.TextChannel)
        }
        afk_targets.update({mem.id: mem for mem in ctx.bot.get_all_members() if mem.bot})

    for info in ctx.bot.config['afk_messages']:
        if message_key not in info:
            continue
        text = info[message_key].format(*args, **kwargs)
        trigger = await afk_targets[info['dest']].send(text)
        try:
            response = await ctx.bot.wait_for(
                'message', check=lambda m: m.channel == trigger.channel, timeout=10)
            await response.ack()
        except asyncio.TimeoutError:
            # No reply in time; nothing to acknowledge.
            pass
    await ctx.message.delete()
async def create_reminder(self, ctx, due, note):
    """Insert a reminder row and make sure the reminder queue picks it up.

    Args:
        ctx: Invocation context; supplies the author and the channel.
        due: Due value stored with the reminder and compared against the
            ``'due'`` of the queue's current item.
        note: Text stored with the reminder.
    """
    # Outside a guild text channel the author's ID is stored as channel ID.
    if isinstance(ctx.channel, discord.TextChannel):
        cid = ctx.channel.id
    else:
        cid = ctx.author.id

    await self.bot.pgpool.execute(
        """
        INSERT INTO reminders
        (author_id, channel_id, note, due)
        VALUES ($1, $2, $3, $4)
        """,
        ctx.author.id, cid, note, due
    )
    logger.debug('Creating reminder -- due=%s note=%s cid=%d aid=%d', due, note, cid, ctx.author.id)

    # we just created a reminder, we definitely have one now!
    self.queue.has_item.set()

    # check if it's earlier
    current = self.queue.current_item
    if current and current['due'] > due:
        logger.debug('Got a reminder that is due earlier than the current one, rebooting task!')
        self.queue.reboot()
async def watch(self, ctx, channel: discord.TextChannel, subreddit: str):
    """
    Sets up a channel for me to forward hot posts to, from a subreddit of your choosing.

    Only Dogbot Moderators may use this command.
    """
    async with self.bot.pgpool.acquire() as conn:
        # check that there aren't too many feeds
        row = await conn.fetchrow('SELECT COUNT(*) FROM reddit_feeds WHERE guild_id = $1', ctx.guild.id)
        count = row['count']
        logger.debug('Guild %s (%d) has %d feeds', ctx.guild.name, ctx.guild.id, count)

        if count >= 2:
            # they have 2 feeds, which is the max
            return await ctx.send(
                f'You have too many feeds! You can only have two at a time. Use `{ctx.prefix}reddit feeds` '
                'check the feeds in this server.'
            )

        await conn.execute('INSERT INTO reddit_feeds VALUES ($1, $2, $3)', ctx.guild.id, channel.id, subreddit)
    await ctx.ok()
async def is_publicly_visible(bot: Dogbot, channel: discord.TextChannel) -> bool:
    """
    Returns whether a text channel should be considered as "publicly visible".

    If the guild has been configured to log all message events, this will always return True.

    Args:
        bot: The bot instance.
        channel: The channel to check for.

    Returns:
        Whether the text channel is considered "publicly visible".
    """
    # guild is configured to log all message events
    if await bot.config_is_set(channel.guild, 'log_all_message_events'):
        return True

    # Look up the @everyone overwrite through the guild's default role
    # rather than matching on the name '@everyone'. This does not depend on
    # the shape of ``channel.overwrites``, and an absent overwrite yields an
    # empty one whose ``read_messages`` is None.
    everyone_overwrite = channel.overwrites_for(channel.guild.default_role)
    # Only an explicit deny hides the channel.
    return everyone_overwrite.read_messages is not False
async def _display_plonked(self, ctx, entries, plonk):
    """Send an embed that lists the entries affected by a plonk action.

    Args:
        ctx: Invocation context used to send the embed.
        entries: Affected objects; they are split by whether they are
            ``discord.TextChannel`` instances.
        plonk: Key into ``_plonk_embed_mappings`` selecting the embed colour
            and the action word.
    """
    colour, action = _plonk_embed_mappings[plonk]

    embed = discord.Embed(colour=colour)
    embed.set_author(name=f'{action.title()} successful!', icon_url=PLONK_ICON)

    # things = channels, members
    groups = partition(lambda e: isinstance(e, discord.TextChannel), entries)
    for thing in map(list, groups):
        if not thing:
            continue
        # Pluralise the class name unless the group has exactly one entry.
        plural = 's' * (len(thing) != 1)
        name = f'{_get_class_name(thing[0])}{plural} {action}ed'
        value = truncate(', '.join(map(str, thing)), 1024, '...')
        embed.add_field(name=name, value=value, inline=False)

    await ctx.send(embed=embed)
async def modattach(self, ctx, memberlog: discord.TextChannel, modlog: discord.TextChannel):
    """Attach channels to the moderation system"""
    # Refuse to overwrite an existing configuration for this guild.
    modconfig = await self.modcfg_get(ctx.guild)
    if modconfig is not None:
        raise self.SayException('Mod config already exists.')

    modconfig = {
        'guild_id': ctx.guild.id,
        'member_log_id': memberlog.id,
        'mod_log_id': modlog.id,
        'last_action_id': 0,
    }

    log.info('[modlog:attach] Creating a modcfg @ g=%s[gid=%d]',
             ctx.guild.name, ctx.guild.id)

    res = await self.modcfg_coll.insert_one(modconfig)
    # Report success according to whether the insert was acknowledged.
    await ctx.success(res.acknowledged)
async def starattach(self, ctx, starboard_chan: discord.TextChannel):
    """Attach an existing channel as a starboard.

    With this command you can create your starboard
    without needing José to automatically create the starboard for you
    """
    config = await self.get_starconfig(ctx.guild.id)
    if config:
        return await ctx.send('You already have a starboard config setup.')

    # Start from an empty config and point it at the given channel.
    config = empty_starconfig(ctx.guild)
    config['starboard_id'] = starboard_chan.id

    res = await self.starconfig_coll.insert_one(config)
    if not res.acknowledged:
        # The raise ends the command; no trailing ``return`` is needed.
        raise self.SayException('Failed to create starboard config (no ack)')

    await ctx.send('Done!')
    await ctx.ok()
async def mentionspam_ignore(self, ctx, *channels: discord.TextChannel):
    """Specifies what channels ignore mentionspam auto-bans.

    If a channel is given then that channel will no longer be protected
    by auto-banning from mention spammers.

    To use this command you must have the Ban Members permission.
    """
    # Validate first so no query is prepared for an empty request.
    if not channels:
        return await ctx.send('Missing channels to ignore.')

    # Append the new IDs to the stored array and de-duplicate the result.
    query = """UPDATE guild_mod_config
               SET safe_mention_channel_ids =
                   ARRAY(SELECT DISTINCT * FROM unnest(COALESCE(safe_mention_channel_ids, '{}') || $2::bigint[]))
               WHERE id = $1;
            """

    channel_ids = [c.id for c in channels]
    await ctx.db.execute(query, ctx.guild.id, channel_ids)
    # Drop the cached settings for this guild so the change is picked up.
    self.get_guild_settings.invalidate(self, ctx.guild.id)
    await ctx.send(f'Mentions are now ignored on {", ".join(c.mention for c in channels)}.')
async def mentionspam_unignore(self, ctx, *channels: discord.TextChannel):
    """Specifies what channels to take off the ignore list.

    To use this command you must have the Ban Members permission.
    """
    if not channels:
        return await ctx.send('Missing channels to protect.')

    # Keep only the stored IDs that are not among the given channels.
    query = """UPDATE guild_mod_config
               SET safe_mention_channel_ids =
                   ARRAY(SELECT element FROM unnest(safe_mention_channel_ids) AS element
                         WHERE NOT(element = ANY($2::bigint[])))
               WHERE id = $1;
            """

    await ctx.db.execute(query, ctx.guild.id, [c.id for c in channels])
    # Drop the cached settings for this guild so the change is picked up.
    self.get_guild_settings.invalidate(self, ctx.guild.id)
    await ctx.send('Updated mentionspam ignore list.')
async def reaction_action(self, fmt, emoji, message_id, channel_id, user_id):
    """Dispatch a star reaction event to ``self.<fmt>_message``.

    Nothing happens unless the emoji is the white medium star, the channel
    is a guild text channel and the reacting user is a known non-bot user.
    Users plonked according to the ``Config`` cog are skipped, and a
    ``StarError`` raised by the handler is suppressed.

    Args:
        fmt: Prefix of the handler method name (``f'{fmt}_message'``).
        emoji: The reaction emoji; compared by its ``str()`` form.
        message_id: ID of the message that was reacted to.
        channel_id: ID of the channel containing the message.
        user_id: ID of the user who reacted.
    """
    if str(emoji) != '\N{WHITE MEDIUM STAR}':
        return

    channel = self.bot.get_channel(channel_id)
    if not isinstance(channel, discord.TextChannel):
        return

    method = getattr(self, f'{fmt}_message')

    user = self.bot.get_user(user_id)
    if user is None or user.bot:
        return

    async with self.bot.pool.acquire() as con:
        config = self.bot.get_cog('Config')
        if config:
            plonked = await config.is_plonked(channel.guild.id, user_id, channel_id=channel_id, connection=con)
            if plonked:
                return

        try:
            await method(channel, message_id, user_id, connection=con)
        except StarError:
            pass
async def on_raw_message_delete(self, message_id, channel_id):
    """Remove the starboard entry when its starboard message is deleted.

    Args:
        message_id: ID of the deleted message.
        channel_id: ID of the channel the message was deleted from.
    """
    if message_id in self._about_to_be_deleted:
        # we triggered this deletion ourselves and
        # we don't need to drop it from the database
        self._about_to_be_deleted.discard(message_id)
        return

    channel = self.bot.get_channel(channel_id)
    # isinstance() is already False for None, so no separate None check.
    if not isinstance(channel, discord.TextChannel):
        return

    starboard = await self.get_starboard(channel.guild.id)
    if starboard.channel is None or starboard.channel.id != channel_id:
        return

    # at this point a message got deleted in the starboard
    # so just delete it from the database
    async with self.bot.pool.acquire() as con:
        query = "DELETE FROM starboard_entries WHERE bot_message_id=$1;"
        await con.execute(query, message_id)
async def on_raw_bulk_message_delete(self, message_ids, channel_id):
    """Remove starboard entries whose starboard messages were bulk-deleted.

    Args:
        message_ids: IDs of the deleted messages. It is compared with ``<=``
            against ``self._about_to_be_deleted``, so it is expected to be
            a set.
        channel_id: ID of the channel the messages were deleted from.
    """
    if message_ids <= self._about_to_be_deleted:
        # Every ID belongs to a deletion we triggered ourselves, so there
        # is nothing to drop from the database; just forget the IDs.
        self._about_to_be_deleted.difference_update(message_ids)
        return

    channel = self.bot.get_channel(channel_id)
    # isinstance() is already False for None, so no separate None check.
    if not isinstance(channel, discord.TextChannel):
        return

    starboard = await self.get_starboard(channel.guild.id)
    if starboard.channel is None or starboard.channel.id != channel_id:
        return

    async with self.bot.pool.acquire() as con:
        query = "DELETE FROM starboard_entries WHERE bot_message_id=ANY($1::bigint[]);"
        await con.execute(query, list(message_ids))
async def on_raw_reaction_clear(self, message_id, channel_id):
    """Drop a message's starboard entry when its reactions are cleared.

    Deletes the ``starboard_entries`` row for ``message_id`` and, if the
    matching starboard message can be fetched, deletes that message too.

    Args:
        message_id: ID of the message whose reactions were cleared.
        channel_id: ID of the channel containing that message.
    """
    channel = self.bot.get_channel(channel_id)
    # isinstance() is already False for None, so no separate None check.
    if not isinstance(channel, discord.TextChannel):
        return

    async with self.bot.pool.acquire() as con:
        starboard = await self.get_starboard(channel.guild.id, connection=con)
        if starboard.channel is None:
            return

        query = "DELETE FROM starboard_entries WHERE message_id=$1 RETURNING bot_message_id;"
        row = await con.fetchrow(query, message_id)
        if row is None:
            # The message had no starboard entry.
            return

        bot_message_id = row[0]
        msg = await self.get_message(starboard.channel, bot_message_id)
        if msg is not None:
            await msg.delete()
async def nostalgia(self, ctx, date: date, *, channel: discord.TextChannel = None):
    """Pins an old message from a specific date.

    If a channel is not given, then pins from the channel the
    command was ran on.

    The format of the date must be either YYYY-MM-DD or YYYY/MM/DD.
    """
    channel = channel or ctx.channel

    # Fetch at most one message after the given date.
    found = await channel.history(after=date, limit=1).flatten()
    if not found:
        return await ctx.send('Could not find message.')

    message = found[0]
    try:
        await message.pin()
    except discord.HTTPException:
        await ctx.send('Could not pin message.')
    else:
        await ctx.send('Pinned message.')
async def on_guild_channel_update(self, before, after):
    """Record a guild channel update in the database.

    Text, voice and category channels are each logged and written through
    their own ``self.sql`` update method. For text channels the
    ``'on_guild_channel_update'`` hook, if set, is awaited afterwards.

    Args:
        before: The channel object before the update.
        after: The channel object after the update.
    """
    self._log_ignored(f"Channel was updated in guild {after.guild.id}")
    if not await self._accept_channel(after):
        return

    # Mention the new name only when the update actually renamed the channel.
    if before.name != after.name:
        changed = f' (now {after.name})'
    else:
        changed = ''

    if isinstance(after, discord.TextChannel):
        self.logger.info(f"Channel #{before.name}{changed} was changed in {after.guild.name}")

        with self.sql.transaction() as trans:
            self.sql.update_channel(trans, after)

        # pylint: disable=not-callable
        hook = self.hooks['on_guild_channel_update']
        if hook:
            self.logger.debug(f"Found hook {hook!r}, calling it")
            await hook(before, after)
    elif isinstance(after, discord.VoiceChannel):
        # This message needs the f prefix; without it the placeholders
        # were logged literally.
        self.logger.info(f"Voice channel {before.name}{changed} was changed in {after.guild.name}")

        with self.sql.transaction() as trans:
            self.sql.update_voice_channel(trans, after)
    elif isinstance(after, discord.CategoryChannel):
        self.logger.info(f"Channel category {before.name}{changed} was changed in {after.guild.name}")

        with self.sql.transaction() as trans:
            self.sql.update_channel_category(trans, after)
async def lol(ctx):
    # Counts messages containing "lol" (case-insensitive) written by one
    # hard-coded author across every readable text channel of one
    # hard-coded guild, editing a status message with the running total.
    # In any other guild the command does nothing.
    if ctx.message.guild.id != 213468583252983809:
        return

    msg = await ctx.send("Searching channels... (this may take a while)")
    total = 0
    me = ctx.message.guild.me
    for c in ctx.message.guild.channels:
        if not isinstance(c, discord.TextChannel):
            continue
        if not c.permissions_for(me).read_messages:
            continue
        async for m in c.history():
            if m.author.id == 132584525296435200 and "lol" in m.content.lower():
                total += 1
        # NOTE(review): the original nesting of this progress edit could not
        # be recovered; it is assumed to run once per scanned channel.
        await msg.edit(content="Lars' total lol counter so far is: `{}`".format(total))
# force update
async def link(self, ctx, text: discord.TextChannel, *, voice: discord.VoiceChannel):
    """Links an existing text channel to a voice channel."""
    with ctx.session:
        # Refuse to link the same pair twice.
        existing = ctx.session.get(TextVoiceLink, text_id=text.id, voice_id=voice.id).one_or_none()
        if existing is not None:
            return await ctx.send('BAKA! Those channels are already linked!')

        role = await self._create_role(ctx.guild, text, voice, 'Voice link requested by {}'.format(ctx.author))
        # The return value of add() was never used, so it is not bound.
        ctx.session.add(TextVoiceLink(role_id=role.id, text_id=text.id, voice_id=voice.id))
    await ctx.send(content='Linked {} to "{}"'.format(text.mention, voice.name))
async def unlink(self, ctx, text: discord.TextChannel, *, voice: discord.VoiceChannel):
    """Unlinks a voice channel and deletes the corresponding role."""
    with ctx.session:
        link = ctx.session.get(TextVoiceLink, text_id=text.id, voice_id=voice.id).one_or_none()
        if link is None:
            return await ctx.send('BAKA! Those channels are not linked!')
        # Read the role ID before the link record is deleted.
        role_id = link.role_id
        ctx.session.delete(link)

    role = discord.utils.get(ctx.guild.roles, id=role_id)
    if role is None:
        # The role is already gone, so there is nothing to delete and no
        # name to report. Reading ``role.name`` here raised AttributeError.
        await ctx.send(content='Unlinked {} from "{}".'.format(text.mention, voice.name))
    else:
        await self._delete_role(ctx.guild, role, text, voice, 'Voice unlink requested by {}'.format(ctx.author))
        await ctx.send(content='Unlinked {} from "{}" and deleted the "{}" role.'.format(text.mention, voice.name, role.name))
async def on_command_error(self, ctx, err):
    """Tell the invoker what went wrong and log unexpected errors.

    Known error types get a short reply, ``CommandNotFound`` is ignored,
    and anything else is echoed in a code block and logged with its
    traceback.

    Args:
        ctx: Invocation context of the failed command.
        err: The exception raised while handling the command.
    """
    if isinstance(err, errors.PermissionDenied):
        await ctx.send('BAKA! You do not have my permission!')
    elif isinstance(err, errors.MissingPermissions):
        await ctx.send('BAKA! I require these permissions: %s' % ', '.join(err.args))
    elif isinstance(err, commands.NoPrivateMessage):
        await ctx.send('BAKA! This command does not work in private messages!')
    elif isinstance(err, commands.BadArgument):
        # Make sure the echoed message ends with punctuation.
        str_err = str(err)
        if not str_err.endswith(('.', '!')):
            str_err += '.'
        await ctx.send('BAKA! %s' % str_err)
    elif isinstance(err, commands.MissingRequiredArgument):
        # Only the first word of the error text is shown.
        await ctx.send('BAKA! Missing required argument: `%s`' % err.args[0].partition(' ')[0])
    elif isinstance(err, commands.TooManyArguments):
        await ctx.send('BAKA! Too many arguments!')
    elif isinstance(err, commands.CommandNotFound):
        pass
    else:
        await ctx.send('```\n%s\n```' % ''.join(traceback.format_exception_only(type(err), err)).strip())
        # NOTE(review): the logging below is assumed to belong to this
        # fallback branch; the original nesting could not be recovered.
        if isinstance(ctx.channel, discord.TextChannel):
            log.error('Error in command <{0}> ({1.name!r}({1.id}) {2}({2.id}) {3}({3.id}) {4!r})'.format(ctx.command, ctx.guild, ctx.channel, ctx.author, ctx.message.content))
        else:
            log.error('Error in command <{0}> (DM {1}({1.id}) {2!r})'.format(ctx.command, ctx.channel.recipient, ctx.message.content))
        log.error(''.join(traceback.format_exception(type(err), err, err.__traceback__)))
async def channelinfo(self, ctx, *, channel: discord.TextChannel=None):
    """Display information about a text channel.
    Defaults to the current channel.

    * channel - Optional argument. A specific channel to get information about."""
    # If channel is None, then it is set to ctx.channel.
    channel = channel or ctx.channel

    embed = discord.Embed(title=f"{channel.name}")

    # The topic and guild are optional: channels lacking these attributes
    # simply skip the corresponding part of the embed.
    try:
        embed.description = channel.topic
    except AttributeError:
        pass

    embed.add_field(name="Channel ID", value=channel.id)

    try:
        embed.add_field(name="Guild", value=channel.guild.name)
    except AttributeError:
        pass

    embed.add_field(name="Members", value=len(channel.members))
    embed.add_field(name="Created at", value=channel.created_at.ctime())

    if channel.is_nsfw():
        embed.set_footer(text="NSFW content is allowed for this channel.")

    await ctx.send(embed=embed)
async def c4_enable(self, ctx, *, chan: discord.TextChannel=None):
    """Enable Connect Four on a channel

    Run without an argument to enable on the current channel
    Pass a channel as an argument to enable on that channel"""
    if not chan:
        chan = ctx.channel

    # A truthy sadd() result is treated as "newly enabled"; a falsy one as
    # "was already enabled".
    added = self.db.sadd(f"{self.app_name}:c4:allowed_channels", chan.id)
    if added:
        await self.message(ctx, msg="Connect Four successfully enabled on channel.")
    else:
        await self.message(ctx, msg="Connect Four already enabled on channel.", level=1)
async def c4_disable(self, ctx, *, chan: discord.TextChannel=None):
    """Disable Connect Four on a channel

    Run without an argument to disabled on the current channel
    Pass a channel as an argument to disable on that channel"""
    if not chan:
        chan = ctx.channel

    # A truthy srem() result is treated as "newly disabled"; a falsy one as
    # "was already disabled".
    removed = self.db.srem(f"{self.app_name}:c4:allowed_channels", chan.id)
    if removed:
        await self.message(ctx, msg="Connect Four successfully disabled on channel.")
    else:
        await self.message(ctx, msg="Connect Four already disabled on channel.", level=1)
async def on_message(message):
    """Choose the command prefixes for this message, then process commands.

    Outside guild text channels only the default prefix applies. In a guild
    text channel, a prefix stored for the guild is tried before the default
    one.
    """
    default_prefix = get_default_prefix()
    prefixes = [default_prefix]

    if isinstance(message.channel, discord.TextChannel):
        guild_prefix = db.hget(f'{config}:prefix', message.guild.id)
        if guild_prefix:
            prefixes = [guild_prefix, default_prefix]

    # The prefix list is set on the shared bot object before dispatching.
    bot.command_prefix = prefixes
    await bot.process_commands(message)
def check_nsfw(self):
    """Set ``self.nsfw_denied`` for the current command and message.

    The flag becomes True only when the message is in a guild text channel,
    the command is flagged NSFW and the channel is not an NSFW channel. In
    every other case it becomes False.
    """
    channel = self.message.channel
    if not isinstance(channel, discord.TextChannel):
        self.nsfw_denied = False
        return
    if not self.cmd.nsfw:
        self.nsfw_denied = False
        return
    self.nsfw_denied = not channel.is_nsfw()
def count_channels(channels):
    """Count text, voice and category channels in ``channels``.

    Returns a ``(text, voice, categories)`` tuple. Each channel is counted
    under the first matching kind only; other objects are ignored.
    """
    tally = {'text': 0, 'voice': 0, 'categories': 0}
    for chan in channels:
        if isinstance(chan, discord.TextChannel):
            tally['text'] += 1
            continue
        if isinstance(chan, discord.VoiceChannel):
            tally['voice'] += 1
            continue
        if isinstance(chan, discord.CategoryChannel):
            tally['categories'] += 1
    return tally['text'], tally['voice'], tally['categories']
async def hardunmute(cmd, message, args):
    """Clear a member's permission overwrites in all text and category channels.

    The author needs Manage Channels in the current channel, must mention a
    target, and both the bot and the author must pass ``hierarchy_permit``
    against that target. The outcome is always reported with an embed sent
    to the invoking channel.

    Args:
        cmd: Command object; its ``db`` is passed to ``log_event``.
        message: The invoking message.
        args: Command arguments; everything after the first is the reason.
    """
    if not message.author.permissions_in(message.channel).manage_channels:
        response = discord.Embed(title='? Access Denied. Manage Channels needed.', color=0xBE1931)
    elif not message.mentions:
        response = discord.Embed(color=0xBE1931, title='? No user targetted.')
    else:
        target = message.mentions[0]
        reason = ' '.join(args[1:]) if len(args) > 1 else 'Not stated.'
        if not hierarchy_permit(message.guild.me, target):
            response = discord.Embed(color=0xBE1931, title='? I can\'t mute a user equal or above me.')
        elif not hierarchy_permit(message.author, target):
            response = discord.Embed(color=0xBE1931, title='? That user is euqal or above you.')
        else:
            ongoing = discord.Embed(color=0x696969, title='? Editing permissions...')
            ongoing_msg = await message.channel.send(embed=ongoing)
            for channel in message.guild.channels:
                if not isinstance(channel, (discord.TextChannel, discord.CategoryChannel)):
                    continue
                try:
                    # overwrite=None removes the target's overwrite.
                    await channel.set_permissions(target, overwrite=None, reason=reason)
                except discord.Forbidden:
                    # Best effort: skip channels the bot may not edit.
                    pass
            log_embed = generate_log_embed(message, target, args)
            await log_event(cmd.db, message.guild, log_embed)
            title = f'? {target.display_name} has been hard-unmuted.'
            response = discord.Embed(color=0x77B255, title=title)
            await ongoing_msg.delete()
    await message.channel.send(embed=response)
async def quote(cmd, message, args):
    """Quote a message from the current guild by its ID.

    Every text channel of the guild is searched for the message. The result
    (or an error) is sent as an embed to the invoking channel.

    Args:
        cmd: Command object (unused here).
        message: The invoking message.
        args: Command arguments; the first one is the message ID.
    """
    lookup = None
    if args:
        try:
            lookup = int(args[0])
        except ValueError:
            lookup = None

    if not args:
        response = discord.Embed(color=0xBE1931, title='? No ID given.')
    elif not lookup:
        response = discord.Embed(color=0xBE1931, title='? Invalid ID given.')
    else:
        msg = None
        for channel in message.guild.channels:
            if not isinstance(channel, discord.TextChannel):
                continue
            try:
                msg = await channel.get_message(lookup)
                break
            except (discord.Forbidden, discord.NotFound):
                # Not readable or not in this channel; try the next one.
                msg = None

        if not msg:
            response = discord.Embed(color=0xBE1931, title='? Message not found.')
        elif not msg.content:
            response = discord.Embed(color=0xBE1931, title='? That message has no text content.')
        else:
            location = f'{msg.guild.name} | #{msg.channel.name}'
            response = discord.Embed(color=msg.author.color, timestamp=msg.created_at)
            response.set_author(name=f'{msg.author.display_name}', icon_url=user_avatar(msg.author))
            response.description = msg.content
            response.set_footer(text=location)
    await message.channel.send(embed=response)
async def _get_recent_image(channel: discord.TextChannel) -> typing.Optional[str]:
    """Return the proxy URL of the most recent image in ``channel``.

    Scans up to 100 messages of the channel history. For each message,
    attachments that report a height are checked first, then embed images.

    Returns:
        The proxy URL of the first image found, or None when none of the
        scanned messages contains one.
    """
    async for msg in channel.history(limit=100):
        # Scan any attached images.
        for attachment in msg.attachments:
            if attachment.height:
                return attachment.proxy_url

        # Scan any embeds in the message.
        for embed in msg.embeds:
            # ``embed.image`` is a proxy object even when no image is set,
            # so test the URL itself rather than comparing the proxy with
            # the empty sentinel.
            url = embed.image.proxy_url
            if url:
                return url
    return None