python类ClientSession()的实例源码

lagbot.py 文件源码 项目:lagbot 作者: mikevb1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, *args, debug=False, **kwargs):
    """Initialise the bot, its HTTP session and its database pool.

    :param debug: When true, the bot starts with the "do not disturb" status
        instead of "online".

    All other positional and keyword arguments are passed to the parent
    initializer unchanged.
    """
    self._debug = debug
    self.game = config.game
    presence = discord.Game(name=self.game)
    if self._debug:
        status = discord.Status.dnd
    else:
        status = discord.Status.online
    super().__init__(*args, command_prefix=command_prefix,
                     game=presence, status=status, **kwargs)

    # The invoke hooks are assigned after the parent initializer has run.
    self._before_invoke = self._before_invoke_
    self._after_invoke = self._after_invoke_
    self.resumes = 0

    # User-Agent is "Discord Bot", followed by config.source when it is set.
    agent_parts = ['Discord Bot']
    source = config.source
    if source is not None:
        agent_parts.append(source)
    self.http_ = aiohttp.ClientSession(
        loop=self.loop, headers={'User-Agent': ' '.join(agent_parts)})

    # The pool is created synchronously by running the loop to completion.
    pool_coro = asyncpg.create_pool(
        dsn=config.pg_dsn, command_timeout=10, loop=self.loop)
    self.db_pool = self.loop.run_until_complete(pool_coro)
animelist.py 文件源码 项目:Jumper-Cogs 作者: Redjumpman 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def credential_verfication(self, username, password):
    """Check a username/password pair against the MyAnimeList account API.

    :param username: Account login name.
    :param password: Account password.
    :return: ``True`` on HTTP 200; ``False`` on HTTP 401 or 403 (after
        telling the user why); ``None`` for any other status code.
    """
    auth = aiohttp.BasicAuth(login=username, password=password)
    url = "https://myanimelist.net/api/account/verify_credentials.xml"
    # A ClientSession has to be entered with ``async with`` in a coroutine.
    async with aiohttp.ClientSession(auth=auth) as session:
        async with session.get(url) as response:
            status = response.status

    if status == 200:
        return True
    if status == 401:
        await self.bot.say("Username and Password is incorrect.")
        return False
    if status == 403:
        # The two literals are concatenated, so the first needs a trailing space.
        await self.bot.say("Too many failed login attempts. Try putting in the "
                           "correct credentials after some time has passed.")
        return False
    return None
releases.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def check(self, first_check=False):
    """Fetch the release list and record the current and latest versions.

    The first release that is neither a draft nor a prerelease is stored in
    ``self.latest``; the installed version is stored in ``self.current``.

    :param first_check: When true and ``self.update_available`` is truthy,
        an update notice is logged and sent to the instance chat.
    """
    from pyplanet import __version__ as current_version

    logging.debug('Checking for new versions...')

    # Only the download needs the session; release it before doing anything else.
    async with aiohttp.ClientSession() as session:
        async with session.get(self.url) as resp:
            releases = await resp.json()

    for release in releases:
        if not release['draft'] and not release['prerelease']:
            self.latest = release['tag_name']
            break

    self.current = current_version
    logging.debug('Version check, your version: {}, online version: {}'.format(self.current, self.latest))

    if first_check and self.update_available:
        logging.info('New version of PyPlanet available, consider updating: {}'.format(self.latest))
        await self.instance.chat(
            '\uf1e6 $FD4$oPy$369Planet$z$s$fff \uf0e7 new version available: v{}. Consider updating!'.format(self.latest)
        )
