def test_1(self):
    # Wrapping a Python callable in MyCallback must raise its reference
    # count while the wrapper exists, and the count must return to its
    # starting value once the wrapper has been dropped and collected.
    from sys import getrefcount as grc

    func = dll._testfunc_callback_i_if
    func.restype = ctypes.c_int
    func.argtypes = [ctypes.c_int, MyCallback]

    def callback(value):
        return value

    # Count before the callable is wrapped.
    self.assertEqual(grc(callback), 2)

    wrapper = MyCallback(callback)
    self.assertGreater(grc(callback), 2)

    self.assertEqual(func(-10, wrapper), -18)

    # Release the wrapper and force a collection before re-checking.
    wrapper = None
    gc.collect()
    self.assertEqual(grc(callback), 2)
# Example source code using Python's getrefcount()
def _getRefCounts():
    """
    Return the reference count of every class found in the loaded modules.

    Each module in sys.modules is scanned and every attribute that is a
    class is recorded together with its sys.getrefcount() value. This can be
    used to check a running server for memory (object) leaks.

    Taken from: http://www.nightmare.com/medusa/memory-leaks.html.

    Returns:   List of (refcount, class) tuples, highest reference
               count first (list).
    """
    # NOTE(review): T is never read; presumably TRACE() logs function
    # entry/exit as a side effect - confirm before removing.
    T = TRACE()

    # types.ClassType (old-style classes) only exists on Python 2; fall back
    # to ``type`` so the scan also works where ClassType is gone.
    classType = getattr(types, "ClassType", type)

    counts = {}
    # Collect all classes. Iterate over a snapshot: getattr() on a module
    # may import further modules and change sys.modules underneath us.
    for mod in list(sys.modules.values()):
        for sym in dir(mod):
            try:
                obj = getattr(mod, sym)
            except Exception:
                # Best effort: an attribute that raises on access must not
                # abort the whole scan.
                continue
            if isinstance(obj, classType):
                counts[obj] = sys.getrefcount(obj)

    # Sort by refcount only, highest first; class objects themselves are not
    # orderable on Python 3, so they must not take part in the comparison.
    pairs = [(refCount, cls) for cls, refCount in counts.items()]
    pairs.sort(key=lambda pair: pair[0], reverse=True)
    return pairs
def test_bug1229429(self):
    # this bug was never in reversed, it was in
    # PyObject_CallMethod, and reversed_new calls that sometimes.
    # Repeated failing reversed() calls must leave the reference count of
    # the non-callable __reversed__ attribute unchanged.
    if not hasattr(sys, "getrefcount"):
        return

    def func():
        pass

    marker = func.__reversed__ = object()
    baseline = sys.getrefcount(marker)
    for _ in range(10):
        try:
            reversed(func)
        except TypeError:
            continue
        self.fail("non-callable __reversed__ didn't raise!")
    self.assertEqual(baseline, sys.getrefcount(marker))
def test_ass_subscript(self):
    # Assigning a colour through the various Ellipsis/slice subscripts must
    # fill the array; the three probe positions below are read back after
    # every assignment.
    probes = ((0, 0), (1, 0), (-1, -1))

    for bpp in (8, 16, 24, 32):
        surface = pygame.Surface((6, 8), 0, bpp)
        surface.fill((255, 255, 255))
        pixels = pygame.PixelArray(surface)

        # Test Ellipsis working
        pixels[..., ...] = (0, 0, 0)
        for pos in probes:
            self.assertEqual(pixels[pos], 0)

        pixels[...,] = (0, 0, 255)
        for pos in probes:
            self.assertEqual(pixels[pos], surface.map_rgb((0, 0, 255)))

        pixels[:, ...] = (255, 0, 0)
        for pos in probes:
            self.assertEqual(pixels[pos], surface.map_rgb((255, 0, 0)))

        pixels[...] = (0, 255, 0)
        for pos in probes:
            self.assertEqual(pixels[pos], surface.map_rgb((0, 255, 0)))

    # Ensure x and y are freed for array[x, y] = p
    # Bug fix: reference counting
    if hasattr(sys, 'getrefcount'):
        class Int(int):
            """Unique int instances"""
            pass

        surface = pygame.Surface((2, 2), 0, 32)
        pixels = pygame.PixelArray(surface)
        col, row = Int(0), Int(1)
        col_refs_before = sys.getrefcount(col)
        row_refs_before = sys.getrefcount(row)
        pixels[col, row] = 0
        col_refs_after = sys.getrefcount(col)
        row_refs_after = sys.getrefcount(row)
        self.assertEqual(col_refs_after, col_refs_before)
        self.assertEqual(row_refs_after, row_refs_before)
