def mem_check(opts):
    """Watch this process's resident memory forever and abort when it grows too large.

    Every pass optionally runs a garbage collection, logs the current RSS
    and, when the RSS exceeds the configured threshold, calls
    ``memory_dump(opts)`` followed by ``os.abort()``. The loop has no exit
    condition of its own.

    ``opts`` is indexed with three keys: ``'gc'`` (truthy to collect before
    measuring), ``'threshold'`` (compared against the RSS value) and
    ``'interval'`` (handed to ``time.sleep`` between passes).
    """
    while True:
        if opts['gc']:
            try:
                gc.collect()
            except Exception as gc_error:
                logging.exception(repr(gc_error) + ' while gc.collect()')

        try:
            current_process = psutil.Process(os.getpid())
            rss = current_process.memory_info().rss
            logging.info('current memory used: {rss}'.format(rss=rss))
            over_limit = rss > opts['threshold']
            if over_limit:
                memory_dump(opts)
                os.abort()
        except Exception as check_error:
            logging.exception(repr(check_error) + ' while checking memory usage')
        finally:
            # Pause between passes whether or not the measurement succeeded.
            time.sleep(opts['interval'])
# Example source code for Python's collect() (gc.collect)
def test_cleanup(self):
    """Check that building and destroying a container leaves gc.garbage empty.

    A container with two connections, a receiver and a sender is created,
    every local reference is dropped, the container is destroyed, and the
    collector must report no garbage both before and after.
    """
    gc.enable()
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)

    container = pyngus.Container("abc")
    first_conn = container.create_connection("c1")
    second_conn = container.create_connection("c2")
    assert second_conn

    # Drop the local reference, then look the connection up again by name.
    del second_conn
    gc.collect()
    second_conn = container.get_connection("c2")
    assert second_conn

    first_conn = container.get_connection("c1")
    assert first_conn
    first_conn.create_receiver("r1")
    first_conn.create_sender("s1")

    # Release every reference held by this test before the final check.
    del first_conn
    del second_conn
    container.destroy()
    del container
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
def test_getViewWidget_deleted():
    """getViewWidget() must return None once the view widget is no longer alive."""
    plot_widget = pg.PlotWidget()
    line_item = pg.InfiniteLine()
    plot_widget.addItem(line_item)
    assert line_item.getViewWidget() is plot_widget

    # Arrange to have Qt automatically delete the view widget:
    # parent it to a throwaway widget and drop that parent.
    throwaway_parent = pg.QtGui.QWidget()
    plot_widget.setParent(throwaway_parent)
    del throwaway_parent
    gc.collect()

    assert not pg.Qt.isQObjectAlive(plot_widget)
    assert line_item.getViewWidget() is None
#if __name__ == '__main__':
#view = pg.PlotItem()
#vref = weakref.ref(view)
#item = pg.InfiniteLine()
#view.addItem(item)
#del view
#gc.collect()
def check(self):
    """Collect garbage generation by generation while counts exceed the thresholds.

    Generation 0 is collected when its count is above ``self.threshold[0]``;
    generation 1 is only considered after generation 0 was collected, and
    generation 2 only after generation 1. The counts are sampled once, up
    front. When ``self.debug`` is truthy, progress is printed.
    """
    #return self.debug_cycles() # uncomment to just debug cycles
    counts = gc.get_count()
    if self.debug:
        print('gc_check called:', counts[0], counts[1], counts[2])

    reports = (
        'collecting gen 0, found: %d unreachable',
        'collecting gen 1, found: %d unreachable',
        'collecting gen 2, found: %d unreachable',
    )
    for generation, report in enumerate(reports):
        if counts[generation] <= self.threshold[generation]:
            # An older generation is never examined unless the younger one was collected.
            break
        unreachable = gc.collect(generation)
        if self.debug:
            print(report % unreachable)
def _getr(slist, olist, first=True):
i = 0
for e in slist:
oid = id(e)
typ = type(e)
if oid in olist or typ is int: ## or e in olist: ## since we're excluding all ints, there is no longer a need to check for olist keys
continue
olist[oid] = e
if first and (i%1000) == 0:
gc.collect()
tl = gc.get_referents(e)
if tl:
_getr(tl, olist, first=False)
i += 1
# The public function.
def start(self):
    """
    Take a fresh baseline of the current objects for later comparisons.

    The refs and counts returned by ``self.collect()`` replace the contents
    of ``self.startRefs`` and ``self.startCount``. Every previously stored
    start ref is passed to ``forgetRef`` and every new key to ``rememberRef``.
    """
    new_refs, new_counts, _objects = self.collect()

    # Release the old baseline before installing the new one.
    for old_key in self.startRefs:
        self.forgetRef(self.startRefs[old_key])
    self.startRefs.clear()
    self.startRefs.update(new_refs)

    for new_key in new_refs:
        self.rememberRef(new_key)

    self.startCount.clear()
    self.startCount.update(new_counts)
    #self.newRefs.clear()
    #self.newRefs.update(refs)
