python类ref()的实例源码

recipe-546530.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def format(self, format='%(name)s size=%(size)d flat=%(flat)d',
                     detail=-1, order_by='size', indent=''):
        '''Render this object's size information, followed by that of
        its sized referents, as a multi-line string.

            *format='%(name)s...'* -- per-instance format string; the
            interpolation keys supplied are 'name', 'size' and 'flat'

            *detail=-1* -- number of referent levels to descend into;
            0 prints only this object, while a negative value never
            counts down to 0 and is therefore unlimited

            *order_by='size'* -- attribute used to sort the referents;
            'size' and 'flat' sort in descending order, any other
            attribute (e.g. 'name') ascending

            *indent=''* -- prefix for this object's line; every nested
            level is prefixed with four additional spaces
        '''
        values = dict(size=self.size, flat=self.flat, name=self.name)
        output = [indent + (format % values)]
        if not detail or not self.refs:
            # Detail budget used up, or nothing to descend into.
            return '\n'.join(output)

        descending = order_by in ('size', 'flat')
        children = sorted(self.refs, key=lambda r: getattr(r, order_by),
                          reverse=descending)
        deeper = indent + '    '
        for child in children:
            output.append(child.format(format=format, detail=detail - 1,
                                       order_by=order_by, indent=deeper))
        return '\n'.join(output)
recipe-546530.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def named_refs(obj, **opts):
    """Yield a ``(name, referent)`` pair for every *named* referent of
    an object (built on the type definitions used by **asizeof**).

    The options are the ones described for function **basicsize**.

    Referents without a name, e.g. the items of a list, are skipped.
    """
    typedef = _typedefof(obj, **opts)
    if not typedef:
        return
    referents = typedef.refs
    if not (referents and _iscallable(referents)):
        return
    for named in referents(obj, True):
        try:
            yield named.name, named.ref
        except AttributeError:
            # Not a named referent -- nothing to report for this one.
            pass
thread_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def inc(self):
        """Add one to the counter kept for the id that ``self.ident.get()``
        returns, and return the updated count.
        """
        # Bind what the death callback needs to locals, so that the closure
        # below captures these objects rather than self.
        thread_ident = self.ident
        counts = self._counters

        tid = thread_ident.get()
        counts[tid] = counts.setdefault(tid, 0) + 1

        if not thread_ident.watching():
            # Drop the entry once the thread is gone, before its tid can
            # possibly be reused.
            def on_thread_died(ref):
                thread_ident.unwatch(tid)
                counts.pop(tid, None)

            thread_ident.watch(on_thread_died)

        return counts[tid]
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_task_refcounting(self):
        # On CPython, tasks and their arguments should be released immediately
        # without waiting for garbage collection.
        #
        # Only weak references to the argument and the task are stored on
        # ``self``; the strong references are locals of ``f`` below.
        @gen.engine
        def f():
            # Throwaway class whose instance is passed to the task.
            class Foo(object):
                pass
            arg = Foo()
            self.arg_ref = weakref.ref(arg)
            # The instance is handed to the task as a keyword argument.
            task = gen.Task(self.io_loop.add_callback, arg=arg)
            self.task_ref = weakref.ref(task)
            yield task
            self.stop()

        # NOTE(review): run_gen is defined elsewhere; presumably it drives
        # ``f`` to completion before returning -- confirm.
        self.run_gen(f)
        # Both weak references must already be dead here; no explicit
        # collection is requested before these checks.
        self.assertIs(self.arg_ref(), None)
        self.assertIs(self.task_ref(), None)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_task_refcounting(self):
        # On CPython, tasks and their arguments should be released immediately
        # without waiting for garbage collection.
        #
        # Only weak references to the argument and the task are stored on
        # ``self``; the strong references are locals of ``f`` below.
        @gen.engine
        def f():
            # Throwaway class whose instance is passed to the task.
            class Foo(object):
                pass
            arg = Foo()
            self.arg_ref = weakref.ref(arg)
            # The instance is handed to the task as a keyword argument.
            task = gen.Task(self.io_loop.add_callback, arg=arg)
            self.task_ref = weakref.ref(task)
            yield task
            self.stop()

        # NOTE(review): run_gen is defined elsewhere; presumably it drives
        # ``f`` to completion before returning -- confirm.
        self.run_gen(f)
        # Both weak references must already be dead here; no explicit
        # collection is requested before these checks.
        self.assertIs(self.arg_ref(), None)
        self.assertIs(self.task_ref(), None)
