python类isawaitable()的实例源码

builtin.py 文件源码 项目:pcbot 作者: pckv 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def eval_(message: discord.Message, python_code: Annotate.Code):
    """ Evaluate a python expression. Can be any python code on one
    line that returns something. Coroutine generators will be awaited.
    """
    # Must be a coroutine function: the body awaits both the evaluated result
    # and send_result(), which is a SyntaxError inside a plain ``def``.

    # Expose the invoking message's context to the evaluated expression.
    code_globals.update(dict(message=message, client=client,
                             author=message.author, server=message.server, channel=message.channel))

    before = datetime.now()
    try:
        # NOTE(security): eval() executes arbitrary code supplied in the
        # message; access to this command must be restricted by the caller.
        result = eval(python_code, code_globals)
        if inspect.isawaitable(result):
            result = await result
    except SyntaxError as e:
        result = utils.format_syntax_error(e)
    except Exception as e:
        # Any failure of the evaluated code is reported back, not raised.
        result = utils.format_exception(e)

    # Report the result together with the elapsed evaluation time.
    await send_result(message.channel, result, datetime.now() - before)
sanic_application.py 文件源码 项目:django-sanic-adaptor 作者: ashleysommer 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def async_legacy_get_response_dj_1_10(self, request):
        """
        Apply process_request() middleware and call the main _get_response(),
        if needed. Used only for legacy MIDDLEWARE_CLASSES.

        Declared as a coroutine function because middleware results and the
        inner response are awaited. Each middleware may return either a plain
        value or an awaitable; the first truthy response short-circuits the
        remaining middleware.
        """
        response = None
        # Apply request middleware
        for middleware_method in self._request_middleware:
            response = middleware_method(request)
            if isawaitable(response):
                response = await response
            if response:
                break

        # No middleware produced a response: fall back to the inner handler.
        if response is None:
            response = await self._get_response_inner_dj_1_10(request)

        return response

    # This function is protected under the Django BSD 3-Clause licence
    # This function is reproduced under the terms of the Django Licence
    # See DJANGO_LICENCE in this source code repository
Owner.py 文件源码 项目:youtube 作者: FishyFing 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def debug(self, ctx, *, code: str):
        """Evaluates code."""
        # Coroutine function: the body awaits the evaluated result and ctx.send().
        # Strip surrounding backticks/spaces so inline code formatting is accepted.
        code = code.strip('` ')
        python = '```py\n{}\n```'

        env = {
            'bot': self.bot,
            'ctx': ctx,
            'message': ctx.message,
            'guild': ctx.guild,
            'channel': ctx.channel,
            'author': ctx.author
        }

        # Module globals are merged last, so a module-level name shadows a
        # same-named convenience key defined above.
        env.update(globals())

        try:
            # NOTE(security): eval() runs arbitrary user-supplied code; this
            # command must only be reachable by trusted users.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            # Report the failure as "<ExceptionType>: <message>" instead of raising.
            await ctx.send(python.format(type(e).__name__ + ': ' + str(e)))
            return

        await ctx.send(python.format(result))
owner.py 文件源码 项目:lagbot 作者: mikevb1 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
async def debug(self, ctx, *, code: cleanup_code):
        """Evaluates code."""
        # Coroutine function: the body awaits the evaluated result and ctx.send().
        result = None
        env = get_env(ctx)
        # Make the stored `last_eval` value reachable from evaluated code as `__`.
        env['__'] = self.last_eval

        try:
            # NOTE(security): eval() runs arbitrary user-supplied code; this
            # command must only be reachable by trusted users.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
            # Embeds are sent as-is rather than being rendered to text.
            if isinstance(result, discord.Embed):
                await ctx.send(embed=result)
                return
        except Exception:
            say = self.eval_output(exception_signature())
        else:
            say = self.eval_output(rep(result))
        # Fall back to the literal text 'None' when eval_output() gives nothing.
        if say is None:
            say = 'None'
        await ctx.send(say)