analytics.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def start(self, instance):
    """
    Initiate the analytics.

    Creates the HTTP client session, tries to report the controller start
    (failures are logged and otherwise ignored) and schedules the report loop.

    :param instance: Instance of controller.
    :type instance: pyplanet.core.instance.Instance
    """
    import logging

    self.instance = instance
    self.client = aiohttp.ClientSession()

    try:
        await self.capture('start_controller', dict(
            app_labels=list(self.instance.apps.apps.keys())
        ))
    except Exception:
        # Reporting is best-effort and must not stop startup. Catching
        # ``Exception`` rather than using a bare ``except`` lets task
        # cancellation and interpreter exit propagate.
        logging.debug('Could not capture start_controller event', exc_info=True)

    # Start report loop
    asyncio.ensure_future(self.loop())
aiodownload.py 文件源码 项目:aiodownload 作者: jelloslinger 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, client=None, download_strategy=None, request_strategy=None):
    """Bind an HTTP client session and the download/request strategies.

    :param client: Existing client session to reuse. When falsy, a new
        ``aiohttp.ClientSession`` is created on the current event loop.
    :param download_strategy: Download configuration; a default
        ``DownloadStrategy()`` is used when falsy.
    :param request_strategy: Request configuration; a default ``Lenient()``
        is used when falsy.
    """
    if client:
        # Reuse the caller's session and adopt the loop it is bound to.
        self.loop = client._loop
        self.client = client
    else:
        # No session supplied: build one on the current event loop.
        self.loop = asyncio.get_event_loop()
        self.client = aiohttp.ClientSession(loop=self.loop)

    # Strategy objects (download: chunk_size, home, skip_cached;
    # request: concurrent, max_attempts, timeout).
    self._download_strategy = download_strategy or DownloadStrategy()
    self._request_strategy = request_strategy or Lenient()

    # The bounded semaphore caps the number of concurrently running requests.
    max_concurrent = self._request_strategy.concurrent
    self._main_semaphore = asyncio.BoundedSemaphore(max_concurrent)
api.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def oauth2(code):
    """Exchange a Weibo authorization code and sign the matching user in.

    :param code: Authorization code passed to the token endpoint.
    :return: A ``redirect:`` string to the registration page when no Oauth
        record exists for the Weibo account, the string
        ``'oauth user was deleted.'`` when the linked user is missing,
        otherwise the result of ``user.signin`` on a redirect to ``/``.
    """
    url = 'https://api.weibo.com/oauth2/access_token'
    # NOTE(review): the client secret is hard-coded in source; consider
    # loading it from configuration instead.
    payload = {
        'client_id': '366603916',
        'client_secret': 'b418efbd77094585d0a7f9ccac98a706',
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': 'http://www.qiangtaoli.com'
    }
    # A ClientSession has to be entered with ``async with`` in a coroutine.
    async with ClientSession() as session:
        async with session.post(url, data=payload) as resp:
            params = await resp.json()
        # The token response is passed on as the query of the profile request.
        async with session.get('https://api.weibo.com/2/users/show.json', params=params) as resp:
            info = await resp.json()

    # The HTTP session is no longer needed for the database lookups below.
    o = await Oauth.find('weibo-' + info['idstr'])
    if not o:
        return 'redirect:/bootstrap/register?oid=weibo-%s&name=%s&image=%s' % (info['idstr'], info['name'], info['avatar_large'])
    user = await User.find(o.user_id)
    if not user:
        return 'oauth user was deleted.'
    return user.signin(web.HTTPFound('/'))


# ????
request.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def connect_websocket(self, sess=None):
    '''
    Creates a WebSocket connection.

    :param sess: Optional ``aiohttp.ClientSession`` to connect through; a
        new session is created when omitted.
    :return: A ``(session, websocket)`` tuple.
    :raises BackendClientError: When building the URL or connecting fails.
    '''
    assert self.method == 'GET'
    owns_session = sess is None
    if owns_session:
        sess = aiohttp.ClientSession()
    else:
        assert isinstance(sess, aiohttp.ClientSession)
    try:
        ws = await sess.ws_connect(self.build_url(), headers=self.headers)
    except Exception as e:
        # A session created here is unreachable by the caller once we raise,
        # so close it instead of leaking it. A caller-supplied session is
        # left open for the caller to manage.
        if owns_session:
            await sess.close()
        msg = 'Request to the API endpoint has failed.\n' \
              'Check your network connection and/or the server status.'
        raise BackendClientError(msg) from e
    return sess, ws
