def format(self, format='%(name)s size=%(size)d flat=%(flat)d',
detail=-1, order_by='size', indent=''):
'''Formats the size information of the object and of all sized
referents as a string.
*format='%(name)s...'* -- specifies the format string per
instance, valid interpolation parameters are 'name', 'size'
and 'flat'
*detail=-1* -- detail level up to which referents are
printed (-1 for unlimited)
*order_by='size'* -- sort order of referents, valid choices
are 'name', 'size' or 'flat'
*indent=''* -- optional indentation
'''
lines = [indent + (format % dict(size=self.size, flat=self.flat,
name=self.name))]
if detail and self.refs:
refs = sorted(self.refs, key=lambda x: getattr(x, order_by),
reverse=order_by in ('size', 'flat'))
lines += [ref.format(format=format, detail=detail-1, order_by=order_by,
indent=indent+' ') for ref in refs]
return '\n'.join(lines)
python类ref()的实例源码
def named_refs(obj, **opts):
"""Returns (a generator for) all named *referents* of an object
(re-using functionality from **asizeof**).
See function **basicsize** for a description of the options.
Does not return un-named *referents*, e.g. objects in a list.
"""
t = _typedefof(obj, **opts)
if t:
r = t.refs
if r and _iscallable(r):
for nr in r(obj, True):
try:
yield nr.name, nr.ref
except AttributeError:
pass
def inc(self):
# Copy these references so on_thread_died needn't close over self
ident = self.ident
_counters = self._counters
tid = ident.get()
_counters.setdefault(tid, 0)
_counters[tid] += 1
if not ident.watching():
# Before the tid is possibly reused, remove it from _counters
def on_thread_died(ref):
ident.unwatch(tid)
_counters.pop(tid, None)
ident.watch(on_thread_died)
return _counters[tid]
def test_task_refcounting(self):
# On CPython, tasks and their arguments should be released immediately
# without waiting for garbage collection.
@gen.engine
def f():
class Foo(object):
pass
arg = Foo()
self.arg_ref = weakref.ref(arg)
task = gen.Task(self.io_loop.add_callback, arg=arg)
self.task_ref = weakref.ref(task)
yield task
self.stop()
self.run_gen(f)
self.assertIs(self.arg_ref(), None)
self.assertIs(self.task_ref(), None)
def test_task_refcounting(self):
# On CPython, tasks and their arguments should be released immediately
# without waiting for garbage collection.
@gen.engine
def f():
class Foo(object):
pass
arg = Foo()
self.arg_ref = weakref.ref(arg)
task = gen.Task(self.io_loop.add_callback, arg=arg)
self.task_ref = weakref.ref(task)
yield task
self.stop()
self.run_gen(f)
self.assertIs(self.arg_ref(), None)
self.assertIs(self.task_ref(), None)
def inc(self):
# Copy these references so on_thread_died needn't close over self
ident = self.ident
_counters = self._counters
tid = ident.get()
_counters.setdefault(tid, 0)
_counters[tid] += 1
if not ident.watching():
# Before the tid is possibly reused, remove it from _counters
def on_thread_died(ref):
ident.unwatch(tid)
_counters.pop(tid, None)
ident.watch(on_thread_died)
return _counters[tid]
def __setitem__( self, k, v, isinstance=isinstance ):
if isinstance(v,_ParseResultsWithOffset):
self.__tokdict[k] = self.__tokdict.get(k,list()) + [v]
sub = v[0]
elif isinstance(k,(int,slice)):
self.__toklist[k] = v
sub = v
else:
self.__tokdict[k] = self.__tokdict.get(k,list()) + [_ParseResultsWithOffset(v,0)]
sub = v
if isinstance(sub,ParseResults):
sub.__parent = wkref(self)
def __iadd__( self, other ):
if other.__tokdict:
offset = len(self.__toklist)
addoffset = lambda a: offset if a<0 else a+offset
otheritems = other.__tokdict.items()
otherdictitems = [(k, _ParseResultsWithOffset(v[0],addoffset(v[1])) )
for (k,vlist) in otheritems for v in vlist]
for k,v in otherdictitems:
self[k] = v
if isinstance(v[0],ParseResults):
v[0].__parent = wkref(self)
self.__toklist += other.__toklist
self.__accumNames.update( other.__accumNames )
return self
def __setstate__(self,state):
self.__toklist = state[0]
(self.__tokdict,
par,
inAccumNames,
self.__name) = state[1]
self.__accumNames = {}
self.__accumNames.update(inAccumNames)
if par is not None:
self.__parent = wkref(par)
else:
self.__parent = None
def __setitem__( self, k, v, isinstance=isinstance ):
if isinstance(v,_ParseResultsWithOffset):
self.__tokdict[k] = self.__tokdict.get(k,list()) + [v]
sub = v[0]
elif isinstance(k,(int,slice)):
self.__toklist[k] = v
sub = v
else:
self.__tokdict[k] = self.__tokdict.get(k,list()) + [_ParseResultsWithOffset(v,0)]
sub = v
if isinstance(sub,ParseResults):
sub.__parent = wkref(self)
def __iadd__( self, other ):
if other.__tokdict:
offset = len(self.__toklist)
addoffset = lambda a: offset if a<0 else a+offset
otheritems = other.__tokdict.items()
otherdictitems = [(k, _ParseResultsWithOffset(v[0],addoffset(v[1])) )
for (k,vlist) in otheritems for v in vlist]
for k,v in otherdictitems:
self[k] = v
if isinstance(v[0],ParseResults):
v[0].__parent = wkref(self)
self.__toklist += other.__toklist
self.__accumNames.update( other.__accumNames )
return self
def __setitem__( self, k, v, isinstance=isinstance ):
if isinstance(v,_ParseResultsWithOffset):
self.__tokdict[k] = self.__tokdict.get(k,list()) + [v]
sub = v[0]
elif isinstance(k,(int,slice)):
self.__toklist[k] = v
sub = v
else:
self.__tokdict[k] = self.__tokdict.get(k,list()) + [_ParseResultsWithOffset(v,0)]
sub = v
if isinstance(sub,ParseResults):
sub.__parent = wkref(self)
def __iadd__( self, other ):
if other.__tokdict:
offset = len(self.__toklist)
addoffset = lambda a: offset if a<0 else a+offset
otheritems = other.__tokdict.items()
otherdictitems = [(k, _ParseResultsWithOffset(v[0],addoffset(v[1])) )
for (k,vlist) in otheritems for v in vlist]
for k,v in otherdictitems:
self[k] = v
if isinstance(v[0],ParseResults):
v[0].__parent = wkref(self)
self.__toklist += other.__toklist
self.__accumNames.update( other.__accumNames )
return self
def __setstate__(self,state):
self.__toklist = state[0]
(self.__tokdict,
par,
inAccumNames,
self.__name) = state[1]
self.__accumNames = {}
self.__accumNames.update(inAccumNames)
if par is not None:
self.__parent = wkref(par)
else:
self.__parent = None
def __setitem__( self, k, v, isinstance=isinstance ):
if isinstance(v,_ParseResultsWithOffset):
self.__tokdict[k] = self.__tokdict.get(k,list()) + [v]
sub = v[0]
elif isinstance(k,(int,slice)):
self.__toklist[k] = v
sub = v
else:
self.__tokdict[k] = self.__tokdict.get(k,list()) + [_ParseResultsWithOffset(v,0)]
sub = v
if isinstance(sub,ParseResults):
sub.__parent = wkref(self)
def __iadd__( self, other ):
if other.__tokdict:
offset = len(self.__toklist)
addoffset = lambda a: offset if a<0 else a+offset
otheritems = other.__tokdict.items()
otherdictitems = [(k, _ParseResultsWithOffset(v[0],addoffset(v[1])) )
for (k,vlist) in otheritems for v in vlist]
for k,v in otherdictitems:
self[k] = v
if isinstance(v[0],ParseResults):
v[0].__parent = wkref(self)
self.__toklist += other.__toklist
self.__accumNames.update( other.__accumNames )
return self
def test_WeakObjectCallback(self):
"""
Test that the weak object callback occurs as expected.
"""
obj = TestClass()
results = set()
def callback(target):
results.add(id(target))
ref = weakobj.objectref(obj, callback)
expected = set([id(ref)])
del obj
self.assertEqual(results, expected)
def _make_callback(obj, callback):
# Wraps the given callback function for use with weakref.ref callbacks,
# ensuring that a reference cycle is not created between the wrapper object
# and the weak reference.
if callback is not None:
self_ref = weakref.ref(obj)
def delref(ref):
callback(self_ref())
return delref
else:
return None
def __init__(self, ref, callback=None):
self.__ref__ = weakref.ref(ref, _make_callback(self, callback))
def __getref__(self):
ref = self.__ref__()
if ref is None:
raise weakref.ReferenceError('weakly-referenced object no longer exists')
return ref