context.py 文件源码 项目:curious 作者: SunDwarf 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
async def can_run(self, cmd) -> Tuple[bool, list]:
        """
        Checks if a command can be ran.

        Every callable in ``cmd.cmd_conditions`` (none if the attribute is
        missing) is called with this context; awaitable results are awaited.
        A condition fails when it returns a falsy value or raises anything
        other than ``CommandsError``, which is propagated unchanged.

        :param cmd: The command object whose conditions are checked.
        :return: If it can be ran, and a list of conditions that failed.
        """
        conditions = getattr(cmd, "cmd_conditions", [])
        failed = []
        for condition in conditions:
            try:
                success = condition(self)
                if inspect.isawaitable(success):
                    success = await success
            except CommandsError:
                raise
            except Exception:
                failed.append(condition)
            else:
                if not success:
                    # Record the failing condition itself (not its falsy
                    # result), matching the exception branch and the
                    # documented return value.
                    failed.append(condition)

        if failed:
            return False, failed

        return True, []
evaluate.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def evaluate(cmd, message, args):
    # Coroutine function: evaluates the joined `args` as a Python expression
    # for permitted authors and replies with an embed describing the outcome.
    if message.author.id in permitted_id:
        if not args:
            # Nothing to evaluate: reply with the command's help text.
            await message.channel.send(cmd.help())
        else:
            try:
                execution = " ".join(args)
                # NOTE(security): eval() runs arbitrary code; the only guard
                # is the permitted_id check above.
                output = eval(execution)
                if inspect.isawaitable(output):
                    output = await output
                status = discord.Embed(title='? Executed', color=0x66CC66)
                # Falsy results (None, 0, '', ...) are not shown as a field.
                if output:
                    try:
                        status.add_field(name='Results', value='\n```\n' + str(output) + '\n```')
                    except Exception:
                        # Best effort: a result that cannot be rendered is
                        # omitted from the embed rather than failing the reply.
                        pass
            except Exception as e:
                cmd.log.error(e)
                status = discord.Embed(color=0xDB0000, title='? Error')
                status.add_field(name='Execution Failed', value=str(e))
            await message.channel.send(None, embed=status)
    else:
        status = discord.Embed(type='rich', color=0xDB0000,
                               title='? Insufficient Permissions. Bot Owner or Server Admin Only.')
        await message.channel.send(None, embed=status)
utils.py 文件源码 项目:nougat 作者: NougatWeb 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def is_middleware(func):
    """
    Validate that ``func`` has the signature expected of a middleware:
    exactly three parameters named ``req``, ``res`` and ``next``, in order.

    :param func: the callable to inspect
    :return: True when the signature matches
    :raises UnknownMiddlewareException: on a wrong parameter count or name
    """
    param_names = list(inspect.signature(func).parameters)

    # NOTE: an awaitability check on ``func`` is intentionally not enforced here.

    if len(param_names) != 3:
        raise UnknownMiddlewareException("middleware {} should has 3 params named req, res and next".format(func.__name__))

    # (expected name, error template) for each position, checked in order.
    expectations = (
        ('req', "the first param's name of middleware {} should be req"),
        ('res', "the second param's name of middleware {} should be res"),
        ('next', "the third param's name of middleware {} should be next"),
    )
    for actual, (expected, template) in zip(param_names, expectations):
        if actual != expected:
            raise UnknownMiddlewareException(template.format(func.__name__))

    return True
