python类timeout()的实例源码

synology.py 文件源码 项目:homeassistant 作者: NAStools 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def async_camera_image(self):
    """Return a still image response from the camera.

    Generator-based coroutine (driven with ``yield from``). It issues a
    ``GetSnapshot`` request against the camera API and returns the body of
    the response.

    Returns:
        The payload produced by ``response.read()``, or ``None`` when the
        request times out or fails with a client error.
    """
    image_url = SYNO_API_URL.format(
        self._synology_url, WEBAPI_PATH, self._camera_path)

    image_payload = {
        'api': CAMERA_API,
        'method': 'GetSnapshot',
        'version': '1',
        'cameraId': self._camera_id,
    }
    try:
        # Only the request itself is bounded by TIMEOUT; reading the body
        # happens after the timeout context has been left.
        with async_timeout.timeout(TIMEOUT, loop=self.hass.loop):
            response = yield from self._websession.get(
                image_url,
                params=image_payload
            )
    except (asyncio.TimeoutError, aiohttp.errors.ClientError):
        _LOGGER.exception("Error on %s", image_url)
        return None

    try:
        image = yield from response.read()
    finally:
        # Release the response even when reading the body fails, so a
        # broken transfer does not leave the connection checked out.
        yield from response.release()

    return image
request.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def asend(self, *, sess=None, timeout=10.0):
    '''
    Sends the request to the server.

    This method is a coroutine.

    When the content type is ``multipart/form-data`` the entries of
    ``self._content`` are appended to a ``MultipartWriter`` as attachments;
    otherwise ``self._content`` is sent as the request body unchanged.

    :param sess: optional ``aiohttp.ClientSession`` used to send the
        request; a new session is created when omitted.
    :param timeout: value handed to the ``_timeout`` context manager that
        wraps both the request and the reading of the response body.
    :returns: a ``Response`` built from the status, reason, body, content
        type and body length of the reply.
    :raises BackendClientError: when any exception occurs while sending the
        request or reading the response (the original error is chained).
    '''
    assert self.method in self._allowed_methods
    if sess is None:
        sess = aiohttp.ClientSession()
    else:
        assert isinstance(sess, aiohttp.ClientSession)
    # NOTE(review): the session is used as a context manager even when it
    # was supplied by the caller - presumably that closes the caller's
    # session as well; confirm this is intended.
    with sess:
        if self.content_type == 'multipart/form-data':
            with aiohttp.MultipartWriter('mixed') as mpwriter:
                for file in self._content:
                    part = mpwriter.append(file.file)
                    part.set_content_disposition('attachment',
                                                 filename=file.filename)
            data = mpwriter
        else:
            data = self._content
        self._sign()
        # Resolve the session method (get/post/...) matching the HTTP verb.
        reqfunc = getattr(sess, self.method.lower())
        try:
            with _timeout(timeout):
                resp = await reqfunc(self.build_url(),
                                     data=data,
                                     headers=self.headers)
                async with resp:
                    body = await resp.read()
                    return Response(resp.status, resp.reason, body,
                                    resp.content_type,
                                    len(body))
        except Exception as e:
            msg = 'Request to the API endpoint has failed.\n' \
                  'Check your network connection and/or the server status.'
            raise BackendClientError(msg) from e
async_fetch_url.py 文件源码 项目:Software-Architecture-with-Python 作者: PacktPublishing 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fetch_page(session, url, timeout=60):
    """Start a GET for ``url`` on ``session`` inside a timeout context.

    The value of ``session.get(url)`` is handed back as-is; nothing is
    awaited inside this function.
    """
    timeout_ctx = async_timeout.timeout(timeout)
    with timeout_ctx:
        return session.get(url)
async_fetch_url2.py 文件源码 项目:Software-Architecture-with-Python 作者: PacktPublishing 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def fetch_page(session, url, timeout=60):
    """Issue ``session.get(url)`` while an ``async_timeout`` context is open.

    Returns whatever ``session.get`` produced, without awaiting it here.
    """
    guard = async_timeout.timeout(timeout)
    with guard:
        return session.get(url)
dlna_dmr.py 文件源码 项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def async_http_request(self, method, url, headers=None, body=None):
    """Perform an HTTP request and return its status, headers and body text.

    Generator-based coroutine (driven with ``yield from``). Both the request
    and the reading of the response text run under a 5 second timeout.

    Args:
        method: HTTP method passed to ``websession.request``.
        url: target URL.
        headers: optional request headers.
        body: optional request payload, sent as ``data``.

    Returns:
        A ``(status, headers, text)`` tuple taken from the response.

    Raises:
        asyncio.TimeoutError, aiohttp.ClientError: logged at debug level and
            re-raised unchanged.
    """
    websession = async_get_clientsession(self.hass)
    try:
        with async_timeout.timeout(5, loop=self.hass.loop):
            response = yield from websession.request(
                method, url, headers=headers, data=body)
            response_body = yield from response.text()
    except (asyncio.TimeoutError, aiohttp.ClientError) as ex:
        # Name this function in the message so the log points at the
        # actual failure site.
        _LOGGER.debug("Error in %s.async_http_request(): %s", self, ex)
        raise

    return response.status, response.headers, response_body
