python类get()的实例源码

bot.py 文件源码 项目:ModTools 作者: MattBSG 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def cmd_info(self, message, author, server):
        """
        Usage: {command_prefix}info
        Sends a whole buncha info pertaining to the bot to the chat!
        """
        # NOTE(review): the docstring is kept verbatim; the `{command_prefix}`
        # placeholder suggests it is formatted at runtime by a help command -- confirm.
        # The two adjacent literals are concatenated first, then both placeholders
        # are filled with the bot version and the documentation link.
        template = (
            'I was coded by SexualRhinoceros and modified by MattBSG. I am currently on v{} ! \nFor documentation on my commands or info on how to get my in your'
            ' server, check out this link! {}'
        )
        info_text = template.format(VERSION, DOCUMENTATION_FOR_BOT)
        return Response(info_text, reply=True)

#    async def cmd_donate(self, message, author, server):
#        """
#        Usage: {command_prefix}donate
#        Sends a whole buncha info pertaining to rhino's patreon to the chat!
#        """
#        return Response('Thanks for considering donating! If you want to support me monthly, check out my'
#                        ' Patreon here\n\t{}\nor for one time, you can find my paypal here\n\t{}'
#                        ''.format(RHINO_PATREON, RHINO_STREAMTIP),
#                        reply=True)
bot.py 文件源码 项目:ModTools 作者: MattBSG 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def cmd_ignore(self, message, author, server, option, new_id, reason=None):
        """
        Usage: {command_prefix}ignore [ + | - | add | remove ] <channel ID> ["reason"]
        Adds or removes the channel ID to the list of ignored channels when outputting to the server log
        """
        # Declared `async` because the body awaits `has_roles` and
        # `write_to_modlog`; a plain `def` containing `await` is a SyntaxError.
        # Does nothing (returns None) when the author lacks the required role.
        # Raises CommandError for an unknown option, an unknown channel ID, or
        # an attempt to remove a channel that is not in the ignore list.
        if not await self.has_roles(message.channel, author, server, command='ignore'):
            return

        if option not in ('+', '-', 'add', 'remove'):
            raise CommandError('Invalid option "%s" specified, use +, -, add, or remove' % option)

        # Explicit lookup-and-check instead of forcing a ValueError with
        # int('this') and catching everything with a bare except.
        channel = discord.utils.get(server.channels, id=new_id)
        if not channel:
            raise CommandError('Invalid Channel: {}'.format(new_id))

        # Slot 12 of the per-server record holds the ignored channel IDs.
        ignored_channels = self.server_index[server.id][12]
        if option in ('+', 'add'):
            # Avoid duplicates so a single later "remove" really un-ignores it.
            if channel.id not in ignored_channels:
                ignored_channels.append(channel.id)
        else:
            try:
                ignored_channels.remove(channel.id)
            except ValueError:
                raise CommandError('No such channel in ignore list : {}'.format(new_id))

        await self.write_to_modlog(message, author, server, reason)
bot.py 文件源码 项目:ModTools 作者: MattBSG 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def cmd_dropdeadbeats(self, message, author, server):
        """
        Usage: {command_prefix}dropdeadbeats
        Removes the bot from all dead beat servers who never register
        """
        # Declared `async` because the body awaits `leave_server`.
        # Only the configured master user may run this; anyone else gets None.
        if author.id != self.config.master_id:
            return

        # A server counts as a "deadbeat" when it has no entry in server_index
        # and the bot joined it more than 24 hours ago.
        cutoff = datetime.utcnow() - timedelta(hours=24)
        stale_servers = []
        # Use a distinct loop name: the original reused `server` and silently
        # clobbered the command's `server` argument.
        for candidate in self.servers:
            if candidate.id in self.server_index:
                continue
            bot_member = discord.utils.get(candidate.members, id=self.user.id)
            # `get` returns None when the bot's own member entry is missing;
            # skip instead of crashing on `None.joined_at`.
            if bot_member is not None and cutoff > bot_member.joined_at:
                stale_servers.append(candidate)

        # Leave in a second pass so self.servers is not modified while iterating it.
        dropped_names = []
        for dbserver in stale_servers:
            print('Leaving Deadbeat Server : {}'.format(dbserver.name))
            dropped_names.append(dbserver.name)
            await self.leave_server(dbserver)
        return Response('Dropped servers: ```%s```' % ', '.join(dropped_names), reply=True)