tasks.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for the given future, coroutine or awaitable.

    A Future is handed back unchanged (its loop must match ``loop`` when
    one is given); a coroutine is scheduled as a task on ``loop`` or the
    current event loop; any other awaitable is wrapped first.

    Raises ValueError on a loop mismatch and TypeError for other inputs.
    """
    if futures.isfuture(coro_or_future):
        loop_mismatch = loop is not None and loop is not coro_or_future._loop
        if loop_mismatch:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        new_task = target_loop.create_task(coro_or_future)
        # Drop the last recorded frame (this call) when a creation
        # traceback was captured.
        if new_task._source_traceback:
            del new_task._source_traceback[-1]
        return new_task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('A Future, a coroutine or an awaitable is required')
tasks.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for the given future, coroutine or awaitable.

    A ``futures.Future`` instance is handed back unchanged (its loop must
    match ``loop`` when one is given); a coroutine is scheduled as a task
    on ``loop`` or the current event loop; any other awaitable is wrapped
    first.

    Raises ValueError on a loop mismatch and TypeError for other inputs.
    """
    if isinstance(coro_or_future, futures.Future):
        loop_mismatch = loop is not None and loop is not coro_or_future._loop
        if loop_mismatch:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        new_task = target_loop.create_task(coro_or_future)
        # Drop the last recorded frame (this call) when a creation
        # traceback was captured.
        if new_task._source_traceback:
            del new_task._source_traceback[-1]
        return new_task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('A Future, a coroutine or an awaitable is required')
extension.py 文件源码 项目:sanic-cors 作者: ashleysommer 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
async def route_wrapper(self, route, req, context, request_args, request_kw,
                            *decorator_args, **decorator_kw):
        """Call ``route`` and apply CORS headers to whatever response it yields.

        Declared as a coroutine function because the route result is awaited.
        The remaining decorator keyword arguments (after ``with_context`` is
        removed) are passed to ``get_cors_options`` as per-route options.

        :return: the response object, or None when the route produced none.
        """
        decorator_kw.pop('with_context')  # ignore this.
        _options = decorator_kw
        options = get_cors_options(context.app, _options)
        if options.get('automatic_options') and req.method == 'OPTIONS':
            # Answer the OPTIONS request directly without invoking the route.
            resp = response.HTTPResponse()
        else:
            resp = route(req, *request_args, **request_kw)
            if resp is not None:
                # A route may return an awaitable that itself resolves to
                # another awaitable; unwrap until a concrete value remains.
                while isawaitable(resp):
                    resp = await resp
        if resp is not None:
            request_context = context.request[id(req)]
            set_cors_headers(req, resp, context, options)
            # Mark this request as already processed for CORS.
            request_context[SANIC_CORS_EVALUATED] = "1"
        return resp
asynctest.py 文件源码 项目:aioprometheus 作者: claws 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def isawaitable(obj):
    ''' Tell whether ``obj`` can be awaited, either directly or by calling it.

    With PY35 set this is true for coroutine functions and for awaitable
    objects. With PY34 set it is true for asyncio futures, coroutine
    objects and anything exposing ``__await__``. When neither flag is set
    an Exception is raised.
    '''
    if PY35:
        return inspect.iscoroutinefunction(obj) or inspect.isawaitable(obj)

    if PY34:
        return (isinstance(obj, asyncio.Future) or
                asyncio.iscoroutine(obj) or
                hasattr(obj, '__await__'))

    raise Exception(
        'isawaitable is not supported on Python {}'.format(
            sys.version_info))
db.py 文件源码 项目:aiorethink 作者: lars-tiede 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _run_query(query, conn = None):
    """`run()`s query if caller hasn't already done so, then awaits and returns
    its result.

    If run() has already been called, then the query (strictly speaking, the
    awaitable) is just awaited. This gives the caller the opportunity to
    customize the run() call.

    If run() has not been called, then the query is run on the given connection
    (or the default connection). This is more convenient for the caller than
    the other version.
    """
    # run() it if caller didn't do that already
    if not inspect.isawaitable(query):
        if not isinstance(query, r.RqlQuery):
            raise TypeError("query is neither awaitable nor a RqlQuery")
        cn = conn or await db_conn
        query = query.run(cn)

    return await query