test_request.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def test_asend_with_appropriate_method(mocker, req_params):
    """For each allowed method, ``asend`` calls the matching session method once."""
    req = Request(**req_params)
    for method in Request._allowed_methods:
        req.method = method

        # Replace the session method named after the HTTP verb.
        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(), autospec=True)

        assert mock_reqfunc.call_count == 0
        try:
            # Ignore exceptions in `async with` statement. We're only
            # interested in request call here.
            await req.asend()
        except BackendClientError:
            pass
        mock_reqfunc.assert_called_once_with(
            mocker.ANY, req.build_url(), data=req._content, headers=req.headers)
test_request.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def test_asend_returns_appropriate_sorna_response(mocker, req_params,
                                                        mock_sorna_aresp):
    """For each allowed method, ``asend`` wraps the mocked reply in a ``Response``."""
    req = Request(**req_params)
    for method in Request._allowed_methods:
        req.method = method

        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(),
            new_callable=asynctest.CoroutineMock
        )
        # The fixture yields the mocked response and the values it was built from.
        mock_reqfunc.return_value, conf = mock_sorna_aresp

        resp = await req.asend()

        assert isinstance(resp, Response)
        assert resp.status == conf['status']
        assert resp.reason == conf['reason']
        assert resp.content_type == conf['content_type']
        body = await conf['read']()
        assert resp.content_length == len(body)
        assert resp.text() == body.decode()
        assert resp.json() == json.loads(body.decode())
wayback.py 文件源码 项目:waybackscraper 作者: abrenaut 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def run_scraping(url, timestamps, scrape_function, concurrency, user_agent):
    """
    Run the scraping function asynchronously on the given archives.
    The concurrency parameter limits the number of concurrent connections to the web archive.

    :return: Dictionary mapping each timestamp to its scraping result;
        entries whose result is ``None`` are left out.
    """
    # Use a semaphore to limit the number of concurrent connections to the internet archive
    sem = asyncio.Semaphore(concurrency)

    # Use one session to benefit from connection pooling
    async with aiohttp.ClientSession(headers={'User-Agent': user_agent}) as session:
        # Create scraping coroutines for each archive
        coroutines = [
            scrape_archive(session, url, timestamp, scrape_function, sem)
            for timestamp in timestamps
        ]

        # Wait for coroutines to finish and gather the results
        results = await asyncio.gather(*coroutines)

    # Compile each valid scraping results in a dictionary
    return {timestamp: result for timestamp, result in results if result is not None}
rest.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def aget(self, **kw):
    """GET ``self.url`` and wrap the reply in a ``Payload``.

    Keyword arguments, when given, become the query string. The body is
    decoded as UTF-8; a ``text/x-python`` body is additionally parsed with
    ``ast.literal_eval``. The status check runs after the session is closed.
    """
    session = aiohttp.ClientSession()
    target = URL(self.url)
    target = target.with_query(**kw) if kw else target
    logger.debug("GET %s", target)
    try:
        response = yield from session.get(target, timeout=10)
        raw = yield from response.read()
    finally:
        # Always close the session, whether or not the request succeeded.
        yield from session.close()
    response.raise_for_status()
    text = raw.decode('utf-8')
    is_python_literal = response.content_type == 'text/x-python'
    body = ast.literal_eval(text) if is_python_literal else text
    return Payload.factory(response.status, response.headers, body)