def test_getViewWidget_deleted():
    """getViewWidget() must return None once the view widget is no longer alive."""
    plot_widget = pg.PlotWidget()
    line_item = pg.InfiniteLine()
    plot_widget.addItem(line_item)
    assert line_item.getViewWidget() is plot_widget

    # Arrange to have Qt automatically delete the view widget:
    # parent it to a throwaway widget and drop that parent.
    throwaway_parent = pg.QtGui.QWidget()
    plot_widget.setParent(throwaway_parent)
    del throwaway_parent
    gc.collect()

    assert not pg.Qt.isQObjectAlive(plot_widget)
    assert line_item.getViewWidget() is None
#if __name__ == '__main__':
#view = pg.PlotItem()
#vref = weakref.ref(view)
#item = pg.InfiniteLine()
#view.addItem(item)
#del view
#gc.collect()
def check(self):
    """Collect garbage generation by generation while counts exceed the thresholds.

    Generation 0 is collected when its count is above ``self.threshold[0]``;
    generation 1 is only considered after generation 0 was collected, and
    generation 2 only after generation 1. The counts are sampled once, up
    front. When ``self.debug`` is truthy, progress is printed.
    """
    #return self.debug_cycles() # uncomment to just debug cycles
    counts = gc.get_count()
    if self.debug:
        print('gc_check called:', counts[0], counts[1], counts[2])

    reports = (
        'collecting gen 0, found: %d unreachable',
        'collecting gen 1, found: %d unreachable',
        'collecting gen 2, found: %d unreachable',
    )
    for generation, report in enumerate(reports):
        if counts[generation] <= self.threshold[generation]:
            # An older generation is never examined unless the younger one was collected.
            break
        unreachable = gc.collect(generation)
        if self.debug:
            print(report % unreachable)
def _getr(slist, olist, first=True):
i = 0
for e in slist:
oid = id(e)
typ = type(e)
if oid in olist or typ is int: ## or e in olist: ## since we're excluding all ints, there is no longer a need to check for olist keys
continue
olist[oid] = e
if first and (i%1000) == 0:
gc.collect()
tl = gc.get_referents(e)
if tl:
_getr(tl, olist, first=False)
i += 1
# The public function.
def start(self):
    """
    Take a fresh baseline of the current objects for later comparisons.

    The refs and counts returned by ``self.collect()`` replace the contents
    of ``self.startRefs`` and ``self.startCount``. Every previously stored
    start ref is passed to ``forgetRef`` and every new key to ``rememberRef``.
    """
    new_refs, new_counts, _objects = self.collect()

    # Release the old baseline before installing the new one.
    for old_key in self.startRefs:
        self.forgetRef(self.startRefs[old_key])
    self.startRefs.clear()
    self.startRefs.update(new_refs)

    for new_key in new_refs:
        self.rememberRef(new_key)

    self.startCount.clear()
    self.startCount.update(new_counts)
    #self.newRefs.clear()
    #self.newRefs.update(refs)
def test_lots_of_queries(self):
    """Create one million rows through a model while printing memory statistics.

    Object-type statistics are shown before and after the insert loop and
    the peak RSS is printed every 25000 rows. The test always finishes by
    raising, so it never reports success.
    """
    import resource
    import objgraph

    class LoadTest(Model):
        k = columns.Integer(primary_key=True)
        v = columns.Integer()

    sync_table(LoadTest)
    gc.collect()
    objgraph.show_most_common_types()

    print("Starting...")

    total_rows = 1000000
    report_every = 25000
    for row_id in range(total_rows):
        if row_id % report_every == 0:
            # print memory statistic
            usage = resource.getrusage(resource.RUSAGE_SELF)
            print("Memory usage: %s" % (usage.ru_maxrss))
        LoadTest.create(k=row_id, v=row_id)

    objgraph.show_most_common_types()

    raise Exception("you shouldn't be here")
def setUp(self):
    """Build the fixed 2D fixtures: zero and unit vectors, two sample
    coordinate pairs (as tuple, list and Vector2) and two scalars."""
    # gc.collect()
    first_pair = (1.2, 3.4)
    second_pair = (5.6, 7.8)

    # Zero vector and the two unit vectors.
    self.zeroVec = Vector2()
    self.e1 = Vector2(1, 0)
    self.e2 = Vector2(0, 1)

    # self.t1 = (random(), random())
    self.t1 = first_pair
    self.l1 = list(first_pair)
    self.v1 = Vector2(first_pair)

    # self.t2 = (random(), random())
    self.t2 = second_pair
    self.l2 = list(second_pair)
    self.v2 = Vector2(second_pair)

    # self.s1 = random()
    # self.s2 = random()
    self.s1 = 5.6
    self.s2 = 7.8
