python类BasicAuth()的实例源码

nsfw.py 文件源码 项目:rerobot 作者: voqz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def danbooru(self, message):
        """
        Get a random image from Danbooru

        :param message:
        :return:
        """
        cursor = self.db.server_backend.find({"serv_id": message.server.id})
        for c in cursor:
            nsfw_status = c['nsfw']['nsfw_status']
            nsfw_chan_id = c['nsfw']['nsfw_chan_id']

            if nsfw_status == "on|global" or nsfw_chan_id == message.channel.id:
                tag = message.content[10:]
                tags = str(tag).rstrip()
                if tags == "":
                    with aiohttp.ClientSession(
                            auth=aiohttp.BasicAuth(
                                login=settings["DANBOORU_ID"], password=settings["DANBOORU_PASSWORD"])) as session:
                        async with session.get("https://danbooru.donmai.us/posts.json?") as resp:
                            if resp.status == 200:
                                data = await resp.json()
                            else:
                                return

                else:
                    with aiohttp.ClientSession(
                            auth=aiohttp.BasicAuth(
                                login=settings["DANBOORU_ID"], password=settings["DANBOORU_PASSWORD"])) as session:
                        async with session.get("https://danbooru.donmai.us/posts.json?tags={}".format(tag)) as resp:
                            if resp.status == 200:
                                data = await resp.json()
                            else:
                                return
                if not data:
                    await self.ctx.send_message(message.channel,
                                                "**Error**: `Tag {} did not return any results.`".format(tag))
                    return

                data_len = len(data)
                if data_len == 0:
                    await self.ctx.send_message(message.channel,
                                                "**Error**: `Tag {} did not return any results.`".format(tag))
                    return
                try:
                    ran = random.randint(0, data_len - 1)
                    lucky = data[ran]
                    a = "https://danbooru.donmai.us"
                    img = a + lucky['file_url']
                    await self.ctx.send_message(message.channel, img)
                    return
                except KeyError:
                    pass
                except IndexError:
                    pass
pool.py 文件源码 项目:freehp 作者: jadbin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, agent_addr, *, pool_size=100, block_time=3600, max_fail_times=2,
                 update_interval=300, auth=None, params=None, timeout=20, loop=None):
        """
        ProxyPool constructor.

        agent_addr - Proxy agent address.
        pool_size - (optional) The size of the pool.
        block_time - (optional) Time for blocking a proxy.
        max_fail_times - (optional) The maximum acceptable number of the continuous failure of a proxy.
        update_interval - (optional) Time interval to update the proxy list from proxy agent.
        auth - (optional) Http Basic Auth tuple.
        params - (optional) Prameters dictionary be sent in the query.
        timeout - (optional) Timeout when connects proxy agent.
        loop - (optional) Event loop.
        """
        self.agent_addr = agent_addr
        if loop is None:
            self.asyn = False
            self.loop = asyncio.new_event_loop()
        else:
            self.asyn = True
            self.loop = loop
        self.auth = auth
        if self.auth is not None:
            if isinstance(self.auth, tuple):
                self.auth = aiohttp.BasicAuth(*self.auth)
            elif not isinstance(self.auth, aiohttp.BasicAuth):
                raise TypeError('The type of "auth" must be tuple or aiohttp.BasicAuth')
        self.params = params or {}
        self.timeout = timeout

        self.update_interval = update_interval
        self._last_update = 0
        self._update_lock = asyncio.Lock(loop=self.loop)
        self.max_fail_times = max_fail_times

        self._proxies = {}

        self._pool = PriorityQueue(pool_size)
        self._pool_p = PriorityQueue(pool_size)
        self._pool_n = PriorityQueue(pool_size)

        self.backup_size = pool_size * 5
        self._backup = PriorityQueue(self.backup_size)
        self._backup_p = PriorityQueue(self.backup_size)
        self._backup_n = PriorityQueue(self.backup_size)

        self.block_time = block_time
        self._trash = {}
        self._block_queue = deque()