bot.py 文件源码 项目:ModTools 作者: MattBSG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def cmd_blserver(self, author, key):
        """
        Usage: {command_prefix}blserver <server name or ID>
        Makes the bot leave the given server and adds it to the global ban list
        """
        # Declared `async` because the body awaits `leave_server`.
        # Only the configured master user may run this; anyone else gets None.
        # Returns a thumbs-up Response on success, thumbs-down on any failure.
        if author.id != self.config.master_id:
            return

        try:
            # Resolve the server once, by name first and then by ID. The
            # original looked it up again after leaving, when it may no
            # longer be present in self.servers.
            target = (discord.utils.get(self.servers, name=key)
                      or discord.utils.get(self.servers, id=key))
            if target is not None:
                banned_id = target.id
                await self.leave_server(target)
                self.globalbans.add(banned_id)
            else:
                print('I did fuck all')
                # No matching server: still record the raw key in the ban
                # file, as the original did.
                banned_id = key
            print('Server %s was blacklisted' % key)

            # Append one entry per line. The original built the line from
            # `str(self.globalbans)[5:0]` (always empty) and then overwrote
            # the in-memory ban set with that string.
            with open('config/globalbans.txt', 'a') as sban:
                sban.write('{}\n'.format(banned_id))
            return Response(':thumbsup:', reply=True)
        except Exception:
            return Response(':thumbsdown:', reply=True)
claw.py 文件源码 项目:my-spider 作者: time-river 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def _fetch_page(self, request):
        """Fetch one request and push its decoded body to redis.

        `request` is indexed by the keys 'url', 'params', 'headers', 'type'
        and 'order'. When 'type' is 'json' the body is decoded as JSON;
        otherwise 'type' is passed to `response.text()` (presumably as the
        text encoding -- confirm against the caller).

        Non-200 responses and any failure are logged and otherwise ignored;
        nothing is returned or raised.
        """
        # Declared `async` because the body uses `async with` and `await`.
        url = None
        try:
            url = request['url']
            with aiohttp.Timeout(10):
                async with aiohttp.get(url, params=request['params'], headers=request['headers']) as response:
                    # Explicit status check instead of `assert`, which is
                    # stripped under -O. The original warning also referenced
                    # an undefined name `url`, so it raised NameError.
                    if response.status != 200:
                        logging.warning('{} {}'.format(response.status, url))
                        return
                    if request['type'] == 'json':
                        content = await response.json()
                    else:
                        content = await response.text(request['type'])
                    obj = {'order': request['order'], 'content': content}
                    redis_push(self.redis, self.content_key, obj)
        except Exception as exc:
            # Many kinds of error can land here, not only asyncio.TimeoutError.
            # Still best-effort, but the failure is now logged rather than
            # silently dropped.
            #redis_push(self.redis, self.request_key, request)
            logging.warning('fetch failed for %s: %r', url, exc)
streams.py 文件源码 项目:KeekoBot 作者: DavidNeon 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
async def twitch_online(self, stream):
        """Query the Twitch kraken API for `stream`.

        Returns:
            400 or 404 when the API answers with that HTTP status,
            False when the response's "stream" entry is None,
            the result of `self.twitch_embed(data)` when "stream" is truthy,
            "error" for any exception or any other response.
        """
        # Declared `async` because the body uses `async with` and `await`.
        url = "https://api.twitch.tv/kraken/streams/" + stream
        header = {'Client-ID': self.settings.get("TWITCH_TOKEN", "")}
        try:
            session = aiohttp.ClientSession()
            try:
                async with session.get(url, headers=header) as r:
                    status = r.status
                    data = await r.json(encoding='utf-8')
            finally:
                # Close the session even when the request or JSON decoding
                # fails; the original leaked it on every error path.
                await session.close()

            if status == 400:
                return 400
            if status == 404:
                return 404
            if data["stream"] is None:
                return False
            if data["stream"]:
                return self.twitch_embed(data)
        except Exception:
            return "error"
        return "error"