dummy_tv.py 文件源码 项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def async_notify_listeners(self, **kwargs):
    """Send a ``LastChange`` NOTIFY for this variable to every subscriber.

    Generator-based coroutine (driven with ``yield from``). An ``Event``
    document is built containing one element named after this object's class
    and carrying ``val=str(self.value)`` plus any extra attributes from
    ``kwargs``. That document is embedded as text in a ``propertyset``
    envelope, which is sent with a ``NOTIFY`` request to each URL registered
    in ``SUBSCRIBED_CLIENTS[self.SERVICE_NAME]``.

    Args:
        **kwargs: additional XML attributes for the state element; a ``val``
            entry is overridden by the current value.
    """
    # Not called ``property`` to avoid shadowing the builtin.
    variable_name = str(self.__class__.__name__)
    value = str(self.value)

    LOGGER.debug('async_notify_listeners(): %s -> %s', variable_name, value)

    event_base = '<Event xmlns="urn:schemas-upnp-org:metadata-1-0/RCS/">' \
                 '<InstanceID val="0" />' \
                 '</Event>'
    el_event = ET.fromstring(event_base)
    el_instance_id = el_event.find('.//rcs:InstanceID', NS)
    args = dict(kwargs, val=value)
    ET.SubElement(el_instance_id, 'rcs:' + variable_name, **args)

    notify_base = '<?xml version="1.0" encoding="utf-8"?>' \
                  '<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">' \
                  '<e:property>' \
                  '<LastChange />' \
                  '</e:property>' \
                  '</e:propertyset>'
    el_notify = ET.fromstring(notify_base)
    el_last_change = el_notify.find('.//LastChange', NS)
    el_last_change.text = ET.tostring(el_event).decode('utf-8')

    # The payload is identical for every subscriber: serialize it once.
    data = ET.tostring(el_notify)

    # SUBSCRIBED_CLIENTS is only read here, so no ``global`` statement is
    # needed.
    service_name = self.SERVICE_NAME
    for sid, url in SUBSCRIBED_CLIENTS[service_name].items():
        headers = {
            'SID': sid
        }
        with ClientSession(loop=asyncio.get_event_loop()) as session:
            with async_timeout.timeout(10):
                LOGGER.debug('Calling: %s', url)
                yield from session.request('NOTIFY', url, headers=headers, data=data)
tools.py 文件源码 项目:ha-ffmpeg 作者: pvizeli 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_test(self, input_source, timeout=15):
    """Run a short one-frame test against ``input_source``.

    Generator-based coroutine (driven with ``yield from``).

    Returns ``True`` when the process opened by ``self.open`` finishes with
    return code 0. Returns ``False`` when it cannot be opened, when reading
    its output times out or errors (the process is killed in that case), or
    when it exits with a non-zero return code.
    """
    probe_args = ["-frames:v", "1", "-frames:a", "1"]

    # Start the short test run; bail out right away if it did not open.
    opened = yield from self.open(
        cmd=probe_args, input_source=input_source, stderr_pipe=True,
        output=None)
    if not opened:
        return False

    try:
        with async_timeout.timeout(timeout, loop=self._loop):
            out, error = yield from self._proc.communicate()
    except (OSError, asyncio.TimeoutError, ValueError):
        _LOGGER.warning("Timeout/Error reading test.")
        self._proc.kill()
        return False

    # Failure path first: anything other than exit code 0 is an error.
    if self._proc.returncode != 0:
        _LOGGER.error("ReturnCode: %i / STD: %s / ERR: %s",
                      self._proc.returncode, out, error)
        return False

    _LOGGER.debug("STD: %s / ERR: %s", out, error)
    return True
tools.py 文件源码 项目:ha-ffmpeg 作者: pvizeli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_image(self, input_source, output_format=IMAGE_JPEG, extra_cmd=None,
              timeout=15):
    """Open FFmpeg process to capture 1 frame and return it.

    Generator-based coroutine (driven with ``yield from``).

    Returns the first element of the process' ``communicate()`` result, or
    ``None`` when the process could not be opened or reading timed out (the
    process is killed in the latter case).
    """
    capture_args = ["-an", "-frames:v", "1", "-c:v", output_format]

    # Open the input so that a single frame is written to the pipe.
    started = yield from self.open(
        cmd=capture_args, input_source=input_source,
        output="-f image2pipe -", extra_cmd=extra_cmd)
    if not started:
        _LOGGER.warning("Error starting FFmpeg.")
        return None

    try:
        with async_timeout.timeout(timeout, loop=self._loop):
            image, _ = yield from self._proc.communicate()
    except (asyncio.TimeoutError, ValueError):
        _LOGGER.warning("Timeout reading image.")
        self._proc.kill()
        return None
    else:
        return image
