async def blockreason(self, ctx, anything_id: int):
    """Get a reason for a block if it exists"""
    # The ID is tried as a user ID first and as a guild ID second; the
    # first matching document in ``self.block_coll`` wins.
    userblock = await self.block_coll.find_one({'user_id': anything_id})
    if userblock is not None:
        e = discord.Embed(title='User blocked')
        e.description = f'<@{anything_id}> - {userblock.get("reason")}'
        return await ctx.send(embed=e)

    guildblock = await self.block_coll.find_one({'guild_id': anything_id})
    if guildblock is not None:
        e = discord.Embed(title='Guild blocked')
        # Read the reason from the guild document: ``userblock`` is always
        # None on this path, so using it here would raise AttributeError.
        e.description = f'why? {guildblock.get("reason")}'
        return await ctx.send(embed=e)

    # Neither lookup matched.
    await ctx.send('Block not found')
# Example source code for the Python class Guild()
def test_lookup(self):
    """Check lookups on a TypeMap that was filled through ``put()``.

    The assertions expect ``discord.User`` and ``discord.Member`` to resolve
    to the value stored for ``discord.abc.User``, and a type that was never
    stored (``discord.DMChannel``) to resolve to ``None``.
    """
    m = utils.TypeMap()
    m.put(discord.abc.User, "something")
    m.put(discord.TextChannel, "something else")
    m.put(discord.Guild, "another thing")

    # ``assertEquals`` is a deprecated alias that was removed in Python 3.12.
    self.assertEqual("something", m.lookup(discord.abc.User))
    self.assertEqual("something", m.lookup(discord.User))
    self.assertEqual("something", m.lookup(discord.Member))
    self.assertEqual("something else", m.lookup(discord.TextChannel))
    self.assertIsNone(m.lookup(discord.DMChannel))
def test_constructed(self):
    """Check lookups on a TypeMap that was filled through its constructor.

    The map is built from a dict of type -> value. The assertions expect
    ``discord.User`` and ``discord.Member`` to resolve to the value given for
    ``discord.abc.User``, and a type that was never given
    (``discord.DMChannel``) to resolve to ``None``.
    """
    m = utils.TypeMap({
        discord.abc.User: "something",
        discord.TextChannel: "something else",
        discord.Guild: "another thing",
    })

    # ``assertEquals`` is a deprecated alias that was removed in Python 3.12.
    self.assertEqual("something", m.lookup(discord.abc.User))
    self.assertEqual("something", m.lookup(discord.User))
    self.assertEqual("something", m.lookup(discord.Member))
    self.assertEqual("something else", m.lookup(discord.TextChannel))
    self.assertIsNone(m.lookup(discord.DMChannel))
def __init__(self, config, database, redis):
    """Store the backends and register the default type mappings.

    ``config`` is accepted but not used by this constructor.
    """
    self.database, self.redis = database, redis

    self._mapping = utils.TypeMap()
    self._lru = LRU(2048)  # presumably the cache capacity -- confirm against LRU
    self._lru_types = set()

    # Default discord type -> name registrations, in their original order.
    default_mappings = (
        (discord.abc.User, 'member'),
        (discord.Guild, 'server'),
        (discord.TextChannel, 'channel'),
    )
    for discord_type, label in default_mappings:
        self.register_mapping(discord_type, label)
def get_joinleave_channel(member):
    """
    Look up the channel named ``joinleave`` in the guild that
    ``member`` belongs to.

    Returns whatever `discord.utils.get` yields for that search
    over ``member.guild.channels``.
    """
    guild_channels = member.guild.channels
    joinleave = discord.utils.get(guild_channels, name='joinleave')
    return joinleave
def is_mod(guild: discord.Guild, member: discord.Member):
    """Return whether the guild's role named "Mods" is among ``member.roles``."""
    mods_role = discord.utils.get(guild.roles, name="Mods")
    return mods_role in member.roles
async def on_member_ban(self, guild: Guild, user):
    """Announce a ban in the guild's text channel named ``bans``.

    Nothing is announced when the ``pollr_mod_log`` configuration key is not
    set for the guild or when no ``bans`` text channel is found. The
    responsible moderator and reason are taken from ``self.ban_debounce``
    when it has information about the ban, otherwise from the guild's audit
    log (only if the bot has the ``view_audit_log`` permission).

    Parameters
    ----------
    guild: Guild
        Guild the ban happened in.
    user
        The banned user; its ``id`` is used for logging and lookups.
    """
    if not await self.bot.config_is_set(guild, 'pollr_mod_log'):
        # not configured to announce bans here
        return

    logger.debug('pollr ban process: %d', user.id)

    # grab bans channel
    bans = discord.utils.get(guild.text_channels, name='bans')
    if not bans:
        logger.debug('not announcing ban, couldn\'t find channel. gid=%d', guild.id)
        return

    ban = f'**Ban:** {describe(user)}'

    # get my permissions
    perms = guild.me.guild_permissions

    # check to see if they were banned via dogbot's ban command, and provide cleaner insight
    information = await self.ban_debounce.check(
        member_id=user.id,
        return_information=True,
        partial=True,
        wait_period=2.0
    )

    if information:
        reason = information['reason']
        responsible = information['moderator']
        ban += f'\n**Responsible:** {describe(responsible)}\n**Reason:** {reason}'
    elif perms.view_audit_log:
        # fall back to audit log
        await asyncio.sleep(1)  # wait a bit, because audit logs can be slow
        logs = await guild.audit_logs(action=AuditLogAction.ban, limit=5).flatten()

        # grab the entry that targeted our banned user
        entry = get(logs, target=user)

        if entry:
            ban += f'\n**Responsible:** {describe(entry.user)}'
            if entry.reason:
                ban += f'\n**Reason:** {entry.reason}'

    try:
        # send the ban message
        await bans.send(ban)
    except discord.HTTPException as exc:
        # Best effort: a failed announcement is logged, not raised.
        logger.warning('Cannot announce ban, error! gid=%d %s', guild.id, exc)
