python类isgeneratorfunction()的实例源码

netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def register(self):
        """Register this RTI under its name so it can be located.

        Returns 0 when the RTI was added to ``RTI._pycos._rtis``.  Returns -1
        when ``self._location`` is set, when ``self._method`` is not a
        generator function, or when an entry is already registered under
        ``self._name``.
        """
        if self._location:
            return -1
        if not inspect.isgeneratorfunction(self._method):
            return -1
        RTI._pycos._lock.acquire()
        # try/finally guarantees the lock is released even if the registry
        # lookup or assignment raises.
        try:
            if RTI._pycos._rtis.get(self._name, None) is None:
                RTI._pycos._rtis[self._name] = self
                return 0
            return -1
        finally:
            RTI._pycos._lock.release()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __get_generator(task, *args, **kwargs):
        """Create the generator object for `task` from flexible arguments.

        The generator function is the first positional argument (remaining
        positionals are passed on to it).  When no positional arguments are
        given, it is read from the 'target' keyword, with its arguments in
        the 'args' and 'kwargs' keywords.  If the target declares a
        parameter named 'task' that has a default value, `task` is passed
        for it.

        Raises Exception if the target is not a generator function.
        """
        if args:
            target = args[0]
            args = args[1:]
        else:
            target = kwargs.pop('target', None)
            args = kwargs.pop('args', ())
            kwargs = kwargs.pop('kwargs', kwargs)
        if not inspect.isgeneratorfunction(target):
            # A missing target (None) or a nameless object must still produce
            # the intended error rather than an AttributeError on __name__.
            raise Exception('%s is not a generator!' %
                            getattr(target, '__name__', repr(target)))
        # __defaults__ / __code__ are available on Python 2.6+ and Python 3;
        # func_defaults / func_code exist only on Python 2.
        defaults = target.__defaults__
        if defaults:
            code = target.__code__
            # Only the trailing positional parameters carry default values.
            with_defaults = code.co_varnames[:code.co_argcount][-len(defaults):]
            if 'task' in with_defaults:
                kwargs['task'] = task
        return target(*args, **kwargs)
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.

        Generator that packages `gen` (a generator function or its name)
        into a 'job' message, delivers it to ``self.scheduler`` from a
        helper task and yields that helper task's ``finish()``.

        `request`, `where`, `cpu`, `args` and `kwargs` are passed through
        unchanged into the ``_DispycosJob_`` that is sent.
        """
        # 'gen' may be given either by name or as the function object itself.
        if isinstance(gen, str):
            name = gen
        else:
            name = gen.__name__

        # Names listed in self._xfer_funcs are sent without source code;
        # otherwise the source text of 'gen' accompanies the job.
        # NOTE(review): if 'gen' is a str that is not in self._xfer_funcs,
        # inspect.getsource() is called with a str -- confirm callers never
        # do that.
        if name in self._xfer_funcs:
            code = None
        else:
            # if not inspect.isgeneratorfunction(gen):
            #     logger.warning('"%s" is not a valid generator function', name)
            #     raise StopIteration([])
            code = inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            # Helper generator: sends the job and collects the reply/replies.
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            # A delivery result of 1 is treated as success; anything else
            # yields a reply of None.
            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                reply = yield task.receive()
                # When the first reply is a Task, report it to the status
                # task (if one is set).
                if isinstance(reply, Task):
                    if self.status_task:
                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                # Requests whose name does not end in 'async' wait for a
                # second message, which replaces the first reply.
                if not request.endswith('async'):
                    reply = yield task.receive()
            else:
                reply = None
            # NOTE(review): raising StopIteration inside a generator is
            # converted to RuntimeError on Python 3.7+ (PEP 479) -- presumably
            # this targets an older interpreter; confirm.
            raise StopIteration(reply)

        yield Task(_run_req).finish()
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def register(self):
        """Register this RTI under its name so it can be located.

        Returns 0 when the RTI was added to ``RTI._pycos._rtis``.  Returns -1
        when ``self._location`` is set, when ``self._method`` is not a
        generator function, or when an entry is already registered under
        ``self._name``.
        """
        if self._location:
            return -1
        if not inspect.isgeneratorfunction(self._method):
            return -1
        RTI._pycos._lock.acquire()
        # try/finally guarantees the lock is released even if the registry
        # lookup or assignment raises.
        try:
            if RTI._pycos._rtis.get(self._name, None) is None:
                RTI._pycos._rtis[self._name] = self
                return 0
            return -1
        finally:
            RTI._pycos._lock.release()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __get_generator(task, *args, **kwargs):
        """Create the generator object for `task` from flexible arguments.

        The generator function is the first positional argument (remaining
        positionals are passed on to it).  When no positional arguments are
        given, it is read from the 'target' keyword, with its arguments in
        the 'args' and 'kwargs' keywords.  If the target declares a
        parameter named 'task' that has a default value, `task` is passed
        for it.

        Raises Exception if the target is not a generator function.
        """
        if args:
            target = args[0]
            args = args[1:]
        else:
            target = kwargs.pop('target', None)
            args = kwargs.pop('args', ())
            kwargs = kwargs.pop('kwargs', kwargs)
        if not inspect.isgeneratorfunction(target):
            # A missing target (None) or a nameless object must still produce
            # the intended error rather than an AttributeError on __name__.
            raise Exception('%s is not a generator!' %
                            getattr(target, '__name__', repr(target)))
        defaults = target.__defaults__
        if defaults:
            code = target.__code__
            # Only the trailing positional parameters carry default values.
            with_defaults = code.co_varnames[:code.co_argcount][-len(defaults):]
            if 'task' in with_defaults:
                kwargs['task'] = task
        return target(*args, **kwargs)
messaging.py 文件源码 项目:CEX.IO-Client-Python3.5 作者: cexioltd 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def is_awaitable(obj):
        """Tell whether `obj` should be awaited.

        No single `inspect` predicate covers every case, so several are
        combined: awaitables, coroutine functions, coroutine objects and
        generator functions all count.  For an object that
        ``CallChain.is_user_defined_class`` accepts, the answer is taken
        from its ``__call__`` attribute; everything else is not awaitable.
        """
        directly_awaitable = (
            inspect.isawaitable(obj)
            or inspect.iscoroutinefunction(obj)
            or inspect.iscoroutine(obj)
            or inspect.isgeneratorfunction(obj)
        )
        if directly_awaitable:
            return True
        if not CallChain.is_user_defined_class(obj):
            return False
        if not hasattr(obj, '__call__'):
            return False
        # Defer to the callable's own __call__ (sync or async).
        return CallChain.is_awaitable(obj.__call__)
wsgi.py 文件源码 项目:vdi-broker 作者: cloudbase 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def pre_process_extensions(self, extensions, request, action_args):
        """Run the pre-processing stage of every extension.

        Generator-function extensions are started and advanced to their
        first ``yield``; a truthy value produced there (or a caught Fault)
        is returned immediately together with an empty post-processing
        list.  Otherwise returns ``(None, callables)`` where `callables`
        iterates the queued generators and plain functions in reverse
        order.
        """
        queued = []

        for extension in extensions:
            # Plain callables have no pre-processing stage; just queue them.
            if not inspect.isgeneratorfunction(extension):
                queued.append(extension)
                continue

            early_response = None
            # Everything before the generator's yield is its pre-processing.
            try:
                with ResourceExceptionHandler():
                    gen = extension(req=request, **action_args)
                    early_response = next(gen)
            except Fault as fault:
                early_response = fault

            if early_response:
                return early_response, []

            # Nothing returned: keep the generator for post-processing.
            queued.append(gen)

        # Post-processing happens in reverse registration order.
        return None, reversed(queued)
pyversion.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def isgenerator(o):
        """Return True if `o` is a generator function or a generator object.

        An UnboundMethod is first replaced by the function stored in its
        ``_func`` attribute.
        """
        candidate = o._func if isinstance(o, UnboundMethod) else o
        return inspect.isgeneratorfunction(candidate) or inspect.isgenerator(candidate)
usb1.py 文件源码 项目:sc-controller 作者: kozec 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _validContext(func):
        """Decorate `func` so it only runs while the context is open.

        The returned wrapper increments ``self.__context_refcount`` for the
        duration of the call and wakes every waiter on
        ``self.__context_cond`` once the count drops back to zero.  `func`
        is only invoked when ``self.__context_p`` is set; otherwise a plain
        function wrapper returns None and a generator wrapper yields
        nothing.  Generator functions get a generator wrapper so the
        refcount stays held while the caller iterates.
        """
        # Defined inside USBContext so we can access "self.__*".
        @contextlib.contextmanager
        def refcount(self):
            with self.__context_cond:
                if not self.__context_p and self.__auto_open:
                    # BBB
                    warnings.warn(
                        'Use "with USBContext() as context:" for safer cleanup'
                        ' on interpreter shutdown. See also USBContext.open().',
                        DeprecationWarning,
                    )
                    self.open()
                self.__context_refcount += 1
            try:
                yield
            finally:
                with self.__context_cond:
                    self.__context_refcount -= 1
                    if not self.__context_refcount:
                        # notify_all() is the supported spelling; the
                        # notifyAll() alias is deprecated.
                        self.__context_cond.notify_all()
        if inspect.isgeneratorfunction(func):
            def wrapper(self, *args, **kw):
                with refcount(self):
                    if self.__context_p:
                        # pylint: disable=not-callable
                        for value in func(self, *args, **kw):
                            # pylint: enable=not-callable
                            yield value
        else:
            def wrapper(self, *args, **kw):
                with refcount(self):
                    if self.__context_p:
                        # pylint: disable=not-callable
                        return func(self, *args, **kw)
                        # pylint: enable=not-callable
        functools.update_wrapper(wrapper, func)
        return wrapper
    # pylint: enable=no-self-argument,protected-access
wsgi.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def pre_process_extensions(self, extensions, request, action_args):
        """Execute extension pre-processing and collect post-processors.

        Returns a ``(response, post)`` pair.  `response` is the first truthy
        value yielded by a generator extension (or the Fault it raised), in
        which case `post` is an empty list.  When no extension answers,
        `response` is None and `post` iterates the collected callables in
        reverse order.
        """
        post_processors = []

        for candidate in extensions:
            is_two_stage = inspect.isgeneratorfunction(candidate)

            if is_two_stage:
                result = None
                # The code ahead of the yield is the pre-processing stage.
                try:
                    with ResourceExceptionHandler():
                        gen = candidate(req=request, **action_args)
                        result = next(gen)
                except Fault as error:
                    result = error

                # A response short-circuits the remaining extensions.
                if result:
                    return result, []

                # Resume this generator later for post-processing.
                post_processors.append(gen)
            else:
                # Ordinary functions take part in post-processing only.
                post_processors.append(candidate)

        # Unwind in the opposite order to registration.
        return None, reversed(post_processors)
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _process_async_rep_event(self, socket, channel, data):
        """
        Handle an event received on an ASYNC_REP socket.

        The incoming bytes are deserialized with the channel's serializer,
        the handler bound to `socket` computes a reply, and the reply is
        sent to the twin of the requester's address.  A generator handler
        supplies the reply with its first ``yield``; the rest of it runs
        after the reply has been sent.

        Parameters
        ----------
        socket : zmq.Socket
            Socket on which the event arrived.
        channel : AgentChannel
            AgentChannel bound to that socket.
        data : bytes
            Raw data read from the socket.
        """
        address_uuid, request_uuid, data, address = deserialize_message(
            message=data, serializer=channel.serializer)
        client_address = address.twin()
        if not self.registered(client_address):
            self.connect(address)
        handler = self.handler[socket]
        uses_generator = inspect.isgeneratorfunction(handler)
        outcome = handler(self, data)
        # For a generator handler the reply is whatever it yields first.
        reply = next(outcome) if uses_generator else outcome
        self.send(client_address, (address_uuid, request_uuid, reply))
        if uses_generator:
            execute_code_after_yield(outcome)
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def _process_sync_pub_event(self, socket, channel, data):
        """
        Handle an event received on a SYNC_PUB socket.

        The incoming bytes are deserialized with the channel's serializer,
        the handler bound to `socket` computes a reply, and the reply is
        published on the channel under the requester's UUID as topic.  A
        generator handler supplies the reply with its first ``yield``; the
        rest of it runs after the reply has been published.

        Parameters
        ----------
        socket : zmq.Socket
            Socket on which the event arrived.
        channel : AgentChannel
            AgentChannel bound to that socket.
        data : bytes
            Raw data read from the socket.
        """
        address_uuid, request_uuid, data = deserialize_message(
            message=data, serializer=channel.serializer)
        handler = self.handler[socket]
        uses_generator = inspect.isgeneratorfunction(handler)
        outcome = handler(self, data)
        # For a generator handler the reply is whatever it yields first.
        reply = next(outcome) if uses_generator else outcome
        self._send_channel_sync_pub(
            channel=channel,
            message=(address_uuid, request_uuid, reply),
            topic=address_uuid,
            general=False,
        )
        if uses_generator:
            execute_code_after_yield(outcome)
coroweb.py 文件源码 项目:awesome-python3-webapp 作者: syusonn 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def add_route(app, fn):
    """Register `fn` on `app`'s router using its route metadata.

    `fn` must carry ``__method__`` and ``__route__`` attributes.  Functions
    that are neither coroutine functions nor generator functions are
    wrapped with ``asyncio.coroutine`` before being handed to
    ``RequestHandler``.

    Raises ValueError if either attribute is missing.
    """
    method = getattr(fn, '__method__', None)
    path = getattr(fn, '__route__', None)
    if path is None or method is None:
        # Must be raised: returning the exception object let undecorated
        # handlers slip through silently.
        raise ValueError('@get or @post not defined in %s' % str(fn))
    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
        # NOTE(review): asyncio.coroutine was removed in Python 3.11 --
        # confirm the interpreter version this code targets.
        fn = asyncio.coroutine(fn)
    logging.info('add route %s %s => %s(%s)', method, path, fn.__name__,
                 ','.join(inspect.signature(fn).parameters.keys()))
    app.router.add_route(method, path, RequestHandler(app, fn))

#????URL??
async.py 文件源码 项目:poloniex-api 作者: absortium 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def ticker_wrapper(handler):
    """Adapt `handler` to receive a ticker update as keyword arguments.

    The returned coroutine function takes one indexable `data` record,
    maps its first ten items to named fields and calls `handler` with
    them.  The result is awaited when `handler` is a generator function
    or when the call returned an awaitable, so ``async def`` handlers
    actually run instead of leaving an un-awaited coroutine behind.
    """
    # Field names in the positional order of the incoming record.
    field_names = (
        "currency_pair",
        "last",
        "lowest_ask",
        "highest_bid",
        "percent_change",
        "base_volume",
        "quote_volume",
        "is_frozen",
        "day_high",
        "day_low",
    )
    is_generator_handler = inspect.isgeneratorfunction(handler)

    async def decorator(data):
        kwargs = {name: data[index] for index, name in enumerate(field_names)}
        result = handler(**kwargs)
        if is_generator_handler or inspect.isawaitable(result):
            await result

    return decorator
async.py 文件源码 项目:poloniex-api 作者: absortium 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def trades_wrapper(topic, handler):
    """Adapt `handler` to receive each trade event as keyword arguments.

    The returned coroutine function takes an iterable of event dicts,
    stores `topic` under the "currency_pair" key of each one (mutating
    the dict) and calls `handler` with the event's items.  The result is
    awaited when `handler` is a generator function or when the call
    returned an awaitable, so ``async def`` handlers actually run instead
    of leaving an un-awaited coroutine behind.
    """
    is_generator_handler = inspect.isgeneratorfunction(handler)

    async def decorator(data):
        for event in data:
            event["currency_pair"] = topic
            result = handler(**event)
            if is_generator_handler or inspect.isawaitable(result):
                await result

    return decorator
coreweb.py 文件源码 项目:python-awesome-web 作者: tianzhenyun 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_route(app, fn):
    """Attach `fn` to `app`'s router based on its route metadata.

    Reads the ``__method__`` and ``__route__`` attributes of `fn`, wraps
    `fn` with ``asyncio.coroutine`` unless it is already a coroutine or
    generator function, logs the registration and adds a
    ``RequestHandler`` for it to ``app.router``.

    Raises ValueError if either attribute is missing.
    """
    http_method = getattr(fn, '__method__', None)
    route = getattr(fn, '__route__', None)
    if http_method is None or route is None:
        raise ValueError('@Get or @Post not defined in {}.'.format(str(fn)))

    already_coroutine = asyncio.iscoroutinefunction(fn) or inspect.isgeneratorfunction(fn)
    if not already_coroutine:
        fn = asyncio.coroutine(fn)

    handler_name = fn.__name__
    parameter_list = ', '.join(inspect.signature(fn).parameters.keys())
    logging.info('add route {} {} => {}({})'.format(http_method, route, handler_name, parameter_list))
    app.router.add_route(http_method, route, RequestHandler(app, fn))
inspection.py 文件源码 项目:qcore 作者: quora 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def is_cython_or_generator(fn):
    """Return True for generator functions and for Cython/builtin callables.

    An object exposing ``__func__`` (class method, static method) is
    replaced by that function first.  Anything that is not a generator
    function is classified by the name of its type.
    """
    fn = getattr(fn, '__func__', fn)
    if inspect.isgeneratorfunction(fn):
        return True
    recognised_type_names = (
        'generator',
        'method_descriptor',
        'cython_function_or_method',
        'builtin_function_or_method',
    )
    return type(fn).__name__ in recognised_type_names
wsgi.py 文件源码 项目:coriolis 作者: cloudbase 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pre_process_extensions(self, extensions, request, action_args):
        """Drive the first stage of each extension and gather the rest.

        A generator-function extension is called with ``req=request`` and
        `action_args` and advanced once.  If that produces a truthy value,
        or raises a Fault, that value is returned with an empty list.
        Otherwise the generator is kept; non-generator extensions are kept
        as they are.  Without an early response the result is ``(None,
        reversed(kept))``.
        """
        kept = []

        for ext in extensions:
            if not inspect.isgeneratorfunction(ext):
                # No pre-processing stage: post-processing only.
                kept.append(ext)
                continue

            answer = None
            try:
                # Runs the generator up to its first yield.
                with ResourceExceptionHandler():
                    gen = ext(req=request, **action_args)
                    answer = next(gen)
            except Fault as caught:
                answer = caught

            if answer:
                # Pre-processing produced a response; stop here.
                return answer, []

            kept.append(gen)

        # Callers run post-processing last-registered first.
        return None, reversed(kept)
__init__.py 文件源码 项目:srep 作者: Answeror 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def return_list(func):
    """Decorate generator function `func` so calls return a list.

    Asserts that `func` is a generator function.  The wrapper keeps
    `func`'s metadata and returns every yielded item collected in a list.
    """
    import functools
    import inspect

    assert inspect.isgeneratorfunction(func)

    def wrapped(*args, **kargs):
        return [item for item in func(*args, **kargs)]

    return functools.wraps(func)(wrapped)
pipe.py 文件源码 项目:SimPype 作者: Mallets 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __enqueue(func, pipe, message):
    """Place `message` in `pipe` by running `func` on the pair.

    This is itself a generator function (its body contains ``yield``).
    The message's ``location`` and ``resource`` are set from the pipe;
    a generator `func` is run through ``pipe.env.process`` and waited
    for, a plain `func` is called directly.  ``pipe.full()`` is then
    called and the result of `func` is returned.
    """
    assert isinstance(pipe, Pipe)
    assert isinstance(message, simpype.Message)
    message.location = pipe
    message.resource = pipe.resource
    if not inspect.isgeneratorfunction(func):
        outcome = func(pipe, message)
    else:
        # Wait for the process wrapping the generator to complete.
        outcome = yield pipe.env.process(func(pipe, message))
    pipe.full()
    return outcome


问题


面经


文章

微信
公众号

扫码关注公众号