def check(self):
#return self.debug_cycles() # uncomment to just debug cycles
l0, l1, l2 = gc.get_count()
if self.debug:
print('gc_check called:', l0, l1, l2)
if l0 > self.threshold[0]:
num = gc.collect(0)
if self.debug:
print('collecting gen 0, found: %d unreachable' % num)
if l1 > self.threshold[1]:
num = gc.collect(1)
if self.debug:
print('collecting gen 1, found: %d unreachable' % num)
if l2 > self.threshold[2]:
num = gc.collect(2)
if self.debug:
print('collecting gen 2, found: %d unreachable' % num)
python类get_count()的实例源码
def check(self):
#return self.debug_cycles() # uncomment to just debug cycles
l0, l1, l2 = gc.get_count()
if self.debug:
print('gc_check called:', l0, l1, l2)
if l0 > self.threshold[0]:
num = gc.collect(0)
if self.debug:
print('collecting gen 0, found: %d unreachable' % num)
if l1 > self.threshold[1]:
num = gc.collect(1)
if self.debug:
print('collecting gen 1, found: %d unreachable' % num)
if l2 > self.threshold[2]:
num = gc.collect(2)
if self.debug:
print('collecting gen 2, found: %d unreachable' % num)
def test_del_newclass(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A(object):
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
def test_collect_generations(self):
gc.collect()
# This object will "trickle" into generation N + 1 after
# each call to collect(N)
x = []
gc.collect(0)
# x is now in gen 1
a, b, c = gc.get_count()
gc.collect(1)
# x is now in gen 2
d, e, f = gc.get_count()
gc.collect(2)
# x is now in gen 3
g, h, i = gc.get_count()
# We don't check a, d, g since their exact values depends on
# internal implementation details of the interpreter.
self.assertEqual((b, c), (1, 0))
self.assertEqual((e, f), (0, 1))
self.assertEqual((h, i), (0, 0))
def test_del_newclass(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A(object):
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
def test_collect_generations(self):
gc.collect()
# This object will "trickle" into generation N + 1 after
# each call to collect(N)
x = []
gc.collect(0)
# x is now in gen 1
a, b, c = gc.get_count()
gc.collect(1)
# x is now in gen 2
d, e, f = gc.get_count()
gc.collect(2)
# x is now in gen 3
g, h, i = gc.get_count()
# We don't check a, d, g since their exact values depends on
# internal implementation details of the interpreter.
self.assertEqual((b, c), (1, 0))
self.assertEqual((e, f), (0, 1))
self.assertEqual((h, i), (0, 0))
def check(self):
"""
Method called by the garbage collector timer to check if there is
something to collect.
"""
# return self.debug_cycles() # uncomment to just debug cycles
l0, l1, l2 = gc.get_count()
if self.debug:
logger.debug('gc_check called: {0} {1} {2}'.format(l0, l1, l2))
if l0 > self.threshold[0]:
num = gc.collect(0)
if self.debug:
logger.debug('collecting gen 0, found: {0:d} unreachable'
''.format(num))
if l1 > self.threshold[1]:
num = gc.collect(1)
if self.debug:
logger.debug('collecting gen 1, found: {0:d} unreachable'
''.format(num))
if l2 > self.threshold[2]:
num = gc.collect(2)
if self.debug:
logger.debug('collecting gen 2, found: {0:d} '
'unreachable'.format(num))
def test_del_newclass(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A(object):
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
def test_collect_generations(self):
gc.collect()
# This object will "trickle" into generation N + 1 after
# each call to collect(N)
x = []
gc.collect(0)
# x is now in gen 1
a, b, c = gc.get_count()
gc.collect(1)
# x is now in gen 2
d, e, f = gc.get_count()
gc.collect(2)
# x is now in gen 3
g, h, i = gc.get_count()
# We don't check a, d, g since their exact values depends on
# internal implementation details of the interpreter.
self.assertEqual((b, c), (1, 0))
self.assertEqual((e, f), (0, 1))
self.assertEqual((h, i), (0, 0))
def test_del_newclass(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A(object):
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
def test_collect_generations(self):
gc.collect()
# This object will "trickle" into generation N + 1 after
# each call to collect(N)
x = []
gc.collect(0)
# x is now in gen 1
a, b, c = gc.get_count()
gc.collect(1)
# x is now in gen 2
d, e, f = gc.get_count()
gc.collect(2)
# x is now in gen 3
g, h, i = gc.get_count()
# We don't check a, d, g since their exact values depends on
# internal implementation details of the interpreter.
self.assertEqual((b, c), (1, 0))
self.assertEqual((e, f), (0, 1))
self.assertEqual((h, i), (0, 0))
def test_get_count(self):
gc.collect()
a, b, c = gc.get_count()
x = []
d, e, f = gc.get_count()
self.assertEqual((b, c), (0, 0))
self.assertEqual((e, f), (0, 0))
# This is less fragile than asserting that a equals 0.
self.assertLess(a, 5)
# Between the two calls to get_count(), at least one object was
# created (the list).
self.assertGreater(d, a)
def test_get_count(self):
# Avoid future allocation of method object
assertEqual = self._baseAssertEqual
gc.collect()
assertEqual(gc.get_count(), (0, 0, 0))
a = dict()
# since gc.collect(), we created two objects:
# the dict, and the tuple returned by get_count()
assertEqual(gc.get_count(), (2, 0, 0))
def test_collect_generations(self):
# Avoid future allocation of method object
assertEqual = self.assertEqual
gc.collect()
a = dict()
gc.collect(0)
assertEqual(gc.get_count(), (0, 1, 0))
gc.collect(1)
assertEqual(gc.get_count(), (0, 0, 1))
gc.collect(2)
assertEqual(gc.get_count(), (0, 0, 0))
def test_get_count(self):
# Avoid future allocation of method object
assertEqual = self._baseAssertEqual
gc.collect()
assertEqual(gc.get_count(), (0, 0, 0))
a = dict()
# since gc.collect(), we created two objects:
# the dict, and the tuple returned by get_count()
assertEqual(gc.get_count(), (2, 0, 0))
def test_collect_generations(self):
# Avoid future allocation of method object
assertEqual = self.assertEqual
gc.collect()
a = dict()
gc.collect(0)
assertEqual(gc.get_count(), (0, 1, 0))
gc.collect(1)
assertEqual(gc.get_count(), (0, 0, 1))
gc.collect(2)
assertEqual(gc.get_count(), (0, 0, 0))
def test_get_count(self):
gc.collect()
a, b, c = gc.get_count()
x = []
d, e, f = gc.get_count()
self.assertEqual((b, c), (0, 0))
self.assertEqual((e, f), (0, 0))
# This is less fragile than asserting that a equals 0.
self.assertLess(a, 5)
# Between the two calls to get_count(), at least one object was
# created (the list).
self.assertGreater(d, a)
def test_get_count(self):
# Avoid future allocation of method object
assertEqual = self._baseAssertEqual
gc.collect()
assertEqual(gc.get_count(), (0, 0, 0))
a = dict()
# since gc.collect(), we created two objects:
# the dict, and the tuple returned by get_count()
assertEqual(gc.get_count(), (2, 0, 0))
def test_collect_generations(self):
# Avoid future allocation of method object
assertEqual = self.assertEqual
gc.collect()
a = dict()
gc.collect(0)
assertEqual(gc.get_count(), (0, 1, 0))
gc.collect(1)
assertEqual(gc.get_count(), (0, 0, 1))
gc.collect(2)
assertEqual(gc.get_count(), (0, 0, 0))
def test_get_count(self):
gc.collect()
a, b, c = gc.get_count()
x = []
d, e, f = gc.get_count()
self.assertEqual((b, c), (0, 0))
self.assertEqual((e, f), (0, 0))
# This is less fragile than asserting that a equals 0.
self.assertLess(a, 5)
# Between the two calls to get_count(), at least one object was
# created (the list).
self.assertGreater(d, a)
def test_get_count(self):
# Avoid future allocation of method object
assertEqual = self._baseAssertEqual
gc.collect()
assertEqual(gc.get_count(), (0, 0, 0))
a = dict()
# since gc.collect(), we created two objects:
# the dict, and the tuple returned by get_count()
assertEqual(gc.get_count(), (2, 0, 0))
def test_collect_generations(self):
# Avoid future allocation of method object
assertEqual = self.assertEqual
gc.collect()
a = dict()
gc.collect(0)
assertEqual(gc.get_count(), (0, 1, 0))
gc.collect(1)
assertEqual(gc.get_count(), (0, 0, 1))
gc.collect(2)
assertEqual(gc.get_count(), (0, 0, 0))
def test_get_count(self):
gc.collect()
a, b, c = gc.get_count()
x = []
d, e, f = gc.get_count()
self.assertEqual((b, c), (0, 0))
self.assertEqual((e, f), (0, 0))
# This is less fragile than asserting that a equals 0.
self.assertLess(a, 5)
# Between the two calls to get_count(), at least one object was
# created (the list).
self.assertGreater(d, a)
def report(self):
# CPU
if not runtime_info.OS_WIN:
cpu_time = read_cpu_time()
if cpu_time != None:
cpu_time_metric = self.report_metric(Metric.TYPE_COUNTER, Metric.CATEGORY_CPU, Metric.NAME_CPU_TIME, Metric.UNIT_NANOSECOND, cpu_time)
if cpu_time_metric.has_measurement():
cpu_usage = (cpu_time_metric.measurement.value / (60 * 1e9)) * 100
try:
cpu_usage = cpu_usage / multiprocessing.cpu_count()
except Exception:
pass
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_CPU, Metric.NAME_CPU_USAGE, Metric.UNIT_PERCENT, cpu_usage)
# Memory
if not runtime_info.OS_WIN:
max_rss = read_max_rss()
if max_rss != None:
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_MAX_RSS, Metric.UNIT_KILOBYTE, max_rss)
if runtime_info.OS_LINUX:
current_rss = read_current_rss()
if current_rss != None:
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_CURRENT_RSS, Metric.UNIT_KILOBYTE, current_rss)
vm_size = read_vm_size()
if vm_size != None:
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_VM_SIZE, Metric.UNIT_KILOBYTE, vm_size)
# GC stats
gc_count0, gc_count1, gc_count2 = gc.get_count()
total_gc_count = gc_count0 + gc_count1 + gc_count2
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_GC, Metric.NAME_GC_COUNT, Metric.UNIT_NONE, total_gc_count)
if min_version(3, 4):
gc_stats = gc.get_stats()
if gc_stats and gc_stats[0] and gc_stats[1] and gc_stats[2]:
total_collections = gc_stats[0]['collections'] + gc_stats[1]['collections'] + gc_stats[2]['collections']
self.report_metric(Metric.TYPE_COUNTER, Metric.CATEGORY_GC, Metric.NAME_GC_COLLECTIONS, Metric.UNIT_NONE, total_collections)
total_collected = gc_stats[0]['collected'] + gc_stats[1]['collected'] + gc_stats[2]['collected']
self.report_metric(Metric.TYPE_COUNTER, Metric.CATEGORY_GC, Metric.NAME_GC_COLLECTED, Metric.UNIT_NONE, total_collected)
total_uncollectable = gc_stats[0]['uncollectable'] + gc_stats[1]['uncollectable'] + gc_stats[2]['uncollectable']
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_GC, Metric.NAME_GC_UNCOLLECTABLE, Metric.UNIT_NONE, total_uncollectable)
# Runtime
thread_count = threading.active_count()
self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_RUNTIME, Metric.NAME_THREAD_COUNT, Metric.UNIT_NONE, thread_count)
def collect_metrics(self):
u = resource.getrusage(resource.RUSAGE_SELF)
if gc_.isenabled():
c = list(gc_.get_count())
th = list(gc_.get_threshold())
g = GC(collect0=c[0] if not self.last_collect else c[0] - self.last_collect[0],
collect1=c[1] if not self.last_collect else c[
1] - self.last_collect[1],
collect2=c[2] if not self.last_collect else c[
2] - self.last_collect[2],
threshold0=th[0],
threshold1=th[1],
threshold2=th[2])
thr = t.enumerate()
daemon_threads = len([tr.daemon and tr.is_alive() for tr in thr])
alive_threads = len([not tr.daemon and tr.is_alive() for tr in thr])
dead_threads = len([not tr.is_alive() for tr in thr])
m = Metrics(ru_utime=u[0] if not self.last_usage else u[0] - self.last_usage[0],
ru_stime=u[1] if not self.last_usage else u[1] - self.last_usage[1],
ru_maxrss=u[2],
ru_ixrss=u[3],
ru_idrss=u[4],
ru_isrss=u[5],
ru_minflt=u[6] if not self.last_usage else u[6] - self.last_usage[6],
ru_majflt=u[7] if not self.last_usage else u[7] - self.last_usage[7],
ru_nswap=u[8] if not self.last_usage else u[8] - self.last_usage[8],
ru_inblock=u[9] if not self.last_usage else u[9] - self.last_usage[9],
ru_oublock=u[10] if not self.last_usage else u[10] - self.last_usage[10],
ru_msgsnd=u[11] if not self.last_usage else u[11] - self.last_usage[11],
ru_msgrcv=u[12] if not self.last_usage else u[12] - self.last_usage[12],
ru_nsignals=u[13] if not self.last_usage else u[13] - self.last_usage[13],
ru_nvcs=u[14] if not self.last_usage else u[14] - self.last_usage[14],
ru_nivcsw=u[15] if not self.last_usage else u[15] - self.last_usage[15],
alive_threads=alive_threads,
dead_threads=dead_threads,
daemon_threads=daemon_threads,
gc=g)
self.last_usage = u
if gc_.isenabled():
self.last_collect = c
return m