def setUp(self):
    """Build the fixed 3D fixtures: zero and unit vectors, two sample
    coordinate triples (as tuple, list and Vector3) and two scalars."""
    # gc.collect()
    first_triple = (1.2, 3.4, 9.6)
    second_triple = (5.6, 7.8, 2.1)

    # Zero vector and the three unit vectors.
    self.zeroVec = Vector3()
    self.e1 = Vector3(1, 0, 0)
    self.e2 = Vector3(0, 1, 0)
    self.e3 = Vector3(0, 0, 1)

    # self.t1 = (random(), random())
    self.t1 = first_triple
    self.l1 = list(first_triple)
    self.v1 = Vector3(first_triple)

    # self.t2 = (random(), random())
    self.t2 = second_triple
    self.l2 = list(second_triple)
    self.v2 = Vector3(second_triple)

    # self.s1 = random()
    # self.s2 = random()
    self.s1 = 5.6
    self.s2 = 7.8
def make_gc_snapShot(filename, name):
    """Append a snapshot of the gc-tracked objects to ``filename``.

    The snapshot is pickled as ``(name, contents)`` where ``contents`` is a
    list of signatures, each a pair ``(object_id, type_name)``. On the first
    call (module-level flag ``first_time`` truthy) a garbage collection is
    run beforehand and the flag is cleared.

    :param filename: path of the file the pickle is appended to.
    :param name: label stored alongside the signatures.
    """
    global first_time
    if first_time:
        gc.collect()
        first_time = False
    contents = []
    for o in gc.get_objects():
        try:
            tname = o.__class__.__name__
        except (AttributeError, ReferenceError):
            # ReferenceError: a weakref proxy whose referent is already gone.
            tname = str(type(o))
        contents.append((id(o), tname))
        del tname
    # Pickle data is bytes: append in binary mode, and let the context
    # manager close the file even when dumping fails.
    with open(filename, 'ab') as f:
        pickle.dump((name, contents), f)
    del contents
def reach(self, ids):
    """Return a dict ``id -> object`` for the given ids that are currently known.

    :param ids: iterable of object ids, as returned by ``x[0]`` with ``x``
        in the result of ``(snapshot2 - snapshot1)``.
    :return: dict mapping each requested id found among
        ``gc.get_objects()`` to the object that has that id now.

    The objects recorded with these ids might have been replaced by new
    ones, so the result may contain objects that do not correspond to the
    original ones. This is especially true after a gc.collect().
    """
    # Materialise once: gives O(1) membership tests and keeps a one-shot
    # iterator from being exhausted by the first ``in`` check.
    wanted = set(ids)
    result = dict()
    for obj in gc.get_objects():
        if id(obj) in wanted:
            result[id(obj)] = obj
    return result
def create_agents(self, generator):
    """
    Generate the agents and store the result in ``self.agents``.

    One entry per unit of each row's ``"Population"`` value is built from
    ``self.df``, the entries are split into ``self.processes * self.splits``
    chunks, and each chunk is passed to ``self._gen_agents`` through
    ``self.pool.imap``. The concatenated result is re-indexed from zero.

    :param generator: stored on ``self.generator`` before generation starts.
    """
    self.generator = generator

    per_country = []
    for country, row in self.df.iterrows():
        per_country.append(pd.Series([country] * row["Population"]))
    country_array = pd.concat(per_country)
    country_array.index = range(len(country_array))

    # Garbage collect before creating new processes.
    gc.collect()

    chunk_count = self.processes * self.splits
    chunks = np.array_split(country_array, chunk_count)
    generated = self.pool.imap(self._gen_agents, chunks)
    self.agents = pd.concat(generated)
    self.agents.index = range(len(self.agents))
def create_agents(self, generator):
    """
    Generate the agents and store the result in ``self.agents``.

    One entry per unit of each row's ``"Population"`` value is built from
    ``self.df``, the entries are split into ``self.processes * self.splits``
    chunks, and each chunk is passed to ``self._gen_agents`` through
    ``self.pool.imap``. The concatenated result is re-indexed from zero.

    :param generator: stored on ``self.generator`` before generation starts.
    """
    self.generator = generator

    per_country = []
    for country, row in self.df.iterrows():
        per_country.append(pd.Series([country] * row["Population"]))
    country_array = pd.concat(per_country)
    country_array.index = range(len(country_array))

    # Garbage collect before creating new processes.
    gc.collect()

    chunk_count = self.processes * self.splits
    chunks = np.array_split(country_array, chunk_count)
    generated = self.pool.imap(self._gen_agents, chunks)
    self.agents = pd.concat(generated)
    self.agents.index = range(len(self.agents))