def test_GetView_array_struct(self):
    # Build BufferProxy objects on top of an exporter that offers its data
    # through __array_struct__ and compare each proxy with its exporter via
    # self.assertSame, for several shapes, type characters and item sizes.
    from pygame.bufferproxy import BufferProxy

    class Exporter(self.ExporterBase):
        def __init__(self, shape, typechar, itemsize):
            super(Exporter, self).__init__(shape, typechar, itemsize)
            self.view = BufferProxy(self.__dict__)

        def get__array_struct__(self):
            return self.view.__array_struct__

        __array_struct__ = property(get__array_struct__)
        # Should not cause PgObject_GetBuffer to fail
        __array_interface__ = property(lambda self: None)

    primes = [2, 3, 5, 7, 11]  # Some prime numbers

    # Vary the number of dimensions with typechar 'i' and itemsize 2.
    for ndim in range(1, len(primes)):
        exporter = Exporter(primes[:ndim], 'i', 2)
        proxy = BufferProxy(exporter)
        self.assertSame(proxy, exporter)

    # Fixed two-dimensional shape; vary type character and item size.
    shape = primes[:2]
    for typechar in ('i', 'u'):
        for itemsize in (1, 2, 4, 8):
            exporter = Exporter(shape, typechar, itemsize)
            proxy = BufferProxy(exporter)
            self.assertSame(proxy, exporter)
    for itemsize in (4, 8):
        exporter = Exporter(shape, 'f', itemsize)
        proxy = BufferProxy(exporter)
        self.assertSame(proxy, exporter)

    # Check returned cobject/capsule reference count
    try:
        from sys import getrefcount
    except ImportError:
        # PyPy: no reference counting
        pass
    else:
        # typechar and itemsize still hold the last values of the loops
        # above.
        exporter = Exporter(shape, typechar, itemsize)
        self.assertEqual(getrefcount(exporter.__array_struct__), 1)
def skip_if_no_getrefcount(f):
    """Decorate test method *f* so it is skipped without sys.getrefcount().

    When the interpreter provides sys.getrefcount the wrapped method is
    called and its result returned; otherwise the result of
    self.skipTest(...) is returned instead.
    """
    @wraps(f)
    def guarded(self):
        if hasattr(sys, 'getrefcount'):
            return f(self)
        return self.skipTest('skipped, no sys.getrefcount()')

    return guarded
def test_mogrify_leak_on_multiple_reference(self):
    # issue #81: reference leak when a parameter value is referenced
    # more than once from a dict.
    import sys

    cursor = self.conn.cursor()
    # Build the string at run time rather than using a literal directly.
    value = (lambda x: x)('foo') * 10

    refs_before = sys.getrefcount(value)
    cursor.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': value})
    refs_after = sys.getrefcount(value)

    self.assertEqual(refs_before, refs_after)
def skip_if_no_getrefcount(f):
    """Return a wrapper around test method *f* that needs sys.getrefcount.

    The wrapper returns f(self) when sys.getrefcount is available and the
    result of self.skipTest(...) when it is not.
    """
    @wraps(f)
    def run_or_skip(self):
        if hasattr(sys, 'getrefcount'):
            return f(self)
        return self.skipTest('skipped, no sys.getrefcount()')

    return run_or_skip
def test_mogrify_leak_on_multiple_reference(self):
    # issue #81: reference leak when a parameter value is referenced
    # more than once from a dict.
    import sys

    cur = self.conn.cursor()
    # The string is produced at run time, not taken from a literal as-is.
    param = (lambda x: x)('foo') * 10

    count_before = sys.getrefcount(param)
    cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': param})
    count_after = sys.getrefcount(param)

    self.assertEqual(count_before, count_after)
def await_gc(obj, rc):
    """Wait for the refcount of *obj* to drop to the expected value *rc*.

    Necessary because of the zero-copy gc thread,
    which can take some time to receive its DECREF message.

    Polls up to 50 times, sleeping 0.05 seconds between polls, and returns
    None whether or not the expected count was reached.
    """
    # rc + 2 because of the refs in this function
    limit = rc + 2
    for _ in range(50):
        if grc(obj) > limit:
            time.sleep(0.05)
        else:
            return
def test_above_30(self):
    """Messages above 30 bytes are never copied by 0MQ."""
    for exponent in range(5, 16):  # multipliers 2**5 .. 2**15
        payload = (2 ** exponent) * x
        self.assertEqual(grc(payload), 2)

        frame = zmq.Frame(payload)
        self.assertEqual(grc(payload), 4)

        # Dropping the frame must give the references back.
        del frame
        await_gc(payload, 2)
        self.assertEqual(grc(payload), 2)
        del payload
def test_lifecycle1(self):
    """Run through a ref counting cycle with a copy."""
    for exponent in range(5, 16):  # multipliers 2**5 .. 2**15
        payload = (2 ** exponent) * x
        expected = 2
        self.assertEqual(grc(payload), expected)

        frame = zmq.Frame(payload)
        expected += 2
        self.assertEqual(grc(payload), expected)

        frame_copy = copy.copy(frame)
        expected += 1
        self.assertEqual(grc(payload), expected)

        # Take the buffer from the copy in this variant.
        view = frame_copy.buffer
        expected += view_rc
        self.assertEqual(grc(payload), expected)

        self.assertEqual(payload, b(str(frame)))
        self.assertEqual(payload, bytes(frame_copy))
        self.assertEqual(payload, frame.bytes)

        del frame_copy
        expected -= 1
        self.assertEqual(grc(payload), expected)

        expected -= view_rc
        del view
        self.assertEqual(grc(payload), expected)

        del frame
        expected -= 2
        await_gc(payload, expected)
        self.assertEqual(grc(payload), expected)
        self.assertEqual(expected, 2)
        del payload