streams.py 文件源码 项目:KeekoBot 作者: DavidNeon 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def beam_online(self, stream):
        """Query the beam.pro channel API for `stream`.

        Returns:
            the result of `self.beam_embed(data)` when "online" is True,
            False when "online" is present but not True,
            None when the response contains an "error" key,
            "error" for any exception or any other response.
        """
        # Declared `async` because the body uses `async with` and `await`.
        url = "https://beam.pro/api/v1/channels/" + stream
        try:
            async with aiohttp.get(url) as r:
                data = await r.json(encoding='utf-8')
            if "online" in data:
                if data["online"] is True:
                    return self.beam_embed(data)
                return False
            if "error" in data:
                return None
        except Exception:
            # `except Exception` rather than a bare except so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            return "error"
        return "error"
image.py 文件源码 项目:KeekoBot 作者: DavidNeon 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def gif(self, *text):
        """Retrieves first search result from giphy

        gif [keyword]"""
        # NOTE(review): the docstring is kept verbatim; it is presumably shown
        # as the command's help text -- confirm.
        # Declared `async` because the body awaits `self.bot.say` and uses
        # `async with`.
        from urllib.parse import quote_plus

        if len(text) == 0:
            await self.bot.say("gif [text]")
            return
        # Only the first word's length is validated (2..19 characters).
        if not 1 < len(text[0]) < 20:
            await self.bot.say("Invalid search.")
            return

        try:
            # Percent-encode each word so characters such as '&' or '#'
            # cannot corrupt the query string.
            msg = "+".join(quote_plus(word) for word in text)
            search = "http://api.giphy.com/v1/gifs/search?q=" + msg + "&api_key=dc6zaTOxFJmzC"
            async with aiohttp.get(search) as r:
                result = await r.json()
            if result["data"] != []:
                url = result["data"][0]["url"]
                await self.bot.say(url)
            else:
                await self.bot.say("Your search terms gave no results.")
        except Exception:
            await self.bot.say("Error.")
utility.py 文件源码 项目:discord_bot 作者: Der-Eddy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def emoji(self, ctx, emojiname: str):
        '''Gibt eine vergrößerte Version eines angegebenen Emojis zurück

        Beispiel:
        -----------

        :emoji Emilia
        '''
        # NOTE(review): the German docstring is kept verbatim because it is
        # presumably surfaced to users as the command help -- confirm.
        # English summary: look up an emoji by case-insensitive name among
        # `self.bot.emojis`, download its image and send it as a file; send
        # an error message when no emoji matches.
        # Declared `async` because the body uses `async with` and `await`.
        emoji = discord.utils.find(lambda e: e.name.lower() == emojiname.lower(), self.bot.emojis)
        if not emoji:
            await ctx.send(':x: Konnte das angegebene Emoji leider nicht finden :(')
            return

        tempEmojiFile = 'tempEmoji.png'
        async with aiohttp.ClientSession() as cs:
            async with cs.get(emoji.url) as img:
                image_bytes = await img.read()

        try:
            with open(tempEmojiFile, 'wb') as f:
                f.write(image_bytes)
            await ctx.send(file=discord.File(tempEmojiFile))
        finally:
            # Remove the temp file even when writing or sending fails; the
            # original left it behind on any error.
            if os.path.exists(tempEmojiFile):
                os.remove(tempEmojiFile)