def timer(func, repetitions=100000):
    """Decorate ``func`` so each call runs it ``repetitions`` times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
#------------------------------------------------------------------------------
# [ timer_X function decorators ]
# replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
def timer_10(func, repetitions=10):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 10) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_100(func, repetitions=100):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 100) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_1k(func, repetitions=1000):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 1000) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_10k(func, repetitions=10000):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 10000) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer(func, repetitions=100000):
    """Decorate ``func`` so each call runs it ``repetitions`` times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
#------------------------------------------------------------------------------
# [ timer_X function decorators ]
# replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
def timer_10(func, repetitions=10):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 10) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_100(func, repetitions=100):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 100) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_1k(func, repetitions=1000):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 1000) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_10k(func, repetitions=10000):
    """Decorate ``func`` so each call runs it ``repetitions`` (default 10000) times and prints the elapsed time.

    The garbage collector is switched off while timing and restored to its
    previous state afterwards, even when ``func`` raises.

    :param func: callable to time.
    :param repetitions: number of consecutive calls per invocation.
    :return: wrapper returning the result of the last call to ``func``
        (``None`` when ``repetitions`` is 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Collect first, then disable garbage collection for the timed section.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when there are no repetitions
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Restore the collector only if the caller had it enabled.
            if gc_was_enabled:
                gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def make_chunk(filename, arr, arr_len):
    """Randomly move elements of ``arr`` into a chunk saved to ``filename``.

    Each element is selected independently with probability
    ``chunk_num_frms / arr_len`` (computed once from the incoming
    ``arr_len``). Selected elements are passed to ``save_hd5`` and then
    released; the others are returned.

    :return: ``(remaining_elements, arr_len minus the number selected)``.
    """
    dump_probability = float(chunk_num_frms) / arr_len
    kept = []
    selected = []
    for element in arr:
        if np.random.binomial(1, dump_probability) == 1:
            selected.append(element)
            arr_len -= 1
        else:
            kept.append(element)

    save_hd5(filename, selected)

    # Release the dumped elements right away before returning.
    del selected[:]
    del selected
    gc.collect()
    return kept, arr_len
def test_it(collectible):
    """Create and delete a ``One`` instance, reporting referrers of ``One`` and gc.garbage around a collection.

    ``collectible`` is passed straight to ``One`` and selects the label
    printed in the banner.
    """
    kind_label = 'collectible' if collectible else 'uncollectible'
    dd()
    dd('======= ', kind_label, ' object =======')
    dd()

    # State before the instance exists.
    gc.collect()
    dd('*** init, nr of referrers: ', len(gc.get_referrers(One)))
    dd(' garbage: ', gc.garbage)

    instance = One(collectible)
    dd(' created: ', instance.typ, ': ', instance)
    dd(' nr of referrers: ', len(gc.get_referrers(One)))

    # State after the instance is dropped and a collection has run.
    dd(' delete:')
    del instance
    gc.collect()
    dd('*** after gc, nr of referrers: ', len(gc.get_referrers(One)))
    dd(' garbage: ', gc.garbage)
def flushUnder(dirpath):
    """Flushes all modules that live under the given directory

    Entries of ``sys.modules`` whose value is ``None`` are removed as well.
    A garbage collection is forced once all matching modules are unloaded.

    :param dirpath: the name of the top most directory to search under.
    :type dirpath: str
    :return: ``(module_name, module_file)`` pairs for every unloaded module.
    :rtype: list
    """
    modulePaths = list()
    # Iterate over a snapshot: entries are deleted from sys.modules inside
    # the loop, and mutating a dict while iterating its live view raises
    # RuntimeError on Python 3.
    for name, module in list(sys.modules.items()):
        if module is None:
            del sys.modules[name]
            continue
        try:
            moduleFile = inspect.getfile(module)
            moduleDirpath = os.path.realpath(os.path.dirname(moduleFile))
            # NOTE(review): plain prefix match, so a sibling such as
            # "<dirpath>_other" also matches, and dirpath itself is not
            # passed through realpath -- confirm whether that is intended.
            if moduleDirpath.startswith(dirpath):
                modulePaths.append((name, moduleFile))
                del sys.modules[name]
                logger.debug('unloaded module: %s ' % name)
        except TypeError:
            # inspect.getfile raises TypeError for built-in modules.
            continue
    # Force a garbage collection
    gc.collect()
    return modulePaths