python类isgenerator()的实例源码

utils.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def underscore_memoization(func):
    """
    Decorator for methods::

        class A(object):
            def x(self):
                if self._x:
                    self._x = 10
                return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.
    """
    name = '_' + func.__name__

    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            result = func(self)
            if inspect.isgenerator(result):
                result = list(result)
            setattr(self, name, result)
            return result

    return wrapper


# for fast_parser, should not be deleted
cache.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False, second_arg_is_evaluator=False):
    """ This is a typical memoization decorator, BUT there is one difference:
    To prevent recursion it sets defaults.

    Preventing recursion is in this case the much bigger use than speed. I
    don't think, that there is a big speed difference, but there are many cases
    where recursion could happen (think about a = b; b = a).
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            if evaluator_is_first_arg:
                cache = obj.memoize_cache
            elif second_arg_is_evaluator:  # needed for meta classes
                cache = args[0].memoize_cache
            else:
                cache = obj._evaluator.memoize_cache

            try:
                memo = cache[function]
            except KeyError:
                memo = {}
                cache[function] = memo

            key = (obj, args, frozenset(kwargs.items()))
            if key in memo:
                return memo[key]
            else:
                if default is not NO_DEFAULT:
                    memo[key] = default
                rv = function(obj, *args, **kwargs)
                if inspect.isgenerator(rv):
                    rv = list(rv)
                memo[key] = rv
                return rv
        return wrapper
    return func
redis_cache.py 文件源码 项目:smartcache 作者: wecatch 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _is_iterable(data):
        return isinstance(data, list) or isinstance(data, tuple) or isinstance(data, set) or inspect.isgenerator(data)
glance.py 文件源码 项目:novajoin 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def call(self, context, method, *args, **kwargs):
        """Call a glance client method."""
        if self.client is None:
            self.client = self._glance_client(context)

        retry_excs = (glanceclient.exc.ServiceUnavailable,
                      glanceclient.exc.InvalidEndpoint,
                      glanceclient.exc.CommunicationError)
        retries = CONF.glance_num_retries
        if retries < 0:
            LOG.warning("Treating negative retries as 0")
            retries = 0

        num_attempts = retries + 1

        for attempt in range(1, num_attempts + 1):
            client = self._glance_client(context)
            try:
                controller = getattr(client,
                                     kwargs.pop('controller', 'images'))
                result = getattr(controller, method)(*args, **kwargs)
                if inspect.isgenerator(result):
                    # Convert generator results to a list, so that we can
                    # catch any potential exceptions now and retry the call.
                    return list(result)
                return result
            except retry_excs as e:
                if attempt < num_attempts:
                    extra = "retrying"
                else:
                    extra = "done trying"

                LOG.exception("Error contacting glance server "
                              "'%(server)s' for '%(method)s', "
                              "%(extra)s.",
                              {'server': self.api_server,
                               'method': method, 'extra': extra})
                if attempt == num_attempts:
                    raise exception.GlanceConnectionFailed(
                        server=str(self.api_server), reason=six.text_type(e))
                time.sleep(1)
utils.py 文件源码 项目:0ops.exed 作者: whisperaven 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def fix_http_content_length():
    """ Reverse operate done by cherrypy `_be_ie_unfriendly`. """
    response = cherrypy.serving.response
    if not inspect.isgenerator(response.body): # Dont do this in `stream` mode
        response.body = response.collapse_body().strip()
        response.headers['Content-Length'] = str(len(response.collapse_body()))
utils.py 文件源码 项目:0ops.exed 作者: whisperaven 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def unescape_response():
    """ Unescape the html body which escaped by `_cpcompat.escape_html()`. """
    response = cherrypy.serving.response
    if not inspect.isgenerator(response.body): # Dont do this in `stream` mode
        response.body = six.binary_type(unescape_html(response.collapse_body()))
utils.py 文件源码 项目:0ops.exed 作者: whisperaven 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _json_stream_output(next_handler, *args, **kwargs):
    """ Output JSON in stream mode. """
    cherrypy.response.headers['Content-Type'] = "application/json"
    _outputs = next_handler(*args, **kwargs)
    if inspect.isgenerator(_outputs):
        def _stream_outputs():
            for _content in _outputs:
                yield json.dumps(_content)
        return _stream_outputs()
    else:
        return json.dumps(_outputs)
test_base_filter.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_filter_all(self, mock_filter_one):
        mock_filter_one.side_effect = [True, False, True]
        filter_obj_list = ['obj1', 'obj2', 'obj3']
        container = {}
        base_filter = base_filters.BaseFilter()
        extra_spec = {}
        result = base_filter.filter_all(filter_obj_list, container, extra_spec)
        self.assertTrue(inspect.isgenerator(result))
        self.assertEqual(['obj1', 'obj3'], list(result))
wsgi.py 文件源码 项目:coriolis 作者: cloudbase 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def post_process_extensions(self, extensions, resp_obj, request,
                                action_args):
        for ext in extensions:
            response = None
            if inspect.isgenerator(ext):
                # If it's a generator, run the second half of
                # processing
                try:
                    with ResourceExceptionHandler():
                        response = ext.send(resp_obj)
                except StopIteration:
                    # Normal exit of generator
                    continue
                except Fault as ex:
                    response = ex
            else:
                # Regular functions get post-processing...
                try:
                    with ResourceExceptionHandler():
                        response = ext(req=request, resp_obj=resp_obj,
                                       **action_args)
                except Fault as ex:
                    response = ex

            # We had a response...
            if response:
                return response

        return None
base.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def do(f):

    @wraps(f)
    def wrapper(*args, **kwargs):
        gen = f(*args, **kwargs)
        if not inspect.isgenerator(gen):
            res = gen
            def generator_no_yield():
                return res
                yield
            gen = generator_no_yield()
        return Effect(ChainedIntent(gen))

    return wrapper
base_handler.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run_task(self, module, task, response):
        """
        Processing the task, catching exceptions and logs, return a `ProcessorResult` object
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)
cache.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def underscore_memoization(func):
    """
    Decorator for methods::

        class A(object):
            def x(self):
                if self._x:
                    self._x = 10
                return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.
    """
    name = '_' + func.__name__

    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            result = func(self)
            if inspect.isgenerator(result):
                result = list(result)
            setattr(self, name, result)
            return result

    return wrapper