thread_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def inc(self):
        """Add one to the counter kept for the id that ``self.ident.get()``
        returns, and return the updated count.
        """
        # Bind what the death callback needs to locals, so that the closure
        # below captures these objects rather than self.
        thread_ident = self.ident
        counts = self._counters

        tid = thread_ident.get()
        counts[tid] = counts.setdefault(tid, 0) + 1

        if not thread_ident.watching():
            # Drop the entry once the thread is gone, before its tid can
            # possibly be reused.
            def on_thread_died(ref):
                thread_ident.unwatch(tid)
                counts.pop(tid, None)

            thread_ident.watch(on_thread_died)

        return counts[tid]
pyparsing.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __setitem__( self, k, v, isinstance=isinstance ):
        """Store ``v`` under ``k`` and record ``self`` as the parent of a
        nested ``ParseResults``.

        A value that is already a ``_ParseResultsWithOffset`` is added to
        the named entries as-is; an int or slice key assigns into the token
        list; any other key adds the value as a named entry with offset 0.
        """
        if isinstance(v, _ParseResultsWithOffset):
            # A new list is built rather than extending the stored one.
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [v]
            nested = v[0]
        elif isinstance(k, (int, slice)):
            self.__toklist[k] = v
            nested = v
        else:
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [_ParseResultsWithOffset(v, 0)]
            nested = v
        if isinstance(nested, ParseResults):
            # Weak reference, so the child does not keep its parent alive.
            nested.__parent = wkref(self)
pyparsing.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __iadd__( self, other ):
        """Merge ``other`` into ``self`` in place and return ``self``.

        Named entries of ``other`` are re-added with their offsets shifted
        by the current length of this token list (negative offsets become
        exactly that length); then the token list and accumulated names of
        ``other`` are appended/merged.
        """
        if other.__tokdict:
            base = len(self.__toklist)

            def shifted(position):
                return base if position < 0 else position + base

            # Collect everything first, so nothing is stored on self while
            # other's entries are still being read.
            pending = []
            for name, occurrences in other.__tokdict.items():
                for entry in occurrences:
                    moved = _ParseResultsWithOffset(entry[0], shifted(entry[1]))
                    pending.append((name, moved))

            for name, moved in pending:
                self[name] = moved
                if isinstance(moved[0], ParseResults):
                    moved[0].__parent = wkref(self)

        self.__toklist += other.__toklist
        self.__accumNames.update(other.__accumNames)
        return self
pyparsing.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __setstate__(self,state):
        """Restore the instance from ``state``.

        ``state[0]`` is the token list; ``state[1]`` is a 4-item sequence of
        (named-token dict, parent or None, accumulated names, name).
        """
        self.__toklist = state[0]
        tokdict, parent, accum_names, name = state[1]
        self.__tokdict = tokdict
        self.__name = name
        # Copy the accumulated names into a fresh dict.
        self.__accumNames = dict(accum_names)
        # The parent is only ever held through a weak reference.
        self.__parent = None if parent is None else wkref(parent)
pyparsing.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __setitem__( self, k, v, isinstance=isinstance ):
        """Store ``v`` under ``k`` and record ``self`` as the parent of a
        nested ``ParseResults``.

        A value that is already a ``_ParseResultsWithOffset`` is added to
        the named entries as-is; an int or slice key assigns into the token
        list; any other key adds the value as a named entry with offset 0.
        """
        if isinstance(v, _ParseResultsWithOffset):
            # A new list is built rather than extending the stored one.
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [v]
            nested = v[0]
        elif isinstance(k, (int, slice)):
            self.__toklist[k] = v
            nested = v
        else:
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [_ParseResultsWithOffset(v, 0)]
            nested = v
        if isinstance(nested, ParseResults):
            # Weak reference, so the child does not keep its parent alive.
            nested.__parent = wkref(self)
