def testBenchmark():
import time
def printThreadNum():
import gc
from greenlet import greenlet
objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
print "Greenlets: %s" % len(objs)
printThreadNum()
test = TestNoblock()
s = time.time()
for i in range(3):
gevent.spawn(test.count, i + 1)
print "Created in %.3fs" % (time.time() - s)
printThreadNum()
time.sleep(5)
python类get_objects()的实例源码
def make_gc_snapShot(filename, name):
"""Append the signatures to a file, giving them the given
'name'. A signature is a pair object_id / type_name"""
global first_time
if first_time:
gc.collect()
first_time = False
contents = []
for o in gc.get_objects():
try:
tname = o.__class__.__name__
except AttributeError:
tname = str(type(o))
contents.append((id(o), tname))
del tname
f = open(filename, 'a')
pickle.dump((name, contents), f)
f.close()
del contents
del f
def reach(self, ids):
"""
\param ids Iterable of object id, as returned by x[0],
with x in the result of (snapshot2 - snapshot1)
Return a dict id -> object with that id currently known.
The objects recorded with these id might have been
replaced by new ones... so we might end-up seeing objects
that don't correspond to the original ones. This is
especially true after a gc.collect()
"""
result = dict()
for obj in gc.get_objects():
if id(obj) in ids:
result[id(obj)] = obj
return result
def memory_dump():
import gc
x = 0
for obj in gc.get_objects():
i = id(obj)
size = sys.getsizeof(obj, 0)
# referrers = [id(o) for o in gc.get_referrers(obj)]
try:
cls = str(obj.__class__)
except:
cls = "<no class>"
if size > 1024 * 50:
referents = set([id(o) for o in gc.get_referents(obj)])
x += 1
print(x, {'id': i,
'class': cls,
'size': size,
"ref": len(referents)})
#if len(referents) < 2000:
#print(obj)
def add_file(self, filename):
# make sure this file is closed before opening it again, h5py doesn't
# handle this well otherwise
for obj in gc.get_objects():
if isinstance(obj, h5py.File):
try:
if obj.filename == filename:
print("Closing previously open %s"%filename)
obj.close()
except:
pass
f = h5py.File(filename, 'r')
self.files.append(f)
self.__traverse_add(f, filename)
def _green_existing_locks():
"""Make locks created before monkey-patching safe.
RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
blocks the native thread. We need to replace these with green Locks.
This was originally noticed in the stdlib logging module."""
import gc
import threading
import eventlet.green.thread
lock_type = type(threading.Lock())
rlock_type = type(threading.RLock())
if sys.version_info[0] >= 3:
pyrlock_type = type(threading._PyRLock())
# We're monkey-patching so there can't be any greenlets yet, ergo our thread
# ID is the only valid owner possible.
tid = eventlet.green.thread.get_ident()
for obj in gc.get_objects():
if isinstance(obj, rlock_type):
if (sys.version_info[0] == 2 and
isinstance(obj._RLock__block, lock_type)):
_fix_py2_rlock(obj, tid)
elif (sys.version_info[0] >= 3 and
not isinstance(obj, pyrlock_type)):
_fix_py3_rlock(obj)
def getactors(tag=None):
if tag is not None:
if tag in game_info.tagged_actors:
return game_info.tagged_actors[tag]
else:
print('DEBUG: tag not in dictionary')
return []
else:
return ACTORS
# tagged_actors_list = []
# for obj in gc.get_objects():
# if isinstance(obj, Actor):
# if tag is None:
# tagged_actors_list.append(obj)
# elif tag is str:
# if tag in obj.tags:
# tagged_actors_list.append(obj)
# return tagged_actors_list
def test_leakage_from_tracebacks(self):
tpool.execute(noop) # get it started
gc.collect()
initial_objs = len(gc.get_objects())
for i in range(10):
self.assertRaises(RuntimeError, tpool.execute, raise_exception)
gc.collect()
middle_objs = len(gc.get_objects())
# some objects will inevitably be created by the previous loop
# now we test to ensure that running the loop an order of
# magnitude more doesn't generate additional objects
for i in six.moves.range(100):
self.assertRaises(RuntimeError, tpool.execute, raise_exception)
first_created = middle_objs - initial_objs
gc.collect()
second_created = len(gc.get_objects()) - middle_objs
self.assert_(second_created - first_created < 10,
"first loop: %s, second loop: %s" % (first_created,
second_created))
tpool.killall()
def _green_existing_locks():
"""Make locks created before monkey-patching safe.
RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
blocks the native thread. We need to replace these with green Locks.
This was originally noticed in the stdlib logging module."""
import gc
import threading
import eventlet.green.thread
lock_type = type(threading.Lock())
rlock_type = type(threading.RLock())
if sys.version_info[0] >= 3:
pyrlock_type = type(threading._PyRLock())
# We're monkey-patching so there can't be any greenlets yet, ergo our thread
# ID is the only valid owner possible.
tid = eventlet.green.thread.get_ident()
for obj in gc.get_objects():
if isinstance(obj, rlock_type):
if (sys.version_info[0] == 2 and
isinstance(obj._RLock__block, lock_type)):
_fix_py2_rlock(obj, tid)
elif (sys.version_info[0] >= 3 and
not isinstance(obj, pyrlock_type)):
_fix_py3_rlock(obj)
def begin(self):
self.create_initial_summary()
if self.detect_leaked_mocks:
# Record pre-existing mocks
gc.collect()
self.known_mocks = list(KnownMock(weakref.ref(m), None, None)
for m in gc.get_objects() if isinstance(m, mock.Mock))
self.previous_mock_refs = list(m.mock_ref for m in self.known_mocks)
if self.patch_mock:
detector = self
def decorator(f):
@functools.wraps(f)
def wrapper(new_mock, *args, **kwargs):
f(new_mock, *args, **kwargs)
detector.register_mock(new_mock, detector.level_name.get(LEVEL_TEST))
return wrapper
Base.__init__ = decorator(Base.__init__)
def listObjs(regex='Q', typ=None):
"""List all objects managed by python gc with class name matching regex.
Finds 'Q...' classes by default."""
if typ is not None:
return [x for x in gc.get_objects() if isinstance(x, typ)]
else:
return [x for x in gc.get_objects() if re.match(regex, type(x).__name__)]
def get_all_objects():
"""Return a list of all live Python objects (excluding int and long), not including the list itself."""
gc.collect()
gcl = gc.get_objects()
olist = {}
_getr(gcl, olist)
del olist[id(olist)]
del olist[id(gcl)]
del olist[id(sys._getframe())]
return olist
def cleanup():
global _cleanupCalled
if _cleanupCalled:
return
if not getConfigOption('exitCleanup'):
return
ViewBox.quit() ## tell ViewBox that it doesn't need to deregister views anymore.
## Workaround for Qt exit crash:
## ALL QGraphicsItems must have a scene before they are deleted.
## This is potentially very expensive, but preferred over crashing.
## Note: this appears to be fixed in PySide as of 2012.12, but it should be left in for a while longer..
if QtGui.QApplication.instance() is None:
return
import gc
s = QtGui.QGraphicsScene()
for o in gc.get_objects():
try:
if isinstance(o, QtGui.QGraphicsItem) and isQObjectAlive(o) and o.scene() is None:
if getConfigOption('crashWarning'):
sys.stderr.write('Error: graphics item without scene. '
'Make sure ViewBox.close() and GraphicsView.close() '
'are properly called before app shutdown (%s)\n' % (o,))
s.addItem(o)
except RuntimeError: ## occurs if a python wrapper no longer has its underlying C++ object
continue
_cleanupCalled = True
def listObjs(regex='Q', typ=None):
"""List all objects managed by python gc with class name matching regex.
Finds 'Q...' classes by default."""
if typ is not None:
return [x for x in gc.get_objects() if isinstance(x, typ)]
else:
return [x for x in gc.get_objects() if re.match(regex, type(x).__name__)]
def get_all_objects():
"""Return a list of all live Python objects (excluding int and long), not including the list itself."""
gc.collect()
gcl = gc.get_objects()
olist = {}
_getr(gcl, olist)
del olist[id(olist)]
del olist[id(gcl)]
del olist[id(sys._getframe())]
return olist
def test_markup_leaks(self):
counts = set()
for count in range(20):
for item in range(1000):
escape("foo")
escape("<foo>")
escape(u"foo")
escape(u"<foo>")
counts.add(len(gc.get_objects()))
assert len(counts) == 1, 'ouch, c extension seems to leak objects'
def __enter__(self):
gc.disable()
_gc_lock.acquire()
loc = flask._request_ctx_stack._local
# Force Python to track this dictionary at all times.
# This is necessary since Python only starts tracking
# dicts if they contain mutable objects. It's a horrible,
# horrible hack but makes this kinda testable.
loc.__storage__['FOOO'] = [1, 2, 3]
gc.collect()
self.old_objects = len(gc.get_objects())
def __exit__(self, exc_type, exc_value, tb):
if not hasattr(sys, 'getrefcount'):
gc.collect()
new_objects = len(gc.get_objects())
if new_objects > self.old_objects:
self.testcase.fail('Example code leaked')
_gc_lock.release()
gc.enable()
def _get_objects(self):
return gc.get_objects()
def test_markup_leaks(self):
counts = set()
for count in range(20):
for item in range(1000):
escape("foo")
escape("<foo>")
escape(u"foo")
escape(u"<foo>")
counts.add(len(gc.get_objects()))
assert len(counts) == 1, 'ouch, c extension seems to leak objects'
def _find_objects(t):
return [o for o in gc.get_objects() if isinstance(o, t)]
def test_markup_leaks(self):
counts = set()
for count in range(20):
for item in range(1000):
escape("foo")
escape("<foo>")
escape(u"foo")
escape(u"<foo>")
counts.add(len(gc.get_objects()))
assert len(counts) == 1, 'ouch, c extension seems to leak objects'
def __enter__(self):
gc.disable()
_gc_lock.acquire()
loc = flask._request_ctx_stack._local
# Force Python to track this dictionary at all times.
# This is necessary since Python only starts tracking
# dicts if they contain mutable objects. It's a horrible,
# horrible hack but makes this kinda testable.
loc.__storage__['FOOO'] = [1, 2, 3]
gc.collect()
self.old_objects = len(gc.get_objects())
def __exit__(self, exc_type, exc_value, tb):
if not hasattr(sys, 'getrefcount'):
gc.collect()
new_objects = len(gc.get_objects())
if new_objects > self.old_objects:
self.testcase.fail('Example code leaked')
_gc_lock.release()
gc.enable()
def test_issue_7959(self):
proto = self.functype.im_func(None)
class X(object):
def func(self): pass
def __init__(self):
self.v = proto(self.func)
import gc
for i in range(32):
X()
gc.collect()
live = [x for x in gc.get_objects()
if isinstance(x, X)]
self.assertEqual(len(live), 0)
def actionDumpobj(self):
import gc
import sys
self.sendHeader()
# No more if not in debug mode
if not config.debug:
yield "Not in debug mode"
raise StopIteration
class_filter = self.get.get("class")
yield """
<style>
* { font-family: monospace; white-space: pre }
table * { text-align: right; padding: 0px 10px }
</style>
"""
objs = gc.get_objects()
for obj in objs:
obj_type = str(type(obj))
if obj_type != "<type 'instance'>" or obj.__class__.__name__ != class_filter:
continue
yield "%.1fkb %s... " % (float(sys.getsizeof(obj)) / 1024, cgi.escape(str(obj)))
for attr in dir(obj):
yield "- %s: %s<br>" % (attr, cgi.escape(str(getattr(obj, attr))))
yield "<br>"
gc.collect() # Implicit grabage collection
def test_errobj_reference_leak(self, level=rlevel):
# Ticket #955
with np.errstate(all="ignore"):
z = int(0)
p = np.int32(-1)
gc.collect()
n_before = len(gc.get_objects())
z**p # this shouldn't leak a reference to errobj
gc.collect()
n_after = len(gc.get_objects())
assert_(n_before >= n_after, (n_before, n_after))
def test_markup_leaks(self):
counts = set()
for count in range(20):
for item in range(1000):
escape("foo")
escape("<foo>")
escape(u"foo")
escape(u"<foo>")
counts.add(len(gc.get_objects()))
assert len(counts) == 1, 'ouch, c extension seems to leak objects'
def __enter__(self):
gc.disable()
_gc_lock.acquire()
loc = flask._request_ctx_stack._local
# Force Python to track this dictionary at all times.
# This is necessary since Python only starts tracking
# dicts if they contain mutable objects. It's a horrible,
# horrible hack but makes this kinda testable.
loc.__storage__['FOOO'] = [1, 2, 3]
gc.collect()
self.old_objects = len(gc.get_objects())
def __exit__(self, exc_type, exc_value, tb):
if not hasattr(sys, 'getrefcount'):
gc.collect()
new_objects = len(gc.get_objects())
if new_objects > self.old_objects:
self.testcase.fail('Example code leaked')
_gc_lock.release()
gc.enable()