anime.py 文件源码 项目:discord_bot 作者: Der-Eddy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def nsfw(self, ctx):
        '''Vergibt die Rolle um auf die NSFW Channel zugreifen zu können'''
        # NOTE(review): the German docstring is kept verbatim because it is
        # presumably surfaced to users as the command help -- confirm.
        # English summary: toggle the configured self-assignable role on the
        # invoking member, reply with a status message, then delete both the
        # reply and the invoking message after two minutes.
        # Declared `async` because the body awaits throughout.
        if ctx.guild.id != loadconfig.__botserverid__:
            tmp = await ctx.send(f'**:no_entry:** This command don\'t work on this server!')
        else:
            role = None
            if loadconfig.__selfassignrole__:
                role = discord.utils.get(ctx.guild.roles, name=loadconfig.__selfassignrole__)

            if role is None:
                # Either no role is configured or the configured name matches
                # no role in this guild. The original tried to add `None`,
                # swallowed the error and reported that role "None" was added.
                tmp = await ctx.send('**:no_entry:** Es wurde keine Rolle für den Bot eingestellt! Wende dich bitte an den Bot Admin')
            elif role in ctx.author.roles:
                try:
                    await ctx.author.remove_roles(role)
                except discord.HTTPException:
                    # Best effort, as in the original; only Discord API
                    # failures are ignored now instead of every exception.
                    pass
                tmp = await ctx.send(f':x: Rolle **{role}** wurde entfernt')
            else:
                try:
                    await ctx.author.add_roles(role)
                except discord.HTTPException:
                    pass
                tmp = await ctx.send(f':white_check_mark: Rolle **{role}** wurde hinzugefügt')

        await asyncio.sleep(2 * 60)
        await tmp.delete()
        await ctx.message.delete()
main.py 文件源码 项目:discord_bot 作者: Der-Eddy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def _fileCheck(msg):
    """Download the first attachment of `msg` and report its size and hashes.

    Only attachments whose URL ends in .exe, .zip or .rar are processed.

    Returns:
        A formatted multi-line string with name, size, MD5, SHA1, SHA256 and
        SHA512, or None when the extension is not in the list.
    """
    # Declared `async` because the body uses `async with` and `await`.
    url = msg.attachments[0]['url']
    allowedExtension = ('.exe', '.zip', '.rar')
    if not url.endswith(allowedExtension):
        return None

    name = os.path.basename(url)
    # Build the path portably; the original hard-coded a Windows separator
    # ('tmp\\'), which on POSIX created a file literally named "tmp\<name>".
    os.makedirs('tmp', exist_ok=True)
    downloadPath = os.path.join('tmp', name)
    try:
        async with aiohttp.get(url) as download:
            with open(downloadPath, 'wb') as f:
                f.write(await download.read())

        size = os.stat(downloadPath).st_size
        MBSize = round(size / 1024 / 1024, 3)
        MD5 = _getHash(downloadPath, hashlib.md5())
        SHA1 = _getHash(downloadPath, hashlib.sha1())
        SHA256 = _getHash(downloadPath, hashlib.sha256())
        SHA512 = _getHash(downloadPath, hashlib.sha512())
    finally:
        # Always clean up the download, including on failure.
        if os.path.exists(downloadPath):
            os.remove(downloadPath)

    report = f'**Name:** {name}\n'
    report += f'**Size:** {MBSize} MB ({size} Bytes)\n'
    report += f'**MD5:** `{MD5}`\n'
    report += f'**SHA1:** `{SHA1}`\n'
    report += f'**SHA256:** `{SHA256}`\n'
    report += f'**SHA512:** `{SHA512}`\n'
    return report