rest.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def apost(self, headers=None, data=None, **kw):
    """POST to ``self.url`` and wrap the reply in a ``Payload``.

    :param headers: Optional request headers; defaults to no extra headers.
    :param data: Optional request body passed to the session.

    Remaining keyword arguments, when given, become the query string. The
    body is decoded as UTF-8. The status check runs after the session is
    closed.
    """
    # A fresh dict per call avoids sharing one mutable default between calls.
    if headers is None:
        headers = {}
    session = aiohttp.ClientSession()
    url = URL(self.url)
    if kw:
        url = url.with_query(**kw)
    logger.debug("POST %s", url)
    try:
        response = yield from session.post(
            url, headers=headers, data=data, timeout=10,
        )
        payload = yield from response.read()
    finally:
        # Always close the session, whether or not the request succeeded.
        yield from session.close()
    response.raise_for_status()
    payload = payload.decode('utf-8')
    return Payload.factory(response.status, response.headers, payload)
http_agent.py 文件源码 项目:oshino 作者: CodersOfTheNight 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def process(self, event_fn):
    """GET ``self.url``, time the request and emit a health event.

    :param event_fn: Callable invoked with ``service``, ``metric_f`` (the
        elapsed time in milliseconds), ``state`` and ``description``.
    """
    logger = self.get_logger()
    ts = time()
    async with aiohttp.ClientSession() as session:
        async with session.get(self.url) as resp:
            # Capture the status while the response is still open.
            code = resp.status
            state = self.translate_status(code)
    te = time()
    span = int((te - ts) * 1000)
    # The two literals are concatenated, so the first needs a trailing space.
    logger.debug("Request to {url} returned status code {code}(as {state}) "
                 "in {span} milliseconds.".format(url=self.url,
                                                  code=code,
                                                  state=state,
                                                  span=span))

    event_fn(service=self.prefix + "health",
             metric_f=span,
             state=str(state),
             description=self.url)
openstack.py 文件源码 项目:rci 作者: seecloud 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, auth_url, username, tenant, loop=None, log=None,
             cafile=None, token_renew_delay=3300):
    """Store the connection settings and create the HTTP session.

    :param auth_url: Stored as ``self.auth_url``.
    :param username: Stored as ``self.username``.
    :param tenant: Stored as ``self.tenant``.
    :param loop: Event loop to use; the current loop when omitted.
    :param log: Stored as ``self.log``.
    :param cafile: Optional CA bundle path; when given, the session uses an
        SSL context built from it.
    :param token_renew_delay: Stored as ``self.token_renew_delay``.
    """
    self.auth_url = auth_url
    self.username = username
    self.tenant = tenant
    self.log = log
    self.token_renew_delay = token_renew_delay
    self.loop = loop or asyncio.get_event_loop()
    self.headers = {"content-type": "application/json",
                    "accept": "application/json"}
    if cafile:
        sslcontext = ssl.create_default_context(cafile=cafile)
        conn = aiohttp.TCPConnector(ssl_context=sslcontext)
        self.session = aiohttp.ClientSession(connector=conn, loop=self.loop)
    else:
        # Store the session on the instance in this branch too; a bare
        # local here would leave ``self.session`` undefined.
        self.session = aiohttp.ClientSession(loop=self.loop)
clicksend.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def send_sms(recipients: Iterable[str], msg: str, username: str, api_key: str, sender: str):
    """Post one SMS per recipient to ``CLICKSEND_URL``.

    :param recipients: Destination numbers, one message is built for each.
    :param msg: Message text; only the first 140 characters are sent.
    :param username: Login used for HTTP basic auth.
    :param api_key: Password used for HTTP basic auth.
    :param sender: Value of the ``from`` field of every message.

    A non-200 reply or an ``aiohttp.ClientError`` is logged, not raised.
    """
    data = {
        'messages': [
            {
                'source': 'python',
                'from': sender,
                'body': msg[:140],
                'to': recipient,
                'schedule': ''
            }
            for recipient in recipients
        ],
    }  # type: Dict[str, List]
    try:
        async with aiohttp.ClientSession(headers={'Content-Type': 'application/json'},
                                         auth=aiohttp.BasicAuth(username, api_key)) as session:
            async with session.post(CLICKSEND_URL, data=json.dumps(data), timeout=30) as resp:
                if resp.status != 200:
                    log.msg('Error sending clicksend sms notification: http status %s' % (str(resp.status)),
                            'NOTIFICATION')
    except aiohttp.ClientError as e:
        log.msg('Error sending clicksend sms notification: %s' % (str(e)), 'NOTIFICATIONS')
