python类ClientError()的实例源码

clicksend.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def send_sms(recipients: Iterable[str], msg: str, username: str, api_key: str, sender: str):
    """Send one SMS per recipient through the ClickSend HTTP API.

    Declared ``async def`` because the body uses ``async with``; written as a
    plain ``def`` the function is a SyntaxError.

    Args:
        recipients: Destination numbers; one message entry is built for each.
        msg: Message text. Only the first 140 characters are sent.
        username: Login passed to ``aiohttp.BasicAuth``.
        api_key: Password passed to ``aiohttp.BasicAuth``.
        sender: Value placed in each message's ``from`` field.

    Returns:
        None. A non-200 response status or an ``aiohttp.ClientError`` is
        logged, never raised to the caller.
    """
    data = {
        'messages': [
            {
                'source': 'python',
                'from': sender,
                'body': msg[:140],
                'to': recipient,
                'schedule': ''
            }
            for recipient in recipients
        ],
    }  # type: Dict[str, List]
    try:
        async with aiohttp.ClientSession(headers={'Content-Type': 'application/json'},
                                         auth=aiohttp.BasicAuth(username, api_key)) as session:
            async with session.post(CLICKSEND_URL, data=json.dumps(data), timeout=30) as resp:
                if resp.status != 200:
                    # NOTE(review): this branch logs under 'NOTIFICATION' while the
                    # except branch below uses 'NOTIFICATIONS' - confirm which
                    # category is intended.
                    log.msg('Error sending clicksend sms notification: http status %s' % (str(resp.status)),
                            'NOTIFICATION')
    except aiohttp.ClientError as e:
        log.msg('Error sending clicksend sms notification: %s' % (str(e)), 'NOTIFICATIONS')
dlna_dmr.py 文件源码 项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def async_unsubscribe_all(self):
        """
        Unsubscribe from every service of the device.

        Generator-based coroutine (drive it with ``yield from``). For each
        service that has a subscription SID, the SID is unregistered from
        the callback view and the service is force-unsubscribed. Timeouts
        and client errors are logged at debug level and skipped, so the
        remaining services are still processed. Does nothing when no device
        is set.
        """
        # The log line now names this method; it previously said
        # 'async_disconnect()', which does not match the function.
        _LOGGER.debug('%s.async_unsubscribe_all()', self)

        if not self._device:
            return

        for service in self._device.services.values():
            try:
                sid = service.subscription_sid
                if sid:
                    self._callback_view.unregister_service(sid)
                    yield from service.async_unsubscribe(True)
            except (asyncio.TimeoutError, aiohttp.ClientError) as ex:
                # Best effort: keep going, but leave a trace instead of
                # swallowing the failure silently.
                _LOGGER.debug('%s.async_unsubscribe_all(): error on unsubscribe: %s', self, ex)
daemon.py 文件源码 项目:jd4 作者: vijos 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
async def do_record(self):
        """Unpack the judge request and run the matching judge routine.

        Declared ``async def`` because the body uses ``await``; written as a
        plain ``def`` the method is a SyntaxError.

        The fields ``tag``, ``type``, ``domain_id``, ``pid``, ``rid``,
        ``lang`` and ``code`` are popped from ``self.request`` onto ``self``.
        Type 0 runs ``do_submission``, type 1 runs ``do_pretest``; any other
        type is reported as a system error through the generic handler.

        Raises:
            ClientError: re-raised untouched so the caller can deal with
                transport failures.
            KeyError: if a required field is missing from ``self.request``.
        """
        self.tag = self.request.pop('tag')
        self.type = self.request.pop('type')
        self.domain_id = self.request.pop('domain_id')
        self.pid = self.request.pop('pid')
        self.rid = self.request.pop('rid')
        self.lang = self.request.pop('lang')
        self.code = self.request.pop('code')
        try:
            if self.type == 0:
                await self.do_submission()
            elif self.type == 1:
                await self.do_pretest()
            else:
                raise Exception('Unsupported type: {}'.format(self.type))
        except CompileError:
            self.end(status=STATUS_COMPILE_ERROR, score=0, time_ms=0, memory_kb=0)
        except ClientError:
            # Must come before the generic handler so transport errors are
            # not reported as a system error on the record.
            raise
        except Exception as e:
            logger.exception(e)
            self.next(judge_text=repr(e))
            self.end(status=STATUS_SYSTEM_ERROR, score=0, time_ms=0, memory_kb=0)