isitdown.py 文件源码 项目:beatrice-witchcogs 作者: PookaMustard 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def isitdown(self, url):
        """Checks if a website is down or up."""
        # Declared `async` because the body awaits `self.bot.say` and the request.
        # Reports "up" only for an HTTP 200 answer within 15 seconds; any
        # other status, error or timeout is reported as "down".
        if url == "":
            await self.bot.say("You haven't entered a website to check.")
            return
        # Prepend a scheme only when none is present. The original condition
        # (`"http://" not in url or "https://" not in url`) was true for every
        # normal URL, so "https://x" became "http://https://x".
        if not url.startswith(("http://", "https://")):
            url = "http://" + url
        try:
            with aiohttp.Timeout(15):
                await self.bot.say("Testing " + url + "…")
                try:
                    # 'User-Agent' is the actual HTTP header name; the original
                    # sent a header literally called 'user_agent'.
                    # NOTE(review): `headers` is a name defined outside this
                    # block, presumably the user-agent string -- confirm.
                    response = await aiohttp.get(url, headers={'User-Agent': headers})
                    try:
                        status = response.status
                    finally:
                        # Release the connection; the body is never read.
                        response.close()
                    if status == 200:
                        await self.bot.say(url + " is up and running.")
                    else:
                        await self.bot.say(url + " is down.")
                except Exception:
                    await self.bot.say(url + " is down.")
        except asyncio.TimeoutError:
            await self.bot.say(url + " is down.")
dota.py 文件源码 项目:ORELS-Cogs 作者: orels1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, bot):
        """Store the bot, load the dota settings and configure the API key.

        The key is taken from the settings file when a non-empty "key" entry
        exists there, otherwise from the DOTA2_API_KEY environment variable.
        `self.key` records whether a key was found.
        """
        self.bot = bot
        self.dota_settings = fileIO("data/dota/settings.json", "load")

        # Treat a missing "key" entry the same as an empty one.
        settings_key = self.dota_settings["key"] if "key" in self.dota_settings else ""
        env_key = os.environ.get("DOTA2_API_KEY")

        if settings_key != "":
            # Key from the settings file takes priority.
            api.set_api_key(settings_key)
            self.key = True
        elif env_key is not None:
            # Fall back to the environment variable.
            api.set_api_key(env_key)
            self.key = True
        else:
            self.key = False
blizzard.py 文件源码 项目:FlapJack-Cogs 作者: flapjax 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def print_token(self, url, realm):
        """Scrape WoW token info for `realm` from `url` and post it as an embed.

        The page is expected to contain a div whose class is
        `<realm>-region-div`. Any failure, including a missing element,
        results in a generic error message being sent instead.
        """
        # Declared `async` because the body uses `async with` and `await`.
        thumb_url = 'http://wowtokenprices.com/assets/wowtokeninterlaced.png'

        try:
            async with aiohttp.get(url, headers=self.header) as response:
                soup = BeautifulSoup(await response.text(), "html.parser")

            data = soup.find('div', class_=realm + '-region-div')
            desc = data.div.a.h3.text
            buy_price = data.div.find('p', class_='money-text').text
            trend = data.div.find('span', class_='money-text-small').text
            # Update time broken, returns --:-- -- when requested by bot
            #updated = data.div.find('p', id=realm + '-datetime').text

            embed = discord.Embed(title='WoW Token Info', description=desc, colour=0xFFD966)
            embed.set_thumbnail(url=thumb_url)
            embed.add_field(name='Buy Price', value=buy_price, inline=False)
            embed.add_field(name='Change', value=trend, inline=False)
            #embed.set_footer(text='Updated: ' + updated)

            await self.bot.say(embed=embed)

        except Exception:
            # `except Exception` rather than a bare except so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            await self.bot.say("Error finding WoW token prices.")