pyparsing.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __iadd__( self, other ):
        """Merge ``other`` into ``self`` in place and return ``self``.

        Named entries of ``other`` are re-added with their offsets shifted
        by the current length of this token list (negative offsets become
        exactly that length); then the token list and accumulated names of
        ``other`` are appended/merged.
        """
        if other.__tokdict:
            base = len(self.__toklist)

            def shifted(position):
                return base if position < 0 else position + base

            # Collect everything first, so nothing is stored on self while
            # other's entries are still being read.
            pending = []
            for name, occurrences in other.__tokdict.items():
                for entry in occurrences:
                    moved = _ParseResultsWithOffset(entry[0], shifted(entry[1]))
                    pending.append((name, moved))

            for name, moved in pending:
                self[name] = moved
                if isinstance(moved[0], ParseResults):
                    moved[0].__parent = wkref(self)

        self.__toklist += other.__toklist
        self.__accumNames.update(other.__accumNames)
        return self
pyparsing.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __setitem__( self, k, v, isinstance=isinstance ):
        """Store ``v`` under ``k`` and record ``self`` as the parent of a
        nested ``ParseResults``.

        A value that is already a ``_ParseResultsWithOffset`` is added to
        the named entries as-is; an int or slice key assigns into the token
        list; any other key adds the value as a named entry with offset 0.
        """
        if isinstance(v, _ParseResultsWithOffset):
            # A new list is built rather than extending the stored one.
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [v]
            nested = v[0]
        elif isinstance(k, (int, slice)):
            self.__toklist[k] = v
            nested = v
        else:
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [_ParseResultsWithOffset(v, 0)]
            nested = v
        if isinstance(nested, ParseResults):
            # Weak reference, so the child does not keep its parent alive.
            nested.__parent = wkref(self)
pyparsing.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __iadd__( self, other ):
        """Merge ``other`` into ``self`` in place and return ``self``.

        Named entries of ``other`` are re-added with their offsets shifted
        by the current length of this token list (negative offsets become
        exactly that length); then the token list and accumulated names of
        ``other`` are appended/merged.
        """
        if other.__tokdict:
            base = len(self.__toklist)

            def shifted(position):
                return base if position < 0 else position + base

            # Collect everything first, so nothing is stored on self while
            # other's entries are still being read.
            pending = []
            for name, occurrences in other.__tokdict.items():
                for entry in occurrences:
                    moved = _ParseResultsWithOffset(entry[0], shifted(entry[1]))
                    pending.append((name, moved))

            for name, moved in pending:
                self[name] = moved
                if isinstance(moved[0], ParseResults):
                    moved[0].__parent = wkref(self)

        self.__toklist += other.__toklist
        self.__accumNames.update(other.__accumNames)
        return self
pyparsing.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __setstate__(self,state):
        """Restore the instance from ``state``.

        ``state[0]`` is the token list; ``state[1]`` is a 4-item sequence of
        (named-token dict, parent or None, accumulated names, name).
        """
        self.__toklist = state[0]
        tokdict, parent, accum_names, name = state[1]
        self.__tokdict = tokdict
        self.__name = name
        # Copy the accumulated names into a fresh dict.
        self.__accumNames = dict(accum_names)
        # The parent is only ever held through a weak reference.
        self.__parent = None if parent is None else wkref(parent)