test_cursor.py 文件源码 项目:asyncpg 作者: MagicStack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
async def test_cursor_iterable_02(self):
        # Coroutine test: the body awaits connection and cursor operations.

        # Test that it's not possible to create a cursor without hold
        # outside of a transaction
        s = await self.con.prepare(
            'DECLARE t BINARY CURSOR WITHOUT HOLD FOR SELECT 1')
        with self.assertRaises(asyncpg.NoActiveSQLTransactionError):
            await s.fetch()

        # Now test that statement.cursor() does not let you
        # iterate over it outside of a transaction
        st = await self.con.prepare('SELECT generate_series(0, 20)')

        it = st.cursor(prefetch=5).__aiter__()
        # __aiter__() may hand back an awaitable instead of the iterator
        # itself; resolve it before calling __anext__().
        if inspect.isawaitable(it):
            it = await it

        with self.assertRaisesRegex(asyncpg.NoActiveSQLTransactionError,
                                    'cursor cannot be created.*transaction'):
            await it.__anext__()
commands.py 文件源码 项目:Turbo 作者: jaydenkieran 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
async def c_eval(self, message, server, channel, author, stmt, args):
        """
        Evaluates Python code

        {prefix}eval <code>

        If the result is a coroutine, it will be awaited
        """
        # Coroutine function: the evaluated result may be awaited below.
        stmt = ' '.join([stmt, *args])
        try:
            # NOTE(security): eval() runs arbitrary user-supplied code; this
            # command must only be reachable by trusted users.
            result = eval(stmt)
            if inspect.isawaitable(result):
                result = await result
        except Exception:
            # Report only the final traceback line ("<Type>: <message>").
            exc = traceback.format_exc().splitlines()
            result = exc[-1]
        log.debug("Evaluated: {} - Result was: {}".format(stmt, result))
        return Response("```xl\n--- In ---\n{}\n--- Out ---\n{}\n```".format(stmt, result))
messaging.py 文件源码 项目:CEX.IO-Client-Python3.5 作者: cexioltd 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def is_awaitable(obj):
        # No single stdlib predicate answers "should the call result be
        # waited on?" for every shape of callable, so the suspected cases are
        # combined here: awaitable objects, coroutine functions, coroutine
        # objects and generator functions answer True directly; instances of
        # user-defined classes are judged by their __call__; everything else
        # answers False.
        directly_awaitable = (
            inspect.isawaitable(obj)
            or inspect.iscoroutinefunction(obj)
            or inspect.iscoroutine(obj)
            or inspect.isgeneratorfunction(obj)
        )
        if directly_awaitable:
            return True

        if not CallChain.is_user_defined_class(obj):
            return False

        if not hasattr(obj, '__call__'):
            return False
        return CallChain.is_awaitable(obj.__call__)
meta.py 文件源码 项目:Harmonbot 作者: Harmon758 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
async def eval(self, ctx, *, code : str):
        # Coroutine function: the body awaits the evaluated result and the reply.
        code = code.strip('`')
        try:
            # NOTE(security): eval() runs arbitrary user-supplied code; this
            # command must only be reachable by trusted users.
            result = eval(code)
            if inspect.isawaitable(result):
                result = await result
            await self.bot.reply(clients.py_code_block.format(result))
        except Exception as e:
            # Reply with "<ExceptionType>: <message>" instead of raising.
            await self.bot.reply(clients.py_code_block.format("{}: {}".format(type(e).__name__, e)))