kz.py 文件源码 项目:tmerc-cogs 作者: tmercswims 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def _steam_url_to_text_id(self, server_id: str,
                                    vanityurl: str) -> str:
        """Resolve a Steam vanity URL name to a textual Steam ID.

        Calls the Steam ResolveVanityURL endpoint using the API key stored
        for `server_id`, then splits the returned 64-bit ID into its parts.

        Returns:
            A string of the form "STEAM_<universe>:<low bit>:<account >> 1>".

        Raises:
            SteamUrlError: when the API reports that `vanityurl` does not
                resolve (response "success" != 1).
        """
        # Declared `async` because the body uses `async with` and `await`.
        api_key = self.settings[server_id]["steam_api_key"]

        # Parenthesised so the two literals form ONE expression. The original
        # put the second literal and `.format(...)` on their own statement, so
        # `url` was only the prefix and the query string was thrown away.
        url = ("http://api.steampowered.com/ISteamUser/"
               "ResolveVanityURL/v0001/?key={}&vanityurl={}".format(
                   api_key, vanityurl))

        async with aiohttp.get(url) as res:
            response = json.loads(await res.text())["response"]
            if response["success"] != 1:
                raise SteamUrlError(
                    "'{}' is not a Steam vanity URL.".format(vanityurl))
            steam64_id = int(response["steamid"])

        # Low 32 bits are the account number, top 8 bits the universe.
        account_id = steam64_id & ((1 << 32) - 1)
        universe = (steam64_id >> 56) & ((1 << 8) - 1)

        low_bit = account_id & 1
        account_half = (account_id >> 1) & ((1 << 31) - 1)

        return "STEAM_{}:{}:{}".format(universe, low_bit, account_half)
genius.py 文件源码 项目:Sitryk-Cogs 作者: Sitryk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def lyrics_from_path(path):
    """Fetch the page at `path` and return the text of its lyrics div.

    All <script> elements are removed from the parsed document before the
    text of the first <div class="lyrics"> is extracted.
    """
    with requests.get(path) as page:
        soup = BeautifulSoup(page.text, "html.parser")
        # Drop every script element so its contents cannot leak into the text.
        for script_tag in soup('script'):
            script_tag.extract()
        lyrics_div = soup.find("div", class_="lyrics")
        return lyrics_div.get_text()
cmdcommands.py 文件源码 项目:PTSCogs 作者: PlanetTeamSpeakk 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
async def on_ready(self):
        """Ensure data/cmdcommands/emptycog.py exists, downloading it if needed.

        Sets `self.emptycogLoaded` to whether the file was already present
        when the check ran. Download failures are printed, not raised.
        """
        # Declared `async` because the body uses `async with` and `await`.
        self.emptycogLoaded = os.path.exists('data/cmdcommands/emptycog.py')
        if self.emptycogLoaded:
            return

        print('Emptycog.py was not found, trying to download')
        try:
            async with aiohttp.get("https://raw.githubusercontent.com/PlanetTeamSpeakk/PTSCogs-attributes/master/emptycog.py") as r:
                emptycog = await r.content.read()
            # Make sure the target folder exists before writing into it.
            os.makedirs('data/cmdcommands', exist_ok=True)
            with open('data/cmdcommands/emptycog.py', 'wb') as f:
                f.write(emptycog)
            print('Succesfully downloaded emptycog.py')
        except Exception as e:
            print(e)
            print("Error occured, did not download emptycog.py, go to https://raw.githubusercontent.com/PlanetTeamSpeakk/PTSCogs/master/emptycog.py press ctrl+s and save it in the data/cmdcommands/ folder.")
useful.py 文件源码 项目:PTSCogs 作者: PlanetTeamSpeakk 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def botowner(self, ctx):
        """Shows you who's boss!"""
        # Declared `async` because the body awaits `self.bot.say`.
        # Looks the owner up among all members the bot can see, by the ID
        # stored in the bot settings.
        owner = discord.utils.get(self.bot.get_all_members(), id=self.bot.settings.owner)
        if owner is not None:
            await self.bot.say("My owner is {}.".format(owner.mention))
        else:
            # The backslash is escaped so the literal has no invalid `\_`
            # escape sequence; the message sent is unchanged.
            await self.bot.say("I don't know who my owner is ¯\\_(?)_/¯.")