nsfw.py 文件源码 项目:Lapzbot_Beta 作者: lap00zza 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def danbooru(self, message):
        """
        Get a random image from Danbooru

        :param message:
        :return:
        """
        cursor = self.db.server_backend.find({"serv_id": message.server.id})
        for c in cursor:
            nsfw_status = c['nsfw']['nsfw_status']
            nsfw_chan_id = c['nsfw']['nsfw_chan_id']

            if nsfw_status == "on|global" or nsfw_chan_id == message.channel.id:
                tag = message.content[10:]
                tags = str(tag).rstrip()
                if tags == "":
                    with aiohttp.ClientSession(
                            auth=aiohttp.BasicAuth(
                                login=settings["DANBOORU_ID"], password=settings["DANBOORU_PASSWORD"])) as session:
                        async with session.get("https://danbooru.donmai.us/posts.json?") as resp:
                            if resp.status == 200:
                                data = await resp.json()
                            else:
                                return

                else:
                    with aiohttp.ClientSession(
                            auth=aiohttp.BasicAuth(
                                login=settings["DANBOORU_ID"], password=settings["DANBOORU_PASSWORD"])) as session:
                        async with session.get("https://danbooru.donmai.us/posts.json?tags={}".format(tag)) as resp:
                            if resp.status == 200:
                                data = await resp.json()
                            else:
                                return
                if not data:
                    await self.ctx.send_message(message.channel,
                                                "**Error**: `Tag {} did not return any results.`".format(tag))
                    return

                data_len = len(data)
                if data_len == 0:
                    await self.ctx.send_message(message.channel,
                                                "**Error**: `Tag {} did not return any results.`".format(tag))
                    return
                try:
                    ran = random.randint(0, data_len - 1)
                    lucky = data[ran]
                    a = "https://danbooru.donmai.us"
                    img = a + lucky['file_url']
                    await self.ctx.send_message(message.channel, img)
                    return
                except KeyError:
                    pass
                except IndexError:
                    pass
connection.py 文件源码 项目:aioelasticsearch 作者: wikibusiness 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(
        self,
        host='localhost',
        port=9200,
        http_auth=None,
        use_ssl=False,
        ssl_context=None,
        verify_certs=False,
        maxsize=10,
        headers=None,
        *,
        loop,
        **kwargs
    ):
        super().__init__(host=host, port=port, use_ssl=use_ssl, **kwargs)

        if headers is None:
            headers = {}
        self.headers = headers
        self.headers.setdefault('Content-Type', 'application/json')

        self.loop = loop

        if http_auth is not None:
            if isinstance(http_auth, aiohttp.BasicAuth):
                pass
            elif isinstance(http_auth, str):
                http_auth = aiohttp.BasicAuth(*http_auth.split(':', 1))
            elif isinstance(http_auth, (tuple, list)):
                http_auth = aiohttp.BasicAuth(*http_auth)
            else:
                raise TypeError("Expected str, list, tuple or "
                                "aiohttp.BasicAuth as http_auth parameter,"
                                "got {!r}".format(http_auth))

        self.http_auth = http_auth

        self.verify_certs = verify_certs

        self.base_url = URL.build(scheme='https' if self.use_ssl else 'http',
                                  host=host,
                                  port=port,
                                  path=self.url_prefix)

        self.session = kwargs.get('session')
        if self.session is None:
            self.session = aiohttp.ClientSession(
                auth=self.http_auth,
                connector=aiohttp.TCPConnector(
                    limit=maxsize,
                    use_dns_cache=kwargs.get('use_dns_cache', False),
                    ssl_context=ssl_context,
                    verify_ssl=self.verify_certs,
                    loop=self.loop,
                ),
            )
myanimelist.py 文件源码 项目:plumeria 作者: sk89q 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def manga(message):
    """
    Gets information about an manga using myanimelist.net.

    Example::

        /manga naruto

    """
    query = message.content.strip()
    if not len(query):
        raise CommandError("Supply the name of a manga to search.")
    auth = aiohttp.BasicAuth(username(), password())
    try:
        r = await http.get("https://myanimelist.net/api/manga/search.xml", params=[
            ('q', query)
        ], auth=auth)
    except BadStatusCodeError as e:
        if e.http_code in (204, 404):
            raise CommandError("No manga results for '{query}'.".format(query=query))
        raise
    doc = BeautifulSoup(r.text(), features="lxml")
    entries = doc.manga.find_all("entry", recursive=False)
    if not len(entries):
        raise CommandError("No results found.")
    entry = entries[0]
    return "{image}\n\n" \
           "**{name}** ({type})\n\n" \
           "**Status:** {status}\n" \
           "**Score:** {score}\n" \
           "**Chapters:** {chapters}\n" \
           "**Run Dates:** {start}-{end}\n\n" \
           "{synopsis}\n".format(
        image=entry.image.text,
        type=entry.type.text,
        name=entry.title.text,
        status=entry.status.text,
        score=entry.score.text,
        chapters=entry.chapters.text,
        start=entry.start_date.text,
        end=entry.end_date.text,
        synopsis=strip_html(entry.synopsis.text),
    )


问题


面经


文章

微信
公众号

扫码关注公众号