meta.py 文件源码 项目:Harmonbot 作者: Harmon758 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def repl(self, ctx):
        # Coroutine function: an interactive loop that reads backtick-prefixed
        # messages from the invoking author/channel and evaluates or executes
        # them against a shared `variables` namespace until told to exit.
        variables = {"self" : self, "ctx" : ctx, "last" : None}
        await self.bot.embed_reply("Enter code to execute or evaluate\n`exit` or `quit` to exit")
        while True:
            message = await self.bot.wait_for_message(author = ctx.message.author, channel = ctx.message.channel, check = lambda m: m.content.startswith('`'))
            # Strip either a ```py fenced block or plain backtick quoting.
            if message.content.startswith("```py") and message.content.endswith("```"):
                code = message.content[5:-3].strip(" \n")
            else:
                code = message.content.strip("` \n")
            if code in ("quit", "exit", "quit()", "exit()"):
                await self.bot.embed_reply('Exiting repl')
                return
            # Default to exec; single-line input that compiles as an
            # expression is evaluated instead so its value can be shown.
            function = exec
            if '\n' not in code:
                try:
                    code = compile(code, "<repl>", "eval")
                except SyntaxError:
                    pass
                else:
                    function = eval
            if function is exec:
                try:
                    code = compile(code, "<repl>", "exec")
                except SyntaxError as e:
                    # Show the offending text with a caret under the error column.
                    await self.bot.reply(clients.py_code_block.format("{0.text}{1:>{0.offset}}\n{2}: {0}".format(e, '^', type(e).__name__)))
                    continue
            try:
                # NOTE(security): runs arbitrary user-supplied code; this
                # command must only be reachable by trusted users.
                result = function(code, variables)
                if inspect.isawaitable(result):
                    result = await result
            except:
                # Deliberately broad so that any failure of the entered code,
                # including SystemExit, is reported rather than ending the session.
                # NOTE(review): this also intercepts task cancellation.
                await self.bot.reply(clients.py_code_block.format("\n".join(traceback.format_exc().splitlines()[-2:]).strip()))
            else:
                if function is eval:
                    try:
                        await self.bot.reply(clients.py_code_block.format(result))
                    except Exception as e:
                        await self.bot.reply(clients.py_code_block.format("{}: {}".format(type(e).__name__, e)))
                # Remember the most recent result for later input as `last`.
                variables["last"] = result
backports_abc.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def isawaitable(obj):
        """Return True if *obj* is an instance of the ``Awaitable`` ABC in scope."""
        return isinstance(obj, Awaitable)


###
#  allow patching the stdlib
backports_abc.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def patch(patch_inspect=True):
    """
    Install the ``Generator``, ``Coroutine`` and ``Awaitable`` classes in
    scope onto ``_collections_abc`` and record each replacement in
    ``PATCHED`` under its dotted ``collections.abc`` name.

    When ``patch_inspect`` is true, ``inspect.isawaitable`` is replaced by
    the ``isawaitable`` in scope and recorded as well.
    """
    replacements = (
        ('Generator', Generator),
        ('Coroutine', Coroutine),
        ('Awaitable', Awaitable),
    )
    for abc_name, backport in replacements:
        PATCHED['collections.abc.' + abc_name] = backport
        setattr(_collections_abc, abc_name, backport)

    if not patch_inspect:
        return

    import inspect
    PATCHED['inspect.isawaitable'] = isawaitable
    inspect.isawaitable = isawaitable
backports_abc.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def isawaitable(obj):
        """Return True if *obj* is an instance of the ``Awaitable`` ABC in scope."""
        return isinstance(obj, Awaitable)


###
#  allow patching the stdlib
backports_abc.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def patch(patch_inspect=True):
    """
    Install the ``Generator``, ``Coroutine`` and ``Awaitable`` classes in
    scope onto ``_collections_abc`` and record each replacement in
    ``PATCHED`` under its dotted ``collections.abc`` name.

    When ``patch_inspect`` is true, ``inspect.isawaitable`` is replaced by
    the ``isawaitable`` in scope and recorded as well.
    """
    replacements = (
        ('Generator', Generator),
        ('Coroutine', Coroutine),
        ('Awaitable', Awaitable),
    )
    for abc_name, backport in replacements:
        PATCHED['collections.abc.' + abc_name] = backport
        setattr(_collections_abc, abc_name, backport)

    if not patch_inspect:
        return

    import inspect
    PATCHED['inspect.isawaitable'] = isawaitable
    inspect.isawaitable = isawaitable
backports_abc.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def isawaitable(obj):
        """Return True if *obj* is an instance of the ``Awaitable`` ABC in scope."""
        return isinstance(obj, Awaitable)


###
#  allow patching the stdlib
chromerdp.py 文件源码 项目:chrome-prerender 作者: bosondata 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _handle_response(self, obj: Dict) -> None:
        req_id = obj.get('id')
        if req_id is not None:
            future = self._futures.get(req_id)
            if future and not future.cancelled():
                future.set_result(obj)
        method = obj.get('method')
        if method is not None:
            callback = self._callbacks.get(method)
            if callback is not None:
                ret = callback(obj)
                if inspect.isawaitable(ret):
                    await ret
