python类Awaitable()的实例源码

__main__.py 文件源码 项目:the-knights-who-say-ni 作者: python 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handler(create_client: Callable[[], aiohttp.ClientSession], server: ni_abc.ServerHost,
            cla_records: ni_abc.CLAHost) -> Callable[[web.Request], Awaitable[web.Response]]:
    """Build the request-handling coroutine bound to the given hosts.

    :param create_client: zero-argument factory whose result is used as an
        async context manager for the duration of one request.
    :param server: object used for logging and for the trusted-user set.
    :param cla_records: object whose ``check()`` reports CLA status.
    :return: the ``respond`` coroutine function.
    """
    async def respond(request: web.Request) -> web.Response:
        """Process one webhook request and turn the outcome into a response."""
        async with create_client() as session:
            try:
                contribution = await ContribHost.process(server, request, session)
                contributors = await contribution.usernames()
                server.log("Usernames: " + str(contributors))
                # Trusted users are excluded from the CLA check.
                unverified = contributors - server.trusted_users()
                status = await cla_records.check(session, unverified)
                server.log("CLA status: " + str(status))
                # With a work queue, the update below could become a work
                # item and this handler could answer with HTTP 202 instead.
                await contribution.update(status)
                return web.Response(status=http.HTTPStatus.OK)
            except ni_abc.ResponseExit as early_exit:
                # The exception carries the response to send back.
                return early_exit.response
            except Exception as error:
                server.log_exception(error)
                failure = http.HTTPStatus.INTERNAL_SERVER_ERROR
                return web.Response(status=failure)

    return respond
objects.py 文件源码 项目:jsonapi-client 作者: qvantel 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def fetch(self, cache_only=True) \
            -> 'Union[Awaitable[ResourceObject], ResourceObject]':
        """Dispatch to the async or sync fetch implementation.

        When ``self.session.enable_async`` is truthy the result of
        ``fetch_async`` is returned, otherwise the result of ``fetch_sync``.
        ``cache_only`` is forwarded unchanged.
        """
        if not self.session.enable_async:
            return self.fetch_sync(cache_only)
        return self.fetch_async(cache_only)
resourceobject.py 文件源码 项目:jsonapi-client 作者: qvantel 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def commit(self, custom_url: str = '', meta: 'Optional[dict]' = None) \
            -> 'Union[None, ResourceObject, Awaitable[Optional[ResourceObject]]]':
        """
        Commit (PATCH/POST) this resource to server.

        :param custom_url: Use this url instead of automatically determined one.
        :param meta: Optional metadata that is passed to server in POST/PATCH request
        :return: Whatever ``_commit_async`` or ``_commit_sync`` returns,
            depending on ``self.session.enable_async``.

        If in async mode, this needs to be awaited.
        """
        # Both implementations take the same arguments; only the session
        # mode decides which one runs.
        if self.session.enable_async:
            return self._commit_async(custom_url, meta)
        else:
            return self._commit_sync(custom_url, meta)
relationships.py 文件源码 项目:jsonapi-client 作者: qvantel 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def filter(self, filter: 'Filter') -> 'Union[Awaitable[Document], Document]':
        """
        Return the resources selected by the given Filter instance.

        The call is delegated to ``_filter_async`` when the session is in
        async mode (the result must then be awaited), otherwise to
        ``_filter_sync``.
        """
        run_filter = (self._filter_async if self.session.enable_async
                      else self._filter_sync)
        return run_filter(filter)
relationships.py 文件源码 项目:jsonapi-client 作者: qvantel 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def fetch(self) -> 'Union[Awaitable[List[ResourceObject]], List[ResourceObject]]':
        """
        Fetch ResourceObjects.

        Explicit calls are only needed in async mode, where the returned
        value has to be awaited. In blocking (sync) mode this is called
        automatically when .resource or .resources is accessed.
        """
        if not self.session.enable_async:
            return self._fetch_sync()
        return self._fetch_async()