test_client.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ws_loop_exception(client):
    """A ClientError raised by receive() must surface as TransportError.

    Also checks that the loop closes the websocket exactly once and emits a
    single 'closed' event with a None payload.
    """
    @asyncio.coroutine
    def raise_client_error(*args, **kwargs):
        raise aiohttp.ClientError("Mock Exception")

    fake_ws = AsyncMock()
    fake_ws.receive.side_effect = raise_client_error

    client._ws_connection = fake_ws
    client.ws_close = AsyncMock()
    client._handle_event = MagicMock()

    with pytest.raises(exceptions.TransportError):
        client.loop.run_until_complete(client._ws_loop())

    # The connection was closed exactly once.
    assert client.ws_close.called
    close_calls = client.ws_close.mock_calls
    assert len(close_calls) == 1

    # Exactly one event was emitted: ('closed', None).
    assert client._handle_event.called
    event_calls = client._handle_event.mock_calls
    assert len(event_calls) == 1
    event_args = event_calls[0][1]
    assert event_args[0] == 'closed'
    assert event_args[1] is None
test_client.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_ws_close_exception(client):
    """ws_close() must still close the socket when send_str() raises.

    The first goodbye packet ('41') is attempted once, its ClientError is
    swallowed, and close() is called exactly once.
    """
    @asyncio.coroutine
    def raise_client_error(*args, **kwargs):
        raise aiohttp.ClientError("Mock Exception")

    fake_ws = AsyncMock()
    fake_ws.send_str.side_effect = raise_client_error

    client._ws_connection = fake_ws
    client._ws_session_data = {}
    client._handle_event = MagicMock()

    client.loop.run_until_complete(client.ws_close())

    # close() ran exactly once despite the send failure.
    assert fake_ws.close.called
    close_calls = fake_ws.close.mock_calls
    assert len(close_calls) == 1

    # Only the first packet was attempted before the error cut it short.
    assert fake_ws.send_str.called
    send_calls = fake_ws.send_str.mock_calls
    assert len(send_calls) == 1
    first_packet = send_calls[0][1][0]
    assert first_packet == '41'
client.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ws_connect(self):
        """Open the real-time event websocket and start its listener.

        Generator-based coroutine (drive it with ``yield from``).

        Returns:
            The task created for ``self._ws_loop()``.

        Raises:
            exceptions.TransportError: if a connection is already open, or
                if setting up the session or websocket fails with
                ClientError, HttpProcessingError or asyncio.TimeoutError.
        """
        if self.ws_connected:
            raise exceptions.TransportError('Connection already open.')

        _LOGGER.info("Opening websocket connection.")
        try:
            # An engineIO session has to exist before the actual websocket
            # connection can be opened against it.
            engineio_session = yield from self._get_engineio_session()
            connection = yield from self._get_ws_connection(engineio_session)

            # Only publish the connection state once both steps succeeded.
            self._ws_connection = connection
            self._ws_session_data = engineio_session

            # Send the first ping packet
            self.loop.create_task(self._ping())
        except (ClientError, HttpProcessingError, asyncio.TimeoutError) as err:
            raise exceptions.TransportError from err
        else:
            return self.loop.create_task(self._ws_loop())
client.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _ws_loop(self):
        """Run the websocket loop listening for messages.

        Generator-based coroutine. Receives messages from
        ``self._ws_connection`` until a CLOSED or ERROR message arrives or
        receiving fails. TEXT messages are passed to ``_handle_packet``.
        On the way out the connection is always closed and an
        EVENT_WS_CLOSED event is emitted.

        Raises:
            exceptions.TransportError: if receiving fails with ClientError,
                HttpProcessingError or asyncio.TimeoutError, or if the last
                message received was of type ERROR.
        """
        # Remembered outside the loop so the cleanup code can inspect the
        # last message that was received (None if nothing arrived).
        msg = None
        try:
            while True:
                msg = yield from self._ws_connection.receive()
                if msg.type == aiohttp.WSMsgType.TEXT:
                    self._handle_packet(msg.data)
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break
                # Any other message type is ignored and the loop continues.
        except (ClientError, HttpProcessingError, asyncio.TimeoutError) as exc:
            raise exceptions.TransportError from exc
        finally:
            # Runs on normal exit and on error alike: close first, then
            # notify listeners.
            yield from self.ws_close()
            self._handle_event(EVENT_WS_CLOSED, None)
            # A raise inside ``finally`` replaces any exception that is
            # already propagating.
            if msg is not None and msg.type == aiohttp.WSMsgType.ERROR:
                raise exceptions.TransportError(
                    'Websocket error detected. Connection closed.')
