python类isawaitable()的实例源码

query.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _map(callback, *iterables, loop=None):
    if any(map(inspect.isawaitable, iterables)):
        return map_async(callback, *iterables, loop=loop)
    return map(callback, *iterables)
query.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _zip(*iterables, loop=None):
    assert not all(map(inspect.isawaitable, iterables))
    return zip(*ensure_iterables(*iterables, loop=loop))
query.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def execute(self):
        loop = self.database.loop
        with contextlib.ExitStack() as exit_stack:
            exit_stack.enter_context(patch(self, '_execute', _QueryExecutor(self._execute, loop=loop)))
            exit_stack.enter_context(patch(peewee, 'map', functools.partial(_map, loop=loop), map))
            exit_stack.enter_context(patch(peewee, 'zip', functools.partial(_zip, loop=loop), zip))
            result = super().execute()
            if inspect.isawaitable(result):
                return await result
            elif isinstance(result, list):
                return await asyncio.gather(*result, loop=loop)
            return result
utils.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def ensure_iterables(*iterables, loop=None):
    for iterable in iterables:
        if inspect.isawaitable(iterable):
            future = asyncio.ensure_future(iterable, loop=loop)
            yield future_iterator(future)
        else:
            yield iterable
utils.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def force_future(entity, loop=None):
    if inspect.isawaitable(entity):
        return entity
    future = asyncio.Future(loop=loop)
    future.set_result(entity)
    return future
asyncsupport.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
asyncsupport.py 文件源码 项目:Indushell 作者: SecarmaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
admin.py 文件源码 项目:SESTREN 作者: SirThane 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _eval(self, ctx, *, code: str):
        """Run eval() on an input."""

        code = code.strip('` ')
        python = '```py\n{0}\n```'
        env = self.env(ctx)

        try:
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
            result = str(result)[:1014]
            color = 0x00FF00
            field = {
                'inline': False,
                'name': 'Yielded result:',
                'value': python.format(result)
            }
        except Exception as e:
            color = 0xFF0000
            field = {
                'inline': False,
                'name': 'Yielded exception "{0.__name__}":'.format(type(e)),
                'value': '{0} '.format(e)
            }

        embed = discord.Embed(title="Eval on:", description=python.format(code), color=color)
        embed.add_field(**field)

        await ctx.send(embed=embed)
asyncsupport.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
asyncsupport.py 文件源码 项目:flask_system 作者: prashasy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
__init__.py 文件源码 项目:pcbot 作者: pckv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def call_reload(name: str):
    """ Initiates reload of plugin. """
    # See if the plugin has an on_reload() function, and call that
    if hasattr(plugins[name], "on_reload"):
        if callable(plugins[name].on_reload):
            result = plugins[name].on_reload(name)
            if inspect.isawaitable(result):
                await result
    else:
        await on_reload(name)
asyncsupport.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
asyncsupport.py 文件源码 项目:FileStoreGAE 作者: liantian-cn 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
gui.py 文件源码 项目:goldmine 作者: Armored-Dragon 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
        print('called fake')
        res = self.attr(*args, **kwargs)
        print('calling regular' + str(args))
        print(self.attr)
        if inspect.isawaitable(res):
            self.iface.loop.create_task(res)
syncer.py 文件源码 项目:syncer 作者: miyakogi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _is_awaitable(co: Generator[Any, None, Any]) -> bool:
    if PY35:
        return inspect.isawaitable(co)
    else:
        return (isinstance(co, types.GeneratorType) or
                isinstance(co, asyncio.Future))
manager.py 文件源码 项目:curious 作者: SunDwarf 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_commands(self, ctx: EventContext, message: Message):
        """
        Handles commands for a message.
        """
        # check bot type
        if message.author.user.bot and self.client.bot_type & 8:
            return

        if message.author.user != self.client.user and self.client.bot_type & 64:
            return

        if message.guild_id is not None and self.client.bot_type & 32:
            return

        if message.guild_id is None and self.client.bot_type & 16:
            return

        # step 1, match the messages
        matched = self.message_check(self.client, message)
        if inspect.isawaitable(matched):
            matched = await matched

        if matched is None:
            return None

        # deconstruct the tuple returned into more useful variables than a single tuple
        command_word, tokens = matched

        # step 2, create the new commands context
        ctx = Context(event_context=ctx, message=message)
        ctx.command_name = command_word
        ctx.tokens = tokens
        ctx.manager = self

        # step 3, invoke the context to try and match the command and run it
        await ctx.try_invoke()
stream_utils.py 文件源码 项目:metapensiero.reactive 作者: azazel75 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _exec_possible_awaitable(self, func, *args, **kwargs):
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result
asyncsupport.py 文件源码 项目:python-group-proj 作者: Sharcee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
asyncsupport.py 文件源码 项目:islam-buddy 作者: hamir 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def auto_await(value):
    if inspect.isawaitable(value):
        return await value
    return value
misc.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def maybe_awaitable(func, *args, **kwargs):
    maybe = func(*args, **kwargs)
    return await maybe if inspect.isawaitable(maybe) else maybe


问题


面经


文章

微信
公众号

扫码关注公众号