utils.py 文件源码 项目:graphscale 作者: schrockn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def execute_gen(gen: Awaitable[T]) -> T:
    """Run an awaitable to completion synchronously and return its result.

    Useful in scripts and tests that need to call async functions from
    synchronous code. A fresh event loop is created for the call.

    :param gen: the awaitable to run.
    :return: the value the awaitable produces.
    :raises: whatever exception the awaitable raises.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gen)
    finally:
        # Close the loop on the failure path too, so it is not leaked
        # when the awaitable raises.
        loop.close()
utils.py 文件源码 项目:graphscale 作者: schrockn 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def async_zip(keys: Iterable[str], coros: Iterable[Awaitable[T]]) -> Dict[str, T]:
    """Pair keys with awaitables, await them, and return key -> result.

    Must be declared ``async`` because the body awaits; a plain ``def``
    containing ``await`` does not compile.

    :param keys: keys, matched positionally with ``coros`` via ``zip``.
    :param coros: awaitables whose results become the mapping values.
    :return: the mapping produced by ``async_dict``.
    """
    return await async_dict(dict(zip(keys, coros)))
utils.py 文件源码 项目:graphscale 作者: schrockn 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def async_dict(coro_dict: Dict[str, Awaitable[T]]) -> Dict[str, T]:
    """Await every value of a dict and return an OrderedDict of the results.

    Must be declared ``async`` because the body awaits; a plain ``def``
    containing ``await`` does not compile.

    :param coro_dict: mapping of key -> awaitable.
    :return: ``OrderedDict`` with the same keys, in the same iteration
        order, mapped to the awaited results.
    """
    keys = list(coro_dict.keys())
    # Keys and values are listed from the same dict, so positions line up.
    results = await async_list(list(coro_dict.values()))
    return OrderedDict(zip(keys, results))
utils.py 文件源码 项目:graphscale 作者: schrockn 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def async_list(coros: List[Awaitable[T]]) -> List[T]:
    """Await a list of awaitables and return their results as a list.

    Must be declared ``async`` because the body awaits; a plain ``def``
    containing ``await`` does not compile.

    Example: list_of_results = await async_list(list_of_gens)

    :param coros: awaitables to run via ``asyncio.gather``.
    :return: the results, in the same order as ``coros``.
    """
    return await asyncio.gather(*coros)
utils.py 文件源码 项目:graphscale 作者: schrockn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def async_tuple(*coros: Awaitable) -> Tuple[Any, ...]:
    """Await all positional awaitables and return their results as a tuple.

    Must be declared ``async`` because the body awaits; a plain ``def``
    containing ``await`` does not compile.

    Example: result_one, result_two = await async_tuple(gen_one(), gen_two())

    :param coros: awaitables to run via ``asyncio.gather``.
    :return: tuple of results in argument order.
    """
    return tuple(await asyncio.gather(*coros))
connection.py 文件源码 项目:aredis 作者: NoneGG 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def connect(self):
        """Establish the connection, then run the registered connect callbacks.

        Must be declared ``async`` because the body awaits; a plain ``def``
        containing ``await`` does not compile.

        :raises ConnectionError: if ``self._connect()`` raises any
            ``Exception``; the original error is attached as ``__cause__``.
        """
        try:
            await self._connect()
        except Exception as exc:
            # Chain explicitly so the underlying failure stays visible.
            raise ConnectionError() from exc
        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        for callback in self._connect_callbacks:
            task = callback(self)
            # Callbacks may be plain functions or return an awaitable.
            if isinstance(task, typing.Awaitable):
                await task
command.py 文件源码 项目:yui 作者: item4 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def only(*channels: Union[Type[DM], str], error: Optional[str]=None)\
        -> Callable[[Any, Event], Awaitable[bool]]:
    """Build a predicate that accepts events only from the listed channels.

    ``DM`` may be passed among ``channels``; the predicate then returns
    True for events whose channel is neither a PrivateChannel nor a
    PublicChannel. When the predicate rejects an event and ``error`` is
    truthy, ``error`` is sent to the event's channel via ``bot.say``.
    """

    allow_dm = DM in channels
    if allow_dm:
        # Keep only the channel names; DM is handled through the flag.
        channels = tuple(entry for entry in channels if entry is not DM)

    async def callback(bot, event: Event) -> bool:
        named_channel = isinstance(
            event.channel, (PrivateChannel, PublicChannel))
        if named_channel:
            permitted = event.channel.name in channels
        else:
            permitted = allow_dm

        if permitted:
            return True

        if error:
            await bot.say(
                event.channel,
                error
            )
        return False

    return callback
command.py 文件源码 项目:yui 作者: item4 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def not_(*channels: Union[Type[DM], str], error: Optional[str]=None) \
        -> Callable[[Any, Event], Awaitable[bool]]:
    """Build a predicate that rejects events from the listed channels.

    ``DM`` may be passed among ``channels``; the predicate then returns
    False for events whose channel is neither a PrivateChannel nor a
    PublicChannel. When the predicate rejects an event and ``error`` is
    truthy, ``error`` is sent to the event's channel via ``bot.say``.
    """

    deny_dm = DM in channels
    if deny_dm:
        # Keep only the channel names; DM is handled through the flag.
        channels = tuple(entry for entry in channels if entry is not DM)

    async def callback(bot, event: Event) -> bool:
        named_channel = isinstance(
            event.channel, (PrivateChannel, PublicChannel))
        if named_channel:
            blocked = event.channel.name in channels
        else:
            blocked = deny_dm

        if not blocked:
            return True

        if error:
            await bot.say(
                event.channel,
                error
            )
        return False

    return callback
method.py 文件源码 项目:venom 作者: biosustain 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __get__(self, instance: Service, owner: Type[Service] = None) -> 'Callable[[Any, Req], Awaitable[Res]]':
        """Descriptor ``__get__`` with an empty body; calling it yields None.

        NOTE(review): presumably a signature-only stub for type checkers;
        confirm against the decorators and class around it.
        """
        return None

    # XXX MethodDescriptor.__get__() is not used anymore
method.py 文件源码 项目:venom 作者: biosustain 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __get__(self, instance: S, owner: Type[S] = None) -> 'Callable[[S, Req], Awaitable[Res]]':
        """Descriptor ``__get__`` with an empty body; calling it yields None.

        NOTE(review): presumably a signature-only stub for type checkers;
        confirm against the decorators and class around it.
        """
        return None
image.py 文件源码 项目:plumeria 作者: sk89q 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def unfurl_image_url(url: str) -> Awaitable[str]:
    """Return an image URL extracted from the metadata fetched for ``url``.

    Must be declared ``async`` because the body awaits; a plain ``def``
    containing ``await`` does not compile.

    The fetched results are probed in a fixed order: Twitter card image,
    Open Graph image, then oEmbed thumbnail. The first one present wins.

    NOTE(review): with ``async def`` the conventional return annotation
    would be ``str``; left unchanged here, confirm against callers.

    :param url: address passed to ``fetch_all``.
    :raises CommandError: if none of the probed entries is present.
    """
    # (section, field) pairs in priority order.
    candidates = (
        ('twitter_cards', 'image'),
        ('open_graph', 'image'),
        ('oembed', 'thumbnail_url'),
    )
    with DefaultClientSession() as session:
        results = await fetch_all(session, url)
        for section, field in candidates:
            if section in results and field in results[section]:
                return results[section][field]
        raise CommandError("Couldn't extract an image from the URL '{}'".format(url))
attachment.py 文件源码 项目:plumeria 作者: sk89q 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def read(self) -> Awaitable[bytes]:
        """
        Return the bytes of the file.

        This implementation has no behaviour of its own and always raises.

        Returns
        -------
        Awaitable[bytes]
            The file's data

        Raises
        ------
        NotImplementedError
            Always.

        """
        # ``NotImplemented`` is a non-callable constant meant for binary
        # special methods; calling it raises TypeError. The exception
        # class is ``NotImplementedError``.
        raise NotImplementedError()
http.py 文件源码 项目:mach9 作者: silver-castle 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, *, loop, request_handler: Awaitable,
                 log=None, signal=None, connections=None, request_timeout=60,
                 request_max_size=None, has_log=True,
                 keep_alive=True, netlog=None):
        '''Store the configuration and reset all per-connection state.

        signal is shared

        :param loop: stored as ``self.loop``.
        :param request_handler: stored as ``self.request_handler``.
        :param log: stored as ``self.log``.
        :param signal: stored as ``self.signal``.
        :param connections: set stored as ``self.connections``. When
            omitted, each instance gets its own new empty set.
        :param request_timeout: stored as ``self.request_timeout``.
        :param request_max_size: stored as ``self.request_max_size``.
        :param has_log: stored as ``self.has_log``.
        :param keep_alive: stored as ``self._keep_alive``.
        :param netlog: stored as ``self.netlog``.
        '''
        self.loop = loop
        self.transport = None
        self.parser = None
        self.url = None
        self.headers = None
        self.body_channel = None
        self.message = None
        self.signal = signal
        self.has_log = has_log
        self.log = log
        self.netlog = netlog
        # A ``set()`` default in the signature is created once and would
        # be shared by every instance built without this argument.
        # ``is None`` (not truthiness) keeps a caller's empty set intact.
        self.connections = connections if connections is not None else set()
        self.request_handler = request_handler
        self.request_timeout = request_timeout
        self.request_max_size = request_max_size
        self._total_request_size = 0
        self._timeout_handler = None
        self._last_request_time = None
        self._request_handler_task = None
        self._request_stream_task = None
        self._is_upgrade = False
        # config.KEEP_ALIVE or not check_headers()['connection_close']
        self._keep_alive = keep_alive
maybe.py 文件源码 项目:amino 作者: tek 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def unsafe_await(self) -> 'Maybe[Awaitable]':
        """Await the wrapped value when present and re-wrap the result.

        Must be declared ``async`` because the body awaits; a plain ``def``
        containing ``await`` does not compile. The return annotation is a
        single forward-reference string rather than a quoted string nested
        inside another string.

        :return: ``Maybe(result)`` of awaiting ``self._get()`` when
            ``self.is_just`` is truthy, otherwise ``self`` unchanged.
        """
        if self.is_just:
            # ``cast`` only informs the type checker; ``self._get`` is
            # called and its result awaited.
            ret = await cast(Callable[[], Awaitable], self._get)()
            return Maybe(ret)
        else:
            return cast(Maybe[Awaitable], self)
io.py 文件源码 项目:amino 作者: tek 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def coro(self) -> Awaitable[Either[IOException, A]]:
        """Return a coroutine object that yields ``self.attempt`` when awaited.

        ``self.attempt`` is read only once the coroutine runs, not when
        this method is called.
        """
        async def _attempt() -> Either[IOException, A]:
            return self.attempt

        pending = _attempt()
        return pending


问题


面经


文章

微信
公众号

扫码关注公众号