cache.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False, second_arg_is_evaluator=False):
    """ This is a typical memoization decorator, BUT there is one difference:
    To prevent recursion it sets defaults.

    Preventing recursion is in this case the much bigger use than speed. I
    don't think, that there is a big speed difference, but there are many cases
    where recursion could happen (think about a = b; b = a).
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            if evaluator_is_first_arg:
                cache = obj.memoize_cache
            elif second_arg_is_evaluator:  # needed for meta classes
                cache = args[0].memoize_cache
            else:
                cache = obj.evaluator.memoize_cache

            try:
                memo = cache[function]
            except KeyError:
                memo = {}
                cache[function] = memo

            key = (obj, args, frozenset(kwargs.items()))
            if key in memo:
                return memo[key]
            else:
                if default is not NO_DEFAULT:
                    memo[key] = default
                rv = function(obj, *args, **kwargs)
                if inspect.isgenerator(rv):
                    rv = list(rv)
                memo[key] = rv
                return rv
        return wrapper
    return func
Trainer.py 文件源码 项目:aetros-cli 作者: aetros 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def is_generator(obj):
    import inspect

    return obj is not None and (inspect.isgeneratorfunction(obj) or inspect.isgenerator(obj) or hasattr(obj, 'next') or hasattr(obj, '__next__'))
pyversion.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def isgenerator(o):
        if isinstance(o, UnboundMethod):
            o = o._func
        return inspect.isgeneratorfunction(o) or inspect.isgenerator(o)
pyversion.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def isgenerator(func):
        try:
            return func.func_code.co_flags & CO_GENERATOR != 0
        except AttributeError:
            return False

# Make a function to help check if an exception is derived from BaseException.
# In Python 2.4, we just use Exception instead.
test_inspect.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_excluding_predicates(self):
        global tb
        self.istest(inspect.isbuiltin, 'sys.exit')
        self.istest(inspect.isbuiltin, '[].append')
        self.istest(inspect.iscode, 'mod.spam.__code__')
        try:
            1/0
        except:
            tb = sys.exc_info()[2]
            self.istest(inspect.isframe, 'tb.tb_frame')
            self.istest(inspect.istraceback, 'tb')
            if hasattr(types, 'GetSetDescriptorType'):
                self.istest(inspect.isgetsetdescriptor,
                            'type(tb.tb_frame).f_locals')
            else:
                self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
        finally:
            # Clear traceback and all the frames and local variables hanging to it.
            tb = None
        self.istest(inspect.isfunction, 'mod.spam')
        self.istest(inspect.isfunction, 'mod.StupidGit.abuse')
        self.istest(inspect.ismethod, 'git.argue')
        self.istest(inspect.ismodule, 'mod')
        self.istest(inspect.isdatadescriptor, 'collections.defaultdict.default_factory')
        self.istest(inspect.isgenerator, '(x for x in range(2))')
        self.istest(inspect.isgeneratorfunction, 'generator_function_example')
        if hasattr(types, 'MemberDescriptorType'):
            self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
        else:
            self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
tag.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def delete(self, paths):
        """Delete L{Tag}s matching C{paths}.

        L{TagValue}s and permissions associated with the deleted L{Tag}s are
        removed by cascading deletes in the database schema.

        @param paths: A sequence of L{Tag.path}s.
        @return: A C{list} of C{(objectID, Tag.path)} 2-tuples representing the
            L{Tag}s that were removed.
        """
        if isgenerator(paths):
            paths = list(paths)
        result = getTags(paths=paths)
        deletedTagPaths = list(result.values(Tag.objectID, Tag.path))
        # Delete the fluiddb/tags/description tag values stored for removed
        # tags.  Associated TagValue's are removed by an ON DELETE CASCADE
        # trigger.
        self._factory.tagValues(self._user).delete(
            [(objectID, path) for objectID, _ in deletedTagPaths
             for path in [u'fluiddb/tags/description', u'fluiddb/tags/path']])

        # Touch all the objects for the given tag paths.
        objectIDs = list(getObjectIDs(paths))
        touchObjects(objectIDs)

        result.remove()
        return deletedTagPaths
user.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def delete(self, usernames):
        """Delete L{User}s matching C{username}s.

        @param usernames: A sequence of L{User.username}s.
        @raise FeatureError: Raised if no L{User.username}s are provided.
        @raise UnknownUserError: Raised if one or more usernames don't match
            existing L{User}s.
        @return: A  C{list} of C{(objectID, User.username)} 2-tuples
            representing the L{User}s that that were removed.
        """
        if isgenerator(usernames):
            usernames = list(usernames)
        if not usernames:
            raise FeatureError('At least one username must be provided.')

        usernames = set(usernames)
        result = getUsers(usernames=usernames)
        existingUsernames = set(result.values(User.username))
        unknownUsernames = usernames - existingUsernames
        if unknownUsernames:
            raise UnknownUserError(list(unknownUsernames))

        admin = getUser(u'fluiddb')
        deletedUsers = list(result.values(User.objectID, User.username))
        # FIXME: Deleting a user will leave the permission exception lists
        # containing the user in a corrupt state.
        result.remove()
        self._factory.tagValues(admin).delete(
            [(objectID, systemTag) for objectID, _ in deletedUsers
             for systemTag in [u'fluiddb/users/username',
                               u'fluiddb/users/name',
                               u'fluiddb/users/email',
                               u'fluiddb/users/role']])
        return deletedUsers
namespace.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def delete(self, paths):
        """See L{NamespaceAPI.delete}.

        @raise PermissionDeniedError: Raised if the user is not authorized to
            delete a given L{Namespace}.
        """
        if isgenerator(paths):
            paths = list(paths)
        pathsAndOperations = [(path, Operation.DELETE_NAMESPACE)
                              for path in paths]
        deniedOperations = checkPermissions(self._user, pathsAndOperations)
        if deniedOperations:
            raise PermissionDeniedError(self._user.username, deniedOperations)

        return self._api.delete(paths)


问题


面经


文章

微信
公众号

扫码关注公众号