def get_name(self, user_id: int, account=None):
    """Get a string representation of a user or guild.

    Parameters
    ----------
    user_id: int
        User ID to get a string representation of. A ``discord.Guild`` or
        ``discord.User`` instance is also accepted and is formatted
        directly without any lookup.
    account: dict, optional
        Account object to determine if this is
        A user or a guild to search. Only its ``'type'`` key is read, and
        only when the ID cannot be resolved.

    Returns
    -------
    str
        The user's string form, ``taxbank:<guild>`` for a guild, or an
        ``Unfindable ...`` placeholder when nothing matches the ID.
    """
    if isinstance(user_id, discord.Guild):
        return f'taxbank:{user_id.name}'
    elif isinstance(user_id, discord.User):
        return str(user_id)

    # Normalise once so the user and the guild lookups receive the same
    # integer key; previously only the user lookup converted the ID.
    numeric_id = int(user_id)

    obj = self.bot.get_user(numeric_id)
    if not obj:
        # try to find guild
        guild = self.bot.get_guild(numeric_id)
        if guild:
            obj = f'taxbank:{guild}'

    if obj:
        return str(obj)

    # we tried stuff, show a special text
    if not account:
        return f'Unfindable ID {user_id}'

    if account['type'] == AccountType.USER:
        return f'Unfindable User {user_id}'
    if account['type'] == AccountType.TAXBANK:
        return f'Unfindable Guild {user_id}'
    return f'Unfindable Unknown {user_id}'
async def do_heist(self, ctx) -> dict:
    """Actually does the heist.

    Waits until ``self.finish`` is set, then rolls for success against the
    target's taxbank account.

    Returns
    -------
    dict
        With data about if the heist was successful and the amount stolen,
        or which members went to jail if it was unsuccessful.
        Keys set here: ``success``, ``jailed``, ``saved``, ``chance``
        and ``result``.

    Raises
    ------
    SayException
        If the ``Coins`` cog is not loaded, the target has no account, or
        the target account is not of type ``'taxbank'``.
    """
    await self.finish.wait()
    log.info('Doing the heist')

    res = {
        'success': False,
        'jailed': [],
        'saved': [],
    }

    bot = ctx.bot
    jcoin = bot.get_cog('Coins')
    if not jcoin:
        raise SayException('coins cog not loaded')

    target_account = await jcoin.get_account(self.target.id)
    if target_account is None:
        raise SayException('Guild not found')

    if target_account['type'] != 'taxbank':
        raise SayException('Account is not a taxbank')

    amnt = target_account['amount']
    increase_people = len(self.users) * INCREASE_PER_PERSON
    # NOTE(review): divides by self.amount -- assumes it is non-zero; confirm
    # against whatever sets the session amount.
    chance = BASE_HEIST_CHANCE + ((amnt / self.amount) + increase_people) * HEIST_CONSTANT

    # trim it to 60% success (the roll below is drawn from 0..10)
    chance = min(chance, 6)

    result = random.uniform(0, 10)

    res['chance'] = chance
    res['result'] = result

    if result < chance:
        res['success'] = True
    else:
        # 'success' is already False from the initial dict.
        # 50% chance of every person
        # in the heist to go to jail
        for user_id in self.users:
            if random.random() < 0.5:
                res['jailed'].append(user_id)
            else:
                res['saved'].append(user_id)

    return res
async def heist(self, ctx,
                target: GuildConverter, amount: CoinConverter):
    """Heist a server.

    This works better if you have more people joining in your heist.

     - As soon as you use this command, a heist join session will start.
     - This session requires that all other people that want to join the
        heist to use the "j!heist join" command
     - There is a timeout of 5 minutes on the heist join session.
     - If your heist fails, all participants of the heist will be sentenced
        to jail or not, its random.
     - If your heist succeeds, you get a type 1 cooldown of 7 hours.
        it will show you are "regenerating your steal points".
    """
    if amount > 200:
        return await ctx.send('Cannot heist more than 200JC.')

    account = await self.coins.get_account(ctx.author.id)
    # get_account can come back empty (the taxbank lookup below is checked
    # for that too), so fail with a message instead of a TypeError.
    if not account:
        raise self.SayException('Your account was not found')

    if amount > account['amount']:
        raise self.SayException('You cant heist more than'
                                ' what you currently hold.')

    # Only one running session may point at a given target.
    for session in self.sessions.values():
        if session.target.id == target.id:
            raise self.SayException('An already existing session '
                                    'exists with the same target')

    if target == ctx.guild:
        raise self.SayException('stealing from the same guild? :thinking:')

    taxbank = await self.coins.get_account(target.id)
    if not taxbank:
        raise self.SayException('Guild taxbank account not found')

    if amount > taxbank['amount']:
        raise self.SayException('You cannot steal more than the '
                                'guild taxbank currently holds.')

    session = self.get_sess(ctx, target, True)
    try:
        await self.check_user(ctx, session)
    except self.SayException:
        # Drop the session that was just obtained before reporting the error.
        # NOTE(review): assumes sessions are keyed by ctx.guild.id -- confirm
        # against get_sess.
        self.sessions.pop(ctx.guild.id)
        raise

    session.amount = amount
    session.add_member(ctx.author.id)
    session.task = self.loop.create_task(session.do_heist(ctx))

    await ctx.send('Join session started!')

    # timeout of 5 minutes accepting members
    # OR "j!heist raid"
    await asyncio.sleep(300)
    if not session.finish.is_set():
        await session.process_heist(await session.force_finish())