pyparsing.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __setitem__( self, k, v, isinstance=isinstance ):
        """Store ``v`` under ``k`` and record ``self`` as the parent of a
        nested ``ParseResults``.

        A value that is already a ``_ParseResultsWithOffset`` is added to
        the named entries as-is; an int or slice key assigns into the token
        list; any other key adds the value as a named entry with offset 0.
        """
        if isinstance(v, _ParseResultsWithOffset):
            # A new list is built rather than extending the stored one.
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [v]
            nested = v[0]
        elif isinstance(k, (int, slice)):
            self.__toklist[k] = v
            nested = v
        else:
            existing = self.__tokdict.get(k, [])
            self.__tokdict[k] = existing + [_ParseResultsWithOffset(v, 0)]
            nested = v
        if isinstance(nested, ParseResults):
            # Weak reference, so the child does not keep its parent alive.
            nested.__parent = wkref(self)
pyparsing.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __iadd__( self, other ):
        """Merge ``other`` into ``self`` in place and return ``self``.

        Named entries of ``other`` are re-added with their offsets shifted
        by the current length of this token list (negative offsets become
        exactly that length); then the token list and accumulated names of
        ``other`` are appended/merged.
        """
        if other.__tokdict:
            base = len(self.__toklist)

            def shifted(position):
                return base if position < 0 else position + base

            # Collect everything first, so nothing is stored on self while
            # other's entries are still being read.
            pending = []
            for name, occurrences in other.__tokdict.items():
                for entry in occurrences:
                    moved = _ParseResultsWithOffset(entry[0], shifted(entry[1]))
                    pending.append((name, moved))

            for name, moved in pending:
                self[name] = moved
                if isinstance(moved[0], ParseResults):
                    moved[0].__parent = wkref(self)

        self.__toklist += other.__toklist
        self.__accumNames.update(other.__accumNames)
        return self
test_00_PythonUtils.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_WeakObjectCallback(self):
        """
        Test that the weak object callback occurs as expected.

        A callback is registered through weakobj.objectref; once this
        test's only name for the target has been deleted, the callback must
        have been invoked exactly with the objectref wrapper itself.
        """
        obj = TestClass()

        # Collects the id() of every object the callback is invoked with.
        results = set()
        def callback(target):
            results.add(id(target))

        ref = weakobj.objectref(obj, callback)
        # The callback is expected to receive the wrapper returned by
        # objectref, hence id(ref) rather than the id of the target.
        expected = set([id(ref)])
        # Drop the only name this test holds for the target.
        # NOTE(review): the assertion runs immediately afterwards, so it
        # relies on the target being finalized as soon as `del` executes --
        # presumably CPython reference counting; confirm for other
        # interpreters.
        del obj

        self.assertEqual(results, expected)
weakobj.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _make_callback(obj, callback):
    # Adapts `callback` to the signature weakref.ref expects of its callbacks.
    # The adapter reaches `obj` only through a weak reference, so no reference
    # cycle is created between the wrapper object and the weak reference.
    # Returns None when there is no callback to adapt.
    if callback is None:
        return None

    owner = weakref.ref(obj)

    def notify(dead_ref):
        # The dead reference itself is ignored; the callback is handed
        # whatever `owner` resolves to at that moment.
        callback(owner())

    return notify
weakobj.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, ref, callback=None):
        # Keep only a weak reference to the target. The optional callback is
        # first wrapped by _make_callback and then registered on that weak
        # reference.
        on_dead = _make_callback(self, callback)
        self.__ref__ = weakref.ref(ref, on_dead)
weakobj.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __getref__(self):
        """Return the object obtained by calling ``self.__ref__``.

        Raises:
            ReferenceError: if ``self.__ref__()`` yields ``None``, i.e. the
                weakly-referenced object no longer exists.
        """
        target = self.__ref__()
        if target is None:
            # Raise the builtin ReferenceError. On Python 2 it is the very
            # class the weakref module re-exports, so existing handlers keep
            # working; Python 3's weakref module does not provide that
            # attribute, so looking it up there would fail with
            # AttributeError instead of reporting the dead reference.
            raise ReferenceError('weakly-referenced object no longer exists')
        return target


问题


面经


文章

微信
公众号

扫码关注公众号