client.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ws_close(self):
        """Shut down the websocket connection, if one is open.

        Generator-based coroutine. Sends the two goodbye packets ('41' then
        '1') on a best-effort basis, cancels any stored ping handles, closes
        the connection and clears the connection state. Returns immediately
        when no connection is open.
        """
        if not self.ws_connected:
            return

        # Try to gracefully end the connection; a failure here must not
        # prevent the cleanup below.
        try:
            for packet in ('41', '1'):
                yield from self._ws_connection.send_str(packet)
        except (ClientError, HttpProcessingError, asyncio.TimeoutError):
            pass

        # Close any remaining ping handles
        for handle_key in (ATTR_PING_INTERVAL_HANDLE, ATTR_PING_TIMEOUT_HANDLE):
            pending = self._ws_session_data.get(handle_key)
            if pending:
                pending.cancel()

        yield from self._ws_connection.close()
        self._ws_connection = None
        self._ws_session_data = None
artifacts.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def retry_create_artifact(*args, **kwargs):
    """Retry create_artifact() calls.

    Declared ``async def`` because the body uses ``await``; written as a
    plain ``def`` the function is a SyntaxError.

    The call is retried through ``retry_async`` when it raises
    ``ScriptWorkerRetryException`` or ``aiohttp.ClientError``.

    Args:
        *args: the args to pass on to create_artifact
        **kwargs: the args to pass on to create_artifact

    """
    await retry_async(
        create_artifact,
        retry_exceptions=(
            ScriptWorkerRetryException,
            aiohttp.ClientError
        ),
        args=args,
        kwargs=kwargs
    )