asyncsupport.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable.

    Declared as a coroutine function because the body uses ``await``;
    callers must await the call even for plain values.
    """
    if inspect.isawaitable(value):
        return await value
    return value
asyncsupport.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable.

    Declared as a coroutine function because the body uses ``await``;
    callers must await the call even for plain values.
    """
    if inspect.isawaitable(value):
        return await value
    return value
admin.py 文件源码 项目:Inkxbot 作者: InkxtheSquid 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def debug(self, ctx, *, code : str):
        """Evaluates code."""
        # Coroutine function: the body awaits the evaluated result and bot.say().
        # Strip surrounding backticks/spaces so inline code formatting is accepted.
        code = code.strip('` ')
        python = '```py\n{}\n```'
        result = None

        env = {
            'bot': self.bot,
            'ctx': ctx,
            'message': ctx.message,
            'server': ctx.message.server,
            'channel': ctx.message.channel,
            'author': ctx.message.author
        }

        # Module globals are merged last, so a module-level name shadows a
        # same-named convenience key defined above.
        env.update(globals())

        try:
            # NOTE(security): eval() runs arbitrary user-supplied code; this
            # command must only be reachable by trusted users.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            # Report the failure as "<ExceptionType>: <message>" instead of raising.
            await self.bot.say(python.format(type(e).__name__ + ': ' + str(e)))
            return

        await self.bot.say(python.format(result))
asyncsupport.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable.

    Declared as a coroutine function because the body uses ``await``;
    callers must await the call even for plain values.
    """
    if inspect.isawaitable(value):
        return await value
    return value
monk.py 文件源码 项目:monk 作者: IcyCC 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def handle_request(self, request, write_response):
        """Serve one request and hand the response to ``write_response``.

        Static requests are answered from a file under the configured
        ``static_path``. Other requests pass through the ``after_request``
        hooks, the routed handler and the ``before_response`` hooks; each of
        those may return a plain value or an awaitable.

        :param request: the incoming request object
        :param write_response: callable that receives the final response
        """
        if self.is_static(request) is True:
            url_path = str(request.url.path, encoding="utf-8")
            # NOTE(review): the request path is concatenated onto static_path
            # without checking that the resolved path stays inside it (e.g.
            # ".." segments) - confirm this is validated elsewhere.
            path = self.config.get('static_path')+url_path
            path = os.path.abspath(path)
            log.info("Static file read path {} ".format(path))
            context = await read_file(path)
            file_type = url_path.split('.')[-1]
            resp = Response(body=context, version='1.1',
                            content_type=TYPE_MAP.get(file_type, "text/plain"))
            write_response(resp)
            # The static response is complete; stop here so the request is
            # not also routed and answered a second time.
            return

        # NOTE(review): despite the hook's name, these run before the handler.
        for hooker in self.hook.after_request:
            request = hooker(request)
            if isawaitable(request):
                request = await request

        handle = self.router.get(request=request)
        if handle is None:
            write_response(self.abort_404("Not found method"))
            return

        response = handle(request)

        if isawaitable(response):
            response = await response

        for hooker in self.hook.before_response:
            response = hooker(response)
            if isawaitable(response):
                response = await response

        write_response(response)
wrappers.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __anext__(self):
        try:
            result = self.__next__()
            if inspect.isawaitable(result):
                result = await result
                self.qrw._result_cache[-1] = result
            return result
        except StopAsyncIteration:
            self.qrw._result_cache.pop()
            self.qrw._ct -= 1
            self._idx -= 1
            raise
        except StopIteration:
            raise StopAsyncIteration
fields.py 文件源码 项目:ormageddon 作者: renskiy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def python_value(self, value):
        # Plain values take the parent class's conversion; awaitable values
        # are handed to async_python_value instead.
        if not inspect.isawaitable(value):
            return super().python_value(value)
        return self.async_python_value(value)


问题


面经


文章

微信
公众号

扫码关注公众号