client.py 文件源码 项目:Pyanimelist 作者: GetRektByMe 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def get_user_series(self, username: str, series_type: str):
    """
    :param username: The name of the accounts information you're trying to get
    :param series_type: If you're looking for manga or anime
    :return type list: one dict per series element found in the response
    :raises InvalidSeriesTypeException: if series_type is not "anime" or "manga"
    :raises ResponseError: if the response status is not 200
    """
    # Reject bad input before building anything for the request.
    if series_type not in ("anime", "manga"):
        raise InvalidSeriesTypeException

    params = {
        "u": username,
        "status": 'all',
        "type": series_type
    }
    # A ClientSession has to be entered with ``async with`` in a coroutine.
    async with aiohttp.ClientSession(auth=self._auth, headers={"User-Agent": self.user_agent}) as session:
        async with session.get(MAL_APP_INFO, params=params) as response:
            # Raise an error if we get the wrong response code
            if response.status != 200:
                raise ResponseError(response.status)
            # Get the response text and set parser
            soup = bs4.BeautifulSoup(await response.text(), "lxml")
            return [dict(self.process_(child) for child in anime.children) for anime in soup.find_all(series_type)]
    # End of bit Zeta wrote
client.py 文件源码 项目:Pyanimelist 作者: GetRektByMe 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def get_user_data(self, user: str) -> UserInfo:
    """
    :param user: username who's information we're getting
    :return: a UserInfo built from the first element of the response
    :raises ResponseError: if the response status is not 200
    """
    # A ClientSession has to be entered with ``async with`` in a coroutine.
    async with aiohttp.ClientSession(auth=self._auth, headers={"User-Agent": self.user_agent}) as session:
        async with session.get(MAL_APP_INFO, params={"u": user}) as response:
            # Raise an error if we get the wrong response code
            if response.status != 200:
                raise ResponseError(response.status)
            response_data = await response.read()

    # We want the [0] index as myanimelist always returns the user data first
    user_info = etree.fromstring(response_data)[0]
    return UserInfo(
        id=user_info.find("user_id").text,
        username=user_info.find("user_name").text,
        watching=user_info.find("user_watching").text,
        completed=user_info.find("user_completed").text,
        on_hold=user_info.find("user_onhold").text,
        dropped=user_info.find("user_dropped").text,
        plan_to_watch=user_info.find("user_plantowatch").text,
        days_spent_watching=user_info.find("user_days_spent_watching").text
    )
Internet.py 文件源码 项目:Sparcli 作者: 4Kaylum 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, sparcli):
    """Set up the lookup state and the shared HTTP session.

    :param sparcli: Bot instance; its ``loop`` is used for the session.
    """
    self.sparcli = sparcli
    self.urbanSite = 'https://api.urbandictionary.com/v0/define?term={}'
    self.wolfClient = None
    # Noun list starts empty (the original assigned it twice).
    self.nounlist = []

    # Set up Wolfram only when the optional import succeeded.
    if wolframalphaImported:
        try:
            tokens = getTokens()
            secret = tokens['WolframAlpha']['Secret']
            self.wolfClient = Client(secret)
        except KeyError:
            # No secret configured: leave the client unset.
            pass

    self.session = ClientSession(loop=sparcli.loop)