# create_artifact {{{1
tester.py 文件源码 项目:ProxyPool 作者: Python3WebSpider 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def test_single_proxy(self, proxy):
        """
        Test a single proxy and record the outcome in the redis store.

        Declared ``async def`` because the body uses ``async with``; written
        as a plain ``def`` the method is a SyntaxError.

        TEST_URL is requested through the proxy. A response status in
        VALID_STATUS_CODES calls ``self.redis.max(proxy)``; any other status,
        or one of the caught errors, calls ``self.redis.decrease(proxy)``.

        NOTE(review): the '????' string literals below (like the docstring
        this one replaces) look like text lost in a bad encoding step -
        restore them from the upstream project.

        :param proxy: proxy address as ``str`` or UTF-8 encoded ``bytes``;
            it is prefixed with 'http://' before use
        :return: None
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('????', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('????', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('???????? ', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
                self.redis.decrease(proxy)
                print('??????', proxy)
memes.py 文件源码 项目:dogbot 作者: slice 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def orly(self, ctx, title, guide, author, *, top_text=''):
        """Generates O'Reilly book covers.

        Declared ``async def`` because the body uses ``await`` and
        ``async with``; written as a plain ``def`` the method is a
        SyntaxError.

        Builds a generator URL from the escaped text arguments plus a random
        image code and theme, downloads the result and sends it to ``ctx``
        as ``orly.png``. If the request fails with ``aiohttp.ClientError`` a
        short error message is sent instead.
        """

        api_base = 'https://orly-appstore.herokuapp.com/generate?'

        url = (api_base +
               f'title={urlescape(title)}&top_text={urlescape(top_text)}&image_code={randrange(0, 41)}' +
               f'&theme={randrange(0, 17)}&author={urlescape(author)}&guide_text={urlescape(guide)}' +
               f'&guide_text_placement=bottom_right')

        try:
            # Show the typing indicator while the image is being fetched.
            async with ctx.typing():
                async with ctx.bot.session.get(url) as resp:
                    with BytesIO(await resp.read()) as bio:
                        await ctx.send(file=discord.File(filename='orly.png', fp=bio))
        except aiohttp.ClientError:
            await ctx.send("Couldn't contact the API.")
test_connection.py 文件源码 项目:aioelasticsearch 作者: wikibusiness 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def test_perform_request_ssl_error(auto_close, loop):
    """Each aiohttp failure must be translated to the expected error type.

    Declared ``async def`` because the body uses ``await``; written as a
    plain ``def`` the test is a SyntaxError.

    For every (raised, expected) pair the session's ``_request`` is replaced
    by a coroutine raising ``raised`` and ``perform_request`` must raise
    ``expected``.
    """
    for exc, expected in [
        (aiohttp.ClientConnectorCertificateError(mock.Mock(), mock.Mock()), SSLError),  # noqa
        (aiohttp.ClientConnectorSSLError(mock.Mock(), mock.Mock()), SSLError),
        (aiohttp.ClientSSLError(mock.Mock(), mock.Mock()), SSLError),
        (aiohttp.ClientError('Other'), ConnectionError),
        (asyncio.TimeoutError, ConnectionTimeout),
    ]:
        session = aiohttp.ClientSession(loop=loop)

        # ``request`` reads ``exc`` from the enclosing loop; that is safe
        # because it is only invoked within the same iteration.
        @asyncio.coroutine
        def request(*args, **kwargs):
            raise exc
        session._request = request

        conn = auto_close(AIOHttpConnection(session=session, loop=loop,
                                            use_ssl=True))
        with pytest.raises(expected):
            await conn.perform_request('HEAD', '/')
exporter.py 文件源码 项目:dump1090-exporter 作者: claws 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def fetch(url: str,
                session: aiohttp.ClientSession,
                timeout: float = None,
                loop: AbstractEventLoop = None) -> Dict[Any, Any]:
    ''' Fetch JSON format data from a web resource and return a dict

    Declared ``async def`` because the body uses ``await`` and
    ``async with``; written as a plain ``def`` the function is a
    SyntaxError.

    :param url: address of the resource to fetch.
    :param session: client session used to issue the GET request.
    :param timeout: passed through to ``session.get``.
    :param loop: accepted but not used by this function.
    :return: the decoded JSON body of the response.
    :raises Exception: if the status is not 200, the request times out, or
        an ``aiohttp.ClientError`` occurs.
    '''
    try:
        logger.debug('fetching %s', url)
        async with session.get(url, timeout=timeout) as resp:
            if resp.status != 200:
                raise Exception('Fetch failed {}: {}'.format(resp.status, url))
            data = await resp.json()
            return data
    except asyncio.TimeoutError:
        # ``from None`` hides the low-level traceback from the caller.
        raise Exception('Request timed out to {}'.format(url)) from None
    except aiohttp.ClientError as exc:
        raise Exception('Client error {}, {}'.format(exc, url)) from None
transport.py 文件源码 项目:python-zeep 作者: mvantellingen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _load_remote_data(self, url):
        """Fetch ``url`` synchronously and return the raw response body.

        The download runs as a coroutine on ``self.loop`` under a timeout of
        ``self.load_timeout``; this method blocks until it finishes.

        Raises:
            TransportError: if ``raise_for_status()`` reports an
                ``aiohttp.ClientError``; carries the status code and the
                body that was read.
        """
        async def _fetch_body():
            body = None
            with aiohttp.Timeout(self.load_timeout):
                response = await self.session.get(url)
                # Read the body before checking the status so it can be
                # attached to the error.
                body = await response.read()
                try:
                    response.raise_for_status()
                except aiohttp.ClientError as exc:
                    raise TransportError(
                        message=str(exc),
                        status_code=response.status,
                        content=body
                    ).with_traceback(exc.__traceback__) from exc
            return body

        # Block until we have the data
        return self.loop.run_until_complete(_fetch_body())
crawl.py 文件源码 项目:mp 作者: dongweiming 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def fetch(self, url, max_redirect):
        """Fetch ``url`` with retries and queue the link found in it.

        Declared ``async def`` because the body uses ``await``; written as a
        plain ``def`` the method is a SyntaxError.

        The GET is attempted up to ``self.max_tries`` times, retrying on
        ``aiohttp.ClientError``. If every attempt fails the method returns
        without doing anything else. Otherwise the response is handed to
        ``parse_link``; a non-None result is passed to ``add_url`` together
        with ``max_redirect``. The response is always released.
        """
        for _ in range(self.max_tries):
            try:
                response = await self.session.get(
                    url, allow_redirects=False)
                break
            except aiohttp.ClientError:
                # Transient failure: fall through to the next attempt.
                continue
        else:
            # No attempt succeeded, so there is no response to process.
            return

        try:
            next_url = await self.parse_link(response)
            print('{} has finished'.format(url))
            if next_url is not None:
                self.add_url(next_url, max_redirect)
        finally:
            response.release()
crawl.py 文件源码 项目:LianJia_Crawl 作者: CodingCrush 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
async def handle(self, url):
        """Fetch ``url`` with retries and parse the resulting document.

        Declared ``async def`` because the body uses ``await``; written as a
        plain ``def`` the method is a SyntaxError.

        The GET is attempted up to ``self.max_tries`` times, retrying on
        ``aiohttp.ClientError``. The response is turned into a document by
        ``fetch_etree`` and parsed as a root page or a second-level page
        depending on ``is_root_url(url)``. The response is always released.

        Returns:
            None. When no attempt succeeds the method returns early.
        """
        for _ in range(self.max_tries):
            try:
                response = await self.session.get(
                    url, allow_redirects=False)
                break
            except aiohttp.ClientError:
                # Transient failure: fall through to the next attempt.
                continue
        else:
            # Every attempt failed (or max_tries is 0). ``response`` was never
            # bound, so continuing would raise NameError both in the body
            # and again in the ``finally`` clause below.
            return
        try:
            doc = await self.fetch_etree(response)
            if is_root_url(url):
                print('root:{}'.format(url))
                self.parse_root_etree(doc)
            else:
                print('second level:{}'.format(url))
                self.parse_second_etree(doc, url)
        finally:
            await response.release()
app.py 文件源码 项目:arq 作者: samuelcolvin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def download_content(self, url, count):
        """Download ``url`` ``count`` times and push a summary line to redis.

        Declared ``async def`` because the body uses ``await`` and
        ``async with``; written as a plain ``def`` the method is a
        SyntaxError.

        Each response body is read and its length added to the total, even
        for non-200 responses; non-200 statuses and ``ClientError`` failures
        are collected as error strings. The summary (elapsed time, count,
        total size and any errors) is pushed to the ``R_OUTPUT`` list.

        Returns:
            The total number of bytes read across all downloads.
        """
        total_size = 0
        errors = []
        start = time()
        for _ in range(count):
            try:
                async with self.session.get(url) as r:
                    content = await r.read()
                    total_size += len(content)
                    if r.status != 200:
                        errors.append(f'{r.status} length: {len(content)}')
            except ClientError as e:
                errors.append(f'{e.__class__.__name__}: {e}')
        output = f'{time() - start:0.2f}s, {count} downloads, total size: {total_size}'
        if errors:
            output += ', errors: ' + ', '.join(errors)
        await self.redis.rpush(R_OUTPUT, output.encode())
        return total_size
utils.py 文件源码 项目:socketshark 作者: closeio 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def http_post(shark, url, data):
    """POST ``data`` as JSON to ``url`` with retries and return the JSON reply.

    Declared ``async def`` because the body uses ``await`` and
    ``async with``; written as a plain ``def`` the function is a
    SyntaxError.

    Settings are read from ``shark.config['HTTP']``: ``ssl_cafile``
    (optional), ``wait``, ``tries`` and ``timeout``. Before every attempt
    after the first the function sleeps for the current wait time. A 429
    response replaces the wait time with the value from
    ``_get_rate_limit_wait`` and retries; any other response resets it to
    the configured ``wait``.

    Returns:
        The decoded JSON response on success, or an error dict with
        ``c.ERR_SERVICE_UNAVAILABLE`` once all tries are used up.
    """
    log = shark.log.bind(url=url)
    opts = shark.config['HTTP']
    if opts.get('ssl_cafile'):
        ssl_context = ssl.create_default_context(cafile=opts['ssl_cafile'])
    else:
        ssl_context = None
    conn = aiohttp.TCPConnector(ssl_context=ssl_context)
    async with aiohttp.ClientSession(connector=conn) as session:
        wait = opts['wait']
        for n in range(opts['tries']):
            if n > 0:
                await asyncio.sleep(wait)
            try:
                log.debug('http request', data=data)
                async with session.post(url, json=data,
                                        timeout=opts['timeout']) as resp:
                    if resp.status == 429:  # Too many requests.
                        wait = _get_rate_limit_wait(log, resp, opts)
                        continue
                    else:
                        wait = opts['wait']
                    resp.raise_for_status()
                    # Rebinding ``data`` is safe here: the function returns
                    # immediately, so the request payload is never reused.
                    data = await resp.json()
                    log.debug('http response', data=data)
                    return data
            except aiohttp.ClientError:
                log.exception('unhandled exception in http_post')
            except asyncio.TimeoutError:
                log.exception('timeout in http_post')
        return {'status': 'error', 'error': c.ERR_SERVICE_UNAVAILABLE}
slack.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def send_slack_notification(url: str, attachments: List[Dict]):
    """POST a Slack message made of ``attachments`` to ``url``.

    Declared ``async def`` because the body uses ``async with``; written as
    a plain ``def`` the function is a SyntaxError.

    Args:
        url: Address the JSON payload is posted to.
        attachments: List sent under the ``attachments`` key.

    Returns:
        None. A non-200 response status or an ``aiohttp.ClientError`` is
        logged, never raised to the caller.
    """
    data = {
        'attachments': attachments
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=json.dumps(data), timeout=30) as resp:
                if resp.status != 200:
                    # NOTE(review): this branch logs under 'NOTIFICATION' while the
                    # except branch below uses 'NOTIFICATIONS' - confirm which
                    # category is intended.
                    log.msg('Error sending slack notification: http status %s' % (str(resp.status)),
                            'NOTIFICATION')
    except aiohttp.ClientError as e:
        log.msg('Error sending slack notification: %s' % (str(e)), 'NOTIFICATIONS')
http.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def send_http_notification(url: str, in_data: Any):
    """POST ``in_data`` serialised as JSON to ``url``.

    Declared ``async def`` because the body uses ``async with``; written as
    a plain ``def`` the function is a SyntaxError.

    Args:
        url: Address the payload is posted to.
        in_data: Any value ``json.dumps`` can serialise.

    Returns:
        None. A non-200 response status or an ``aiohttp.ClientError`` is
        logged, never raised to the caller.
    """
    out_data = json.dumps(in_data)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=out_data, timeout=10) as resp:
                if resp.status != 200:
                    # NOTE(review): this branch logs under 'NOTIFICATION' while the
                    # except branch below uses 'NOTIFICATIONS' - confirm which
                    # category is intended.
                    log.msg('Error sending http notification: http status %s' % (str(resp.status)),
                            'NOTIFICATION')
    except aiohttp.ClientError as e:
        log.msg('Error sending http notification: %s' % (str(e)), 'NOTIFICATIONS')
dlna_dmr.py 文件源码 项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def async_http_request(self, method, url, headers=None, body=None):
        """Issue an HTTP request and return ``(status, headers, text)``.

        Generator-based coroutine. The request and the reading of the
        response text share a 5 second timeout. Timeouts and client errors
        are logged at debug level and re-raised.
        """
        session = async_get_clientsession(self.hass)
        try:
            with async_timeout.timeout(5, loop=self.hass.loop):
                reply = yield from session.request(method, url, headers=headers, data=body)
                reply_text = yield from reply.text()
        except (asyncio.TimeoutError, aiohttp.ClientError) as err:
            # NOTE(review): the message names async_call_action(), not this
            # method - confirm whether that is intentional.
            _LOGGER.debug("Error in %s.async_call_action(): %s", self, err)
            raise

        return reply.status, reply.headers, reply_text
dlna_dmr.py 文件源码 项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def async_update(self):
        """Refresh the entity state by polling the device.

        Generator-based coroutine. If no device is set yet, device
        initialisation is attempted first; a timeout or client error there
        ends the update quietly. Afterwards the AVT service, when present,
        is polled for transport info and - while playing or paused - for
        position info; without an AVT service the device is pinged instead.
        Success sets ``_is_connected`` to True; a timeout or client error
        sets it to False and unsubscribes from all services.
        """
        _LOGGER.debug('%s.async_update()', self)
        if not self._device:
            _LOGGER.debug('%s.async_update(): no device', self)
            try:
                yield from self._async_init_device()
            except (asyncio.TimeoutError, aiohttp.ClientError):
                # Not yet seen alive, leave for now, gracefully
                _LOGGER.debug('%s._async_update(): device not seen yet, leaving', self)
                return

        # XXX TODO: if re-connected, then (re-)subscribe

        # call GetTransportInfo/GetPositionInfo regularly
        try:
            _LOGGER.debug('%s.async_update(): calling...', self)
            avt = self._service('AVT')
            if not avt:
                _LOGGER.debug('%s.async_update(): pinging...', self)
                yield from self._device.async_ping()
            else:
                transport_action = avt.action('GetTransportInfo')
                state = yield from self._async_poll_transport_info(transport_action)

                # Position info is only polled while something is loaded.
                is_active = state == STATE_PLAYING or state == STATE_PAUSED
                if is_active:
                    position_action = avt.action('GetPositionInfo')
                    yield from self._async_poll_position_info(position_action)

            self._is_connected = True
        except (asyncio.TimeoutError, aiohttp.ClientError) as err:
            _LOGGER.debug('%s.async_update(): error on update: %s', self, err)
            self._is_connected = False
            yield from self.async_unsubscribe_all()
upnp_client.py 文件源码 项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def async_unsubscribe(self, force=False):
        """Send UNSUBSCRIBE for this service's event subscription.

        Generator-based coroutine.

        With ``force`` set, the stored subscription SID is cleared up front
        and request failures are ignored. Without it, a missing SID raises
        ``RuntimeError`` and request failures propagate. A non-200 response
        is logged as an error; in the non-forced case the SID is then kept.
        """
        if not (force or self._subscription_sid):
            raise RuntimeError('Cannot unsubscribed, subscribe first')

        # Capture the SID before it is possibly cleared below.
        sid = self._subscription_sid
        if force:
            # we don't care what happens further, make sure we are unsubscribed
            self._subscription_sid = None

        host = urllib.parse.urlparse(self.event_sub_url).netloc
        request_headers = {
            'Host': host,
            'SID': sid,
        }
        try:
            status, _, _ = yield from self._requester.async_http_request(
                'UNSUBSCRIBE', self.event_sub_url, request_headers)
        except (asyncio.TimeoutError, aiohttp.ClientError):
            if force:
                return
            raise

        if status == 200:
            self._subscription_sid = None
        else:
            _LOGGER.error('Did not receive 200, but %s', status)
task.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def claim_work(context):
    """Find and claim the next pending task in the queue, if any.

    Declared ``async def`` because the body uses ``await``; written as a
    plain ``def`` the function is a SyntaxError.

    Args:
        context (scriptworker.context.Context): the scriptworker context.

    Returns:
        dict: a dict containing a list of the task definitions of the tasks claimed.
        None: if the claim fails with ``TaskclusterFailure`` or
            ``aiohttp.ClientError``; the failure is logged as a warning.

    """
    log.debug("Calling claimWork...")
    payload = {
        'workerGroup': context.config['worker_group'],
        'workerId': context.config['worker_id'],
        # Hardcode one task at a time.  Make this a pref if we allow for
        # parallel tasks in multiple `work_dir`s.
        'tasks': 1,
    }
    try:
        return await context.queue.claimWork(
            context.config['provisioner_id'],
            context.config['worker_type'],
            payload
        )
    except (taskcluster.exceptions.TaskclusterFailure, aiohttp.ClientError) as exc:
        log.warning("{} {}".format(exc.__class__, exc))
notification.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def sendToTelegram(self):
        """Send a Telegram venue notification for this object.

        Declared ``async def`` because the body uses ``async with``; written
        as a plain ``def`` the method is a SyntaxError.

        The venue title is ``self.name``, extended with
        attack/defense/stamina when those attributes exist. The address
        text shows the exact expiry when ``self.tth`` is available and an
        expiry window otherwise.

        Returns:
            True once the POST request has been made, False if it failed.

        Raises:
            CancelledError: re-raised so task cancellation is not swallowed.
        """
        session = SessionManager.get()
        TELEGRAM_BASE_URL = "https://api.telegram.org/bot{token}/sendVenue".format(token=conf.TELEGRAM_BOT_TOKEN)
        title = self.name
        try:
            minutes, seconds = divmod(self.tth, 60)
            description = 'Expires at: {} ({:.0f}m{:.0f}s left)'.format(self.expire_time, minutes, seconds)
        except AttributeError:
            # No exact time-till-hidden available: fall back to the window.
            description = "It'll expire between {} & {}.".format(self.min_expire_time, self.max_expire_time)

        try:
            title += ' ({}/{}/{})'.format(self.attack, self.defense, self.stamina)
        except AttributeError:
            # The stats are optional; keep the plain name.
            pass

        payload = {
            'chat_id': conf.TELEGRAM_CHAT_ID,
            'latitude': self.coordinates[0],
            'longitude': self.coordinates[1],
            'title': title,
            'address': description,
        }

        try:
            async with session.post(TELEGRAM_BASE_URL, data=payload) as resp:
                self.log.info('Sent a Telegram notification about {}.', self.name)
                return True
        except ClientResponseError as e:
            self.log.error('Error {} from Telegram: {}', e.code, e.message)
        except ClientError as e:
            self.log.error('{} during Telegram notification.', e.__class__.__name__)
        except CancelledError:
            raise
        except Exception:
            self.log.exception('Exception caught in Telegram notification.')
        return False
notification.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
async def hook_post(self, w, session, payload, headers={'content-type': 'application/json'}):
        """POST ``payload`` as JSON to the webhook URL ``w``.

        Declared ``async def`` because the body uses ``async with``; written
        as a plain ``def`` the method is a SyntaxError.

        Args:
            w: Webhook URL.
            session: Client session used for the request.
            payload: Object sent as the JSON body.
            headers: Request headers. The default dict is shared between
                calls but is only passed along, never mutated here.

        Returns:
            True once the POST request has been made, False if it failed.

        Raises:
            CancelledError: re-raised so task cancellation is not swallowed.
        """
        try:
            async with session.post(w, json=payload, timeout=4, headers=headers) as resp:
                return True
        except ClientResponseError as e:
            # Log text corrected from 'webook' to 'webhook'.
            self.log.error('Error {} from webhook {}: {}', e.code, w, e.message)
        except (TimeoutError, ServerTimeoutError):
            self.log.error('Response timeout from webhook: {}', w)
        except ClientError as e:
            self.log.error('{} on webhook: {}', e.__class__.__name__, w)
        except CancelledError:
            raise
        except Exception:
            self.log.exception('Error from webhook: {}', w)
        return False
test_views.py 文件源码 项目:PollBot 作者: mozilla 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def test_status_response_handle_client_errors(cli):
    """A ClientError raised by the task must become an ERROR JSON response.

    Declared ``async def`` because the body uses ``await``; written as a
    plain ``def`` the test is a SyntaxError.
    """
    async def error_task(product, version):
        raise ClientError('Error message')
    error_endpoint = status_response(error_task)
    request = mock.MagicMock()
    request.match_info = {"product": "firefox", "version": "57.0"}
    resp = await error_endpoint(request)
    # The exception text is expected verbatim in the "message" field.
    assert json.loads(resp.body.decode()) == {
        "status": Status.ERROR.value,
        "message": "Error message",
    }
hotslogs_api.py 文件源码 项目:fogeybot 作者: mattgreen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def get_maps(self):
        """Return the primary names of all maps listed by the HotsLogs API.

        Declared ``async def`` because the body uses ``await`` and
        ``async with``; written as a plain ``def`` the method is a
        SyntaxError.

        Returns:
            A list with the ``PrimaryName`` value of every map entry.

        Raises:
            APIError: if the request fails with ``aiohttp.ClientError``; the
                original error is attached as the cause.
        """
        try:
            async with aiohttp.get("https://www.hotslogs.com/API/Data/Maps") as response:
                maps = await response.json()
                return [m["PrimaryName"] for m in maps]

        except aiohttp.ClientError as exc:
            raise APIError() from exc
hotslogs_api.py 文件源码 项目:fogeybot 作者: mattgreen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def get_mmr(self, tag):
        """Look up the quick match and hero league MMR for a battle tag.

        Declared ``async def`` because the body uses ``await`` and
        ``async with``; written as a plain ``def`` the method is a
        SyntaxError.

        Args:
            tag: Battle tag containing '#'; every '#' is replaced by '_'
                when building the request URL.

        Returns:
            ``MMRInfo(MMRInfo.NO_INFO)`` when the reply is empty or has no
            leaderboard rankings, otherwise ``MMRInfo(MMRInfo.PRESENT, ...)``
            with the QuickMatch and HeroLeague values (0 when a mode is
            absent from the rankings).

        Raises:
            ValueError: if ``tag`` contains no '#'.
            APIError: if the request fails with ``aiohttp.ClientError``; the
                original error is attached as the cause.
        """
        if "#" not in tag:
            raise ValueError("battle tag must include '#'")

        try:
            async with aiohttp.get("https://www.hotslogs.com/API/Players/1/" + tag.replace("#", "_")) as r:
                response = await r.json()
        except aiohttp.ClientError as exc:
            raise APIError() from exc

        if not response:
            return MMRInfo(MMRInfo.NO_INFO)

        rankings = response.get("LeaderboardRankings")
        if not rankings:
            return MMRInfo(MMRInfo.NO_INFO)

        qm_mmr = 0
        hl_mmr = 0

        # If a mode appears more than once, the last entry wins.
        for ranking in rankings:
            if ranking["GameMode"] == "QuickMatch":
                qm_mmr = ranking["CurrentMMR"]
            elif ranking["GameMode"] == "HeroLeague":
                hl_mmr = ranking["CurrentMMR"]

        return MMRInfo(MMRInfo.PRESENT, qm_mmr, hl_mmr)
background.py 文件源码 项目:fake-useragent-cache-server 作者: pcinkh 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def ping(url, timeout, *, loop):
    """GET ``url`` once and log the response status at debug level.

    Declared ``async def`` because the body uses ``async with``; written as
    a plain ``def`` the function is a SyntaxError.

    Args:
        url: Address to request.
        timeout: Value passed to ``async_timeout.timeout``.
        loop: Event loop passed to the session and the timeout (keyword-only).

    Returns:
        None. Client errors and timeouts caught around the request are
        logged with their traceback and not re-raised.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        with async_timeout.timeout(timeout, loop=loop):
            try:
                async with session.get(url) as response:
                    logger.debug(response.status)
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                logger.exception(exc)


问题


面经


文章

微信
公众号

扫码关注公众号