def test_lifecycle2(self):
    """Run through a different ref counting cycle with a copy."""
    for exponent in range(5, 16):  # multipliers 2**5 .. 2**15
        payload = (2 ** exponent) * x
        expected = 2
        self.assertEqual(grc(payload), expected)

        frame = zmq.Frame(payload)
        expected += 2
        self.assertEqual(grc(payload), expected)

        frame_copy = copy.copy(frame)
        expected += 1
        self.assertEqual(grc(payload), expected)

        # Take the buffer from the original frame in this variant.
        view = frame.buffer
        expected += view_rc
        self.assertEqual(grc(payload), expected)

        self.assertEqual(payload, b(str(frame)))
        self.assertEqual(payload, bytes(frame_copy))
        self.assertEqual(payload, frame_copy.bytes)
        self.assertEqual(payload, frame.bytes)

        # Deleting the local view alone does not change the count.
        del view
        self.assertEqual(grc(payload), expected)

        del frame
        # m.buffer is kept until m is del'd
        expected -= view_rc
        expected -= 1
        self.assertEqual(grc(payload), expected)

        del frame_copy
        expected -= 2
        await_gc(payload, expected)
        self.assertEqual(grc(payload), expected)
        self.assertEqual(expected, 2)
        del payload
def view_get_refcount(self, perspective):
    """Return sys.getrefcount() of this object; *perspective* is unused."""
    count = sys.getrefcount(self)
    return count
def test03_leak_assignment(self):
    # set_private() must add exactly one reference to the stored object and
    # give it back when the private slot is overwritten with None.
    private = "example of private object"
    baseline = sys.getrefcount(private)

    self.obj.set_private(private)
    self.assertEqual(baseline + 1, sys.getrefcount(private))

    self.obj.set_private(None)
    self.assertEqual(baseline, sys.getrefcount(private))
def test04_leak_GC(self):
    # Dropping the test's reference to the owning object must leave the
    # stored private object with its original reference count.
    private = "example of private object"
    baseline = sys.getrefcount(private)

    self.obj.set_private(private)
    self.obj = None

    self.assertEqual(baseline, sys.getrefcount(private))
def test_PyString_FromString(self):
    # Calling PyString_FromString through pythonapi must return an equal
    # string and leave the reference count of the argument unchanged, both
    # while the result is alive and after it has been deleted.
    pythonapi.PyString_FromString.restype = py_object
    pythonapi.PyString_FromString.argtypes = (c_char_p,)

    text = "abc"
    baseline = grc(text)

    created = pythonapi.PyString_FromString(text)
    self.assertEqual(grc(text), baseline)
    self.assertEqual(text, created)

    del created
    self.assertEqual(grc(text), baseline)
# This test is unreliable, because it is possible that code in
# unittest changes the refcount of the '42' integer. So, it
# is disabled by default.
def test_PyObj_FromPtr(self):
    # PyObj_FromPtr must hand back the very same object, holding exactly
    # one additional reference until the result is deleted.
    text = "abc def ghi jkl"
    baseline = grc(text)

    # id(python-object) is the address
    same = PyObj_FromPtr(id(text))
    self.assertIs(text, same)
    self.assertEqual(grc(text), baseline + 1)

    del same
    self.assertEqual(grc(text), baseline)
def test_pyobject(self):
    # For a tuple, a list and a plain object: a second check_type() call
    # with py_object must not change the reference count seen after the
    # first call.
    o = ()
    from sys import getrefcount as grc

    for o in (), [], object():
        initial = grc(o)

        # This call leaks a reference to 'o'...
        self.check_type(py_object, o)
        refs_after_first = grc(o)

        # ...but this call doesn't leak any more.  Where is the refcount?
        self.check_type(py_object, o)
        refs_after_second = grc(o)

        self.assertEqual((refs_after_second, o), (refs_after_first, o))
def test_X(self):
    # Exploratory check that only prints and asserts nothing: it stores a
    # pointer to a c_char_p in a Structure field and prints what can still
    # be read through that field after allocations and a gc run.
    #
    # Output uses single-argument print(...) calls, which produce the same
    # text as the former print statements on Python 2 and also parse on
    # Python 3; bytes literals are used because c_char_p needs bytes there.
    class X(Structure):
        _fields_ = [("p", POINTER(c_char_p))]

    x = X()
    i = c_char_p(b"abc def")
    from sys import getrefcount as grc
    print("2? %s" % (grc(i),))
    x.p = pointer(i)
    print("3? %s" % (grc(i),))

    # NOTE(review): the loops rebind ``i``, so this function's own name for
    # the c_char_p is gone from here on - presumably intentional, keep the
    # loop variable name.
    for i in range(320):
        c_int(99)
        x.p[0]
    print(x.p[0])

    import gc
    gc.collect()
    for i in range(320):
        c_int(99)
        x.p[0]
    print(x.p[0])
    print(x.p.contents)

    x.p[0] = b"spam spam"
    print("+" * 42)
    print(x._objects)