io.py 文件源码 项目:heroku-log-lights 作者: codingjoe 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def read_stream(app_name, auth_token):
    """Follow the log stream forever, queueing one entry per line.

    Each pass resolves a stream URL, reads the response one byte at a time
    and hands every newline-terminated line (without the newline) to
    ``write_to_queue``. When the stream ends or the server disconnects, a
    new URL is requested and any unfinished line is dropped.
    """
    while True:
        stream_url = yield from get_stream_url(app_name, auth_token)
        print('Reading stream: %s' % stream_url)
        pending = b''
        with aiohttp.ClientSession() as session:
            response = yield from session.get(stream_url)
            while True:
                try:
                    byte = yield from response.content.read(1)
                except aiohttp.ServerDisconnectedError:
                    break
                if not byte:
                    # Empty read: the stream is exhausted.
                    break
                if byte != b'\n':
                    pending += byte
                    continue
                # End of line: queue what was collected and start afresh.
                try:
                    yield from write_to_queue(pending)
                except ValueError as e:
                    print(str(e))
                pending = b''
test_client.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def test_request_proxy(dummy_client):
    """The ``proxy`` argument of ``request`` reaches ``session.request``."""
    class RaiseProxy:
        # Used as the patch side effect: instantiating it raises with the
        # proxy value it was called with.

        def __init__(self, *args, proxy=None, **kwargs):
            raise RuntimeError(proxy)

    async with aiohttp.ClientSession() as session:
        with patch.object(session, 'request', side_effect=RaiseProxy):
            try:
                await dummy_client.request(method='get',
                                           url="http://hello.com",
                                           proxy="http://some.proxy.com",
                                           session=session,
                                           future=asyncio.Future())
            except RuntimeError as e:
                assert str(e) == "http://some.proxy.com"
test_stream.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
async def test_stream_cancel(event_loop):
    """A task iterating over a stream can be cancelled within the timeout."""
    async def cancel(task):
        await asyncio.sleep(0.001)
        task.cancel()

    async def test_stream_iterations(stream):
        while True:
            await test_stream_iteration(stream)

    # A ClientSession has to be entered with ``async with`` in a coroutine.
    async with aiohttp.ClientSession(loop=event_loop) as session:
        client = peony.client.BasePeonyClient("", "", session=session)
        context = peony.stream.StreamResponse(method='GET',
                                              url="http://whatever.com",
                                              client=client)

        with context as stream:
            with patch.object(stream, '_connect',
                              side_effect=stream_content):
                coro = test_stream_iterations(stream)
                task = event_loop.create_task(coro)
                cancel_task = event_loop.create_task(cancel(task))

                # Both tasks are expected to finish within one second.
                with aiohttp.Timeout(1):
                    await asyncio.wait([task, cancel_task])
client.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def _setup(self):
    """Create the session if needed, run the init tasks and mark setup done."""
    if self._session is None:
        logger.debug("Creating session")
        self._session = aiohttp.ClientSession()

    # this will allow requests to be made starting from this point
    self.setup.early.set_result(True)

    # ``init_tasks`` may be a collection of awaitables or a callable
    # returning one.
    init_tasks = self.init_tasks
    if callable(init_tasks):
        init_tasks = init_tasks()

    if init_tasks:
        logger.debug("Starting init tasks")
        await asyncio.gather(*init_tasks)

    self.setup.set_result(True)
main.py 文件源码 项目:Godavaru 作者: Godavaru 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def on_guild_join(server):
    """Update presence, announce the joined guild and post the guild count.

    :param server: The guild that was joined.
    """
    server_count = len(bot.guilds)
    # A separate loop name keeps ``server`` bound to the joined guild;
    # reusing it made the announcement below describe the last guild in
    # ``bot.guilds`` instead.
    member_count = sum(len(guild.members) for guild in bot.guilds)
    await bot.change_presence(game=discord.Game(name=bot.command_prefix[0]+"help | {} guilds with {} members.".format(server_count, member_count)))

    timestamp = datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S")
    owner = server.owner
    webhook.send(':tada: [`' + timestamp + '`] I joined the server `' + server.name + '` (' + str(server.id) + '), owned by `' + owner.name + '#' + owner.discriminator + '` (' + str(owner.id) + ').')

    headers = {'Authorization': config['Main']['dbotstoken']}
    data = {'server_count': server_count}
    api_url = 'https://discordbots.org/api/bots/311810096336470017/stats'
    async with aiohttp.ClientSession() as session:
        # Entering the response context releases the connection afterwards.
        async with session.post(api_url, data=data, headers=headers):
            pass