core.py 文件源码 项目:ha-ffmpeg 作者: pvizeli 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self, timeout=5):
    """Cancel the read task if one is still pending, then stop ffmpeg.

    Return a coroutine (the result of the parent class' ``close``).
    """
    reader = self._read_task
    if not (reader is None or reader.cancelled()):
        reader.cancel()

    return super().close(timeout)
test_py35.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def test_async_timeout(loop):
    """``async with timeout(...)`` raises TimeoutError once the delay expires.

    Must be a coroutine function: the body uses ``async with`` and ``await``.
    """
    with pytest.raises(asyncio.TimeoutError):
        async with timeout(0.01, loop=loop) as cm:
            await asyncio.sleep(10, loop=loop)
    # The context manager records that the timeout actually fired.
    assert cm.expired
test_py35.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def test_async_no_timeout(loop):
    """A body that finishes before the deadline leaves ``expired`` false.

    Must be a coroutine function: the body uses ``async with`` and ``await``.
    """
    async with timeout(1, loop=loop) as cm:
        await asyncio.sleep(0, loop=loop)
    assert not cm.expired
test_py35.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def test_async_zero(loop):
    """A zero timeout raises TimeoutError and marks the manager as expired.

    Must be a coroutine function: the body uses ``async with`` and ``await``.
    """
    with pytest.raises(asyncio.TimeoutError):
        async with timeout(0, loop=loop) as cm:
            await asyncio.sleep(10, loop=loop)
    assert cm.expired
test_py35.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_async_zero_coro_not_started(loop):
    """With a zero timeout, code after the first suspension point never runs.

    Must be a coroutine function: the body uses ``async with`` and ``await``.
    """
    coro_started = False

    async def coro():
        # Flip the flag so the test can detect whether this ever executed.
        nonlocal coro_started
        coro_started = True

    with pytest.raises(asyncio.TimeoutError):
        async with timeout(0, loop=loop) as cm:
            await asyncio.sleep(0, loop=loop)
            await coro()

    assert cm.expired
    assert coro_started is False
test_timeout.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_timeout_global_loop(loop):
    """``timeout()`` without a ``loop`` argument uses the globally set loop."""
    asyncio.set_event_loop(loop)

    @asyncio.coroutine
    def check_default_loop():
        with timeout(10) as ctx:
            yield from asyncio.sleep(0.01)
            assert ctx._loop is loop

    pending = check_default_loop()
    loop.run_until_complete(pending)
test_timeout.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_timeout_disable(loop):
    """``timeout(None)`` lets the wrapped task run to completion."""
    @asyncio.coroutine
    def long_running_task():
        yield from asyncio.sleep(0.1, loop=loop)
        return 'done'

    started = loop.time()
    with timeout(None, loop=loop):
        result = yield from long_running_task()
    assert result == 'done'
    # The task should have taken roughly its 0.1 s sleep.
    elapsed = loop.time() - started
    assert 0.09 < elapsed < 0.13, elapsed
test_timeout.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_timeout_is_none_no_task(loop):
    """With ``timeout(None)`` the context manager's ``_task`` stays ``None``."""
    with timeout(None, loop=loop) as ctx:
        assert ctx._task is None
test_timeout.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_timeout_enable_zero(loop):
    """A zero timeout raises TimeoutError and marks the manager as expired."""
    with pytest.raises(asyncio.TimeoutError), timeout(0, loop=loop) as cm:
        yield from asyncio.sleep(0.1, loop=loop)

    assert cm.expired
test_timeout.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_timeout_enable_zero_coro_not_started(loop):
    """With a zero timeout, code after the first suspension point never runs."""
    coro_started = False

    @asyncio.coroutine
    def coro():
        # Flip the flag so the test can detect whether this ever executed.
        nonlocal coro_started
        coro_started = True

    with pytest.raises(asyncio.TimeoutError), timeout(0, loop=loop) as cm:
        yield from asyncio.sleep(0, loop=loop)
        yield from coro()

    assert cm.expired
    assert coro_started is False
test_timeout.py 文件源码 项目:async-timeout 作者: aio-libs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_timeout_canceled_error_is_not_converted_to_timeout(loop):
    """A CancelledError raised inside the block propagates unchanged."""
    yield from asyncio.sleep(0, loop=loop)
    with pytest.raises(asyncio.CancelledError), timeout(0.001, loop=loop):
        raise asyncio.CancelledError


问题


面经


文章

微信
公众号

扫码关注公众号