def test_free_from_gc(self):
# Check that freeing of blocks by the garbage collector doesn't deadlock
# (issue #12352).
# Make sure the GC is enabled, and set lower collection thresholds to
# make collections more frequent (and increase the probability of
# deadlock).
if not gc.isenabled():
gc.enable()
self.addCleanup(gc.disable)
thresholds = gc.get_threshold()
self.addCleanup(gc.set_threshold, *thresholds)
gc.set_threshold(10)
# perform numerous block allocations, with cyclic references to make
# sure objects are collected asynchronously by the gc
for i in range(5000):
a = multiprocessing.heap.BufferWrapper(1)
b = multiprocessing.heap.BufferWrapper(1)
# circular references
a.buddy = b
b.buddy = a
#
#
#
python类get_threshold()的实例源码
def __init__(self, interval=1.0, debug=False):
self.debug = debug
if debug:
gc.set_debug(gc.DEBUG_LEAK)
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.check)
self.threshold = gc.get_threshold()
gc.disable()
self.timer.start(interval * 1000)
def __init__(self, interval=1.0, debug=False):
self.debug = debug
if debug:
gc.set_debug(gc.DEBUG_LEAK)
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.check)
self.threshold = gc.get_threshold()
gc.disable()
self.timer.start(interval * 1000)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def _genObjectStatus():
"""
Generate a report with information about objects allocated, numbers of
references to each object and other information that can be used to
track down memory (object) leaks in the server.
Returns: Object report (string).
"""
T = TRACE()
rep = "NG/AMS SERVER OBJECT STATUS REPORT\n\n"
import gc
gc.set_debug(gc.DEBUG_COLLECTABLE | gc.DEBUG_UNCOLLECTABLE |
gc.DEBUG_INSTANCES | gc.DEBUG_OBJECTS)
rep += "=Garbage Collector Status:\n\n"
rep += "Enabled: %d\n" % gc.isenabled()
rep += "Unreachable objects: %d\n" % gc.collect()
rep += "Threshold: %s\n" % str(gc.get_threshold())
#rep += "Objects:\n"
#rep += str(gc.get_objects()) + "\n"
rep += "Garbage:\n"
rep += str(gc.garbage) + "\n\n"
# Dump refence count status of all objects allocated into file specified.
rep += "=Object Reference Counts:\n\n"
for objInfo in _genRefCountRep():
rep += "%-4d %s\n" % (objInfo[0], objInfo[1])
rep += "\n=EOF"
return rep
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def check_gc_during_creation(self, makeref):
if test_support.check_impl_detail():
import gc
thresholds = gc.get_threshold()
gc.set_threshold(1, 1, 1)
gc_collect()
class A:
pass
def callback(*args):
pass
referenced = A()
a = A()
a.a = a
a.wr = makeref(referenced)
try:
# now make sure the object and the ref get labeled as
# cyclic trash:
a = A()
weakref.ref(referenced, callback)
finally:
if test_support.check_impl_detail():
gc.set_threshold(*thresholds)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def test_del(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A:
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
def test_implicit_parent_with_threads(self):
if not gc.isenabled():
return # cannot test with disabled gc
N = gc.get_threshold()[0]
if N < 50:
return # cannot test with such a small N
def attempt():
lock1 = threading.Lock()
lock1.acquire()
lock2 = threading.Lock()
lock2.acquire()
recycled = [False]
def another_thread():
lock1.acquire() # wait for gc
greenlet.getcurrent() # update ts_current
lock2.release() # release gc
t = threading.Thread(target=another_thread)
t.start()
class gc_callback(object):
def __del__(self):
lock1.release()
lock2.acquire()
recycled[0] = True
class garbage(object):
def __init__(self):
self.cycle = self
self.callback = gc_callback()
l = []
x = range(N*2)
current = greenlet.getcurrent()
g = garbage()
for i in x:
g = None # lose reference to garbage
if recycled[0]:
# gc callback called prematurely
t.join()
return False
last = greenlet()
if recycled[0]:
break # yes! gc called in green_new
l.append(last) # increase allocation counter
else:
# gc callback not called when expected
gc.collect()
if recycled[0]:
t.join()
return False
self.assertEqual(last.parent, current)
for g in l:
self.assertEqual(g.parent, current)
return True
for i in range(5):
if attempt():
break
def collect_metrics(self):
u = resource.getrusage(resource.RUSAGE_SELF)
if gc_.isenabled():
c = list(gc_.get_count())
th = list(gc_.get_threshold())
g = GC(collect0=c[0] if not self.last_collect else c[0] - self.last_collect[0],
collect1=c[1] if not self.last_collect else c[
1] - self.last_collect[1],
collect2=c[2] if not self.last_collect else c[
2] - self.last_collect[2],
threshold0=th[0],
threshold1=th[1],
threshold2=th[2])
thr = t.enumerate()
daemon_threads = len([tr.daemon and tr.is_alive() for tr in thr])
alive_threads = len([not tr.daemon and tr.is_alive() for tr in thr])
dead_threads = len([not tr.is_alive() for tr in thr])
m = Metrics(ru_utime=u[0] if not self.last_usage else u[0] - self.last_usage[0],
ru_stime=u[1] if not self.last_usage else u[1] - self.last_usage[1],
ru_maxrss=u[2],
ru_ixrss=u[3],
ru_idrss=u[4],
ru_isrss=u[5],
ru_minflt=u[6] if not self.last_usage else u[6] - self.last_usage[6],
ru_majflt=u[7] if not self.last_usage else u[7] - self.last_usage[7],
ru_nswap=u[8] if not self.last_usage else u[8] - self.last_usage[8],
ru_inblock=u[9] if not self.last_usage else u[9] - self.last_usage[9],
ru_oublock=u[10] if not self.last_usage else u[10] - self.last_usage[10],
ru_msgsnd=u[11] if not self.last_usage else u[11] - self.last_usage[11],
ru_msgrcv=u[12] if not self.last_usage else u[12] - self.last_usage[12],
ru_nsignals=u[13] if not self.last_usage else u[13] - self.last_usage[13],
ru_nvcs=u[14] if not self.last_usage else u[14] - self.last_usage[14],
ru_nivcsw=u[15] if not self.last_usage else u[15] - self.last_usage[15],
alive_threads=alive_threads,
dead_threads=dead_threads,
daemon_threads=daemon_threads,
gc=g)
self.last_usage = u
if gc_.isenabled():
self.last_collect = c
return m
def main():
# gc.set_debug(gc.DEBUG_STATS)
thv = gc.get_threshold()
gc.set_threshold(10*thv[0], 10*thv[1], 10*thv[2])
filenames = [ ]
desc = None
bench = None
opt = None
send_msg = None
i = 1
while i < len(sys.argv):
if sys.argv[i] == "-in":
desc = sys.argv[i+1]
i += 2
elif sys.argv[i] == "-file":
filenames.append(sys.argv[i+1])
i += 2
elif sys.argv[i] == "-bench":
bench = sys.argv[i+1]
i += 2
elif sys.argv[i] == "-opt":
opt = sys.argv[i+1]
i += 2
elif sys.argv[i] == "-send":
send_msg = sys.argv[i+1]
i += 2
else:
usage()
if send_msg != None:
js = JT65Send()
js.send(send_msg)
sys.exit(0)
if bench != None:
benchmark(bench, True)
sys.exit(0)
if opt != None:
optimize(opt)
sys.exit(0)
if len(filenames) > 0 and desc == None:
r = JT65()
r.verbose = True
for filename in filenames:
r.gowav(filename, 0)
elif len(filenames) == 0 and desc != None:
r = JT65()
r.verbose = True
r.opencard(desc)
r.gocard()
else:
usage()