# server leave
test_sessions.py 文件源码 项目:aiovk 作者: Fahreeve 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_auth_with_valid_data(self):
    """An authorization code from a logged-in session yields an access token."""
    # Step 1: log in and request an authorization code.
    connector = aiohttp.TCPConnector(verify_ssl=False)
    login_session = TestAuthSession(login=USER_LOGIN, password=USER_PASSWORD, app_id=APP_ID)
    login_session.driver.session = aiohttp.ClientSession(
        connector=connector, response_class=CustomClientResponse)
    yield from login_session.authorize()
    query = {'client_id': APP_ID, 'display': 'page', 'redirect_uri': REDIRECT_URI, 'response_type': 'code'}
    with aiohttp.Timeout(10):
        response = yield from login_session.driver.session.get(
            "https://oauth.vk.com/authorize", params=query, allow_redirects=True)
    login_session.close()
    code = response.url.query.get('code')
    self.assertIsNotNone(code)

    # Step 2: exchange the code for an access token.
    code_session = AuthorizationCodeSession(APP_ID, APP_SECRET, REDIRECT_URI, code)
    yield from code_session.authorize()
    code_session.close()
    self.assertIsNotNone(code_session.access_token)
automation.py 文件源码 项目:AutomaBot 作者: 73VW 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def set(self, ctx, lamp, state):
    """
    Set lamp state.

    Changes state of [lamp] to [state] and sends server response
    """
    tmp = await self.bot.send_message(ctx.message.channel, "Requesting")
    payload = {'lamp': lamp, 'state': state, 'user_agent': 'AutomaBot'}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(self.url_post, data=payload) as resp:
                r = await resp.text()
    except aiohttp.errors.ClientOSError as err:
        # Report connection problems as the project's own error type,
        # keeping the original exception as the cause.
        raise APIconnectionError() from err

    # The reply body is parsed as JSON and shown in an embed that replaces
    # the placeholder message.
    embed = make_embed_message("*Result!*", json.loads(r), self.bot,
                               ctx.message)
    await self.bot.edit_message(tmp, new_content='Right now : ',
                                embed=embed)
traversal.py 文件源码 项目:plone.server 作者: plone 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def subrequest(
        orig_request, path, relative_to_site=True,
        headers=None, body=None, params=None, method='GET'):
    """Subrequest, initial implementation doing a real request.

    :param orig_request: Request whose headers are copied into the
        subrequest unless already present in ``headers``.
    :param path: Target passed to the session method.
    :param relative_to_site: Accepted but not used by this implementation.
    :param headers: Optional extra headers; the mapping is not modified.
    :param body: Sent as ``data`` for ``put`` and ``patch`` only.
    :param params: Optional query parameters.
    :param method: HTTP method name, case-insensitive.
    :return: Whatever the session method returns for the call.
    :raises AttributeError: When the method is not in ``SUBREQUEST_METHODS``.
    """
    method = method.lower()
    # Validate before creating the session so a bad method leaks nothing.
    if method not in SUBREQUEST_METHODS:
        raise AttributeError('No valid method ' + method)

    # Work on a copy: merging into the argument would alter the caller's
    # dict, and merging into a shared default would leak headers from one
    # call into the next.
    merged_headers = dict(headers) if headers else {}
    for head in orig_request.headers:
        if head not in merged_headers:
            merged_headers[head] = orig_request.headers[head]

    request_kwargs = {
        'headers': merged_headers,
        'params': params
    }
    if method in ['put', 'patch']:
        request_kwargs['data'] = body

    # NOTE(review): the session is never closed here; the returned object
    # still needs it, so the caller cannot close it either — consider
    # revisiting ownership.
    session = aiohttp.ClientSession()
    caller = getattr(session, method)
    return caller(path, **request_kwargs)