useful.py 文件源码 项目:PTSCogs 作者: PlanetTeamSpeakk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def qrcode(self, ctx, url):
        """Creates a qrcode from a link."""
        # Declared `async` because the body uses `async with` and `await`.
        # NOTE(review): the Bitly token is hard-coded in source; it should be
        # moved to configuration and rotated.
        shorten = Shortener('Bitly', bitly_token='dd800abec74d5b12906b754c630cdf1451aea9e0')
        # The shortened link itself is unused; the call is kept because
        # `shorten.qrcode()` presumably depends on it having run -- confirm.
        shorten.short(url)
        async with aiohttp.get(shorten.qrcode(width=128, height=128)) as r:
            image_bytes = await r.content.read()

        # A random suffix keeps concurrent invocations from sharing a file.
        number = random.randint(1000, 9999)
        filename = "qrcode{}.png".format(number)
        fileloc = "data/useful/qrcode{}.png".format(number)
        try:
            with open(fileloc, 'wb') as f:
                f.write(image_bytes)
            await self.bot.send_file(ctx.message.channel, fp=fileloc, filename=filename)
        finally:
            # Remove the temp image even when writing or sending fails.
            if os.path.exists(fileloc):
                os.remove(fileloc)
useful.py 文件源码 项目:PTSCogs 作者: PlanetTeamSpeakk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def time(self, ctx, *, place):
        """Get the time of a place somewhere on the earth
        Example:
        [p]time Los Angeles
        [p]time Amsterdam, Netherlands"""
        # NOTE(review): the docstring is kept verbatim; it is presumably shown
        # as the command's help text -- confirm.
        # Declared `async` because the body awaits throughout.
        # NOTE(review): `requests` is blocking and stalls the event loop while
        # the two HTTP calls run; consider an async client.
        if "geocodingkey" not in self.settings or self.settings['geocodingkey'] == "key_here":
            await self.bot.say("The geocoding key is not yet set if you're my owner you can set it with {}setgeocodingkey.".format(ctx.prefix))
            return
        if "timezonekey" not in self.settings or self.settings['timezonekey'] == "key_here":
            await self.bot.say("The timezone key is not yet set if you're my owner you can set it with {}settimezonekey.".format(ctx.prefix))
            return

        message = await self.bot.say("Getting data...")
        # Pass query values via `params` so they are URL-encoded; a place
        # containing '&' or '#' would otherwise corrupt the query string.
        geocode = requests.get(
            "https://maps.googleapis.com/maps/api/geocode/json",
            params={"address": place, "key": self.settings['geocodingkey']},
        ).json()

        if geocode['status'] == "ZERO_RESULTS":
            await self.bot.edit_message(message, "Could not find any results for **{}**.".format(place))
            return
        if geocode['status'] != "OK":
            await self.bot.say("An unknown error occured while getting the longitude and latitude from the Google API.")
            return

        first_result = geocode['results'][0]
        lng = first_result['geometry']['location']['lng']
        lat = first_result['geometry']['location']['lat']
        fulladdr = first_result['formatted_address']

        # Real UTC epoch seconds. `utcnow().timestamp()` treats the naive
        # value as local time, so the API received a timestamp shifted by the
        # host's UTC offset.
        timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
        timezone_info = requests.get(
            "https://maps.googleapis.com/maps/api/timezone/json",
            params={
                "location": "{},{}".format(lat, lng),
                "timestamp": timestamp,
                "key": self.settings['timezonekey'],
            },
        ).json()
        if timezone_info['status'] != "OK":
            await self.bot.say("An unknown error occured while getting the time and timezone from the Google API.")
            return

        # Shift by the place's offsets, then render in UTC so the host's own
        # timezone plays no part in the displayed wall-clock time.
        timestamp += timezone_info['dstOffset'] + timezone_info['rawOffset']
        local_time = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
        await self.bot.edit_message(message, "**{}**\n\t{} ({})".format(local_time.strftime("%d %b %Y %H:%M:%S"), fulladdr, timezone_info['timeZoneName']))


问题


面经


文章

微信
公众号

扫码关注公众号