worker.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main():
    """Scriptworker entry point: get everything set up, then enter the main loop."""
    context, credentials = get_context_from_cmdln(sys.argv[1:])
    started_at = arrow.utcnow().format()
    log.info("Scriptworker starting up at {} UTC".format(started_at))
    cleanup(context)
    max_connections = context.config['aiohttp_max_connections']
    conn = aiohttp.TCPConnector(limit=max_connections)
    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(connector=conn) as session:
        context.session = session
        context.credentials = credentials
        # Run async_main repeatedly; any exception is logged and re-raised,
        # which ends the loop.
        while True:
            try:
                loop.run_until_complete(async_main(context))
            except Exception:
                log.critical("Fatal exception", exc_info=1)
                raise
altitudes.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def fetch(self, point, key=conf.GOOGLE_MAPS_KEY):
    """Look up the elevation of ``point`` and cache it.

    :param point: Indexable pair; items 0 and 1 are formatted into the
        ``locations`` query value.
    :param key: API key; when falsy the fallback value is returned at once.
    :return: The fetched elevation, or ``self.fallback()`` on any failure
        other than cancellation.
    """
    if not key:
        return self.fallback()
    try:
        async with ClientSession(loop=LOOP) as session:
            async with session.get(
                    'https://maps.googleapis.com/maps/api/elevation/json',
                    params={'locations': '{0[0]},{0[1]}'.format(point),
                            'key': key},
                    timeout=10) as resp:
                response = await resp.json(loads=json_loads)
                altitude = response['results'][0]['elevation']
                self.altitudes[point] = altitude
                self.changed = True
                return altitude
    except CancelledError:
        # Cancellation must not be turned into a fallback value.
        raise
    except Exception:
        try:
            # ``response`` is unbound when the failure happened before the
            # body was parsed; the NameError is handled below.
            self.log.error(response['error_message'])
        except (KeyError, NameError):
            self.log.error('Error fetching altitude for {}.', point)
        return self.fallback()
__init__.py 文件源码 项目:python_api 作者: DomainTools 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def __awaitable__(self):
    """Make the request if no data is loaded yet, then return ``self``.

    When ``_wait_time()`` returns ``None`` the request is sent at once and
    retried once, 60 seconds later, on ``ServiceUnavailableException``.
    Otherwise the coroutine sleeps for the returned time before sending.
    """
    if self._data is None:
        connector = aiohttp.TCPConnector(verify_ssl=self.api.verify_ssl)
        # A ClientSession has to be entered with ``async with`` in a coroutine.
        async with aiohttp.ClientSession(connector=connector) as session:
            wait_time = self._wait_time()
            if wait_time is None and self.api:
                try:
                    await self._make_async_request(session)
                except ServiceUnavailableException:
                    await asyncio.sleep(60)
                    # Called for its side effect only; the result is unused.
                    self._wait_time()
                    await self._make_async_request(session)
            else:
                await asyncio.sleep(wait_time)
                await self._make_async_request(session)

    return self
commands.py 文件源码 项目:ridings-cogs 作者: ridinginstyle00 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def sfp(self, ctx, user: discord.Member=None):
    """Super Falcon Punch"""
    author = ctx.message.author
    # Only the announcement differs between the two cases; the image
    # download and upload are shared.
    if not user:
        announcement = "{} has Super Falcon Punched!".format(author.mention)
    else:
        announcement = "{} has Super Falcon Punched {} and blew him away!".format(author.mention, user.mention)
    await self.bot.say(announcement)

    image_url = "https://cdn.discordapp.com/attachments/172354611477348352/193299243539234817/imgres.jpg"
    image_path = "data/commands/Images/imgres.png"
    # A ClientSession has to be entered with ``async with`` in a coroutine.
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as resp:
            image = await resp.read()
    with open(image_path, "wb") as f:
        f.write(image)
    await self.bot.upload(image_path)


问题


面经


文章

微信
公众号

扫码关注公众号