def test_diagnostics_life(self):
    import gc
    from weakref import ref

    def tmp():
        cur = self.conn.cursor()
        try:
            cur.execute("select * from nonexist")
        except psycopg2.Error as exc:
            return cur, exc

    cur, e = tmp()
    diag = e.diag
    w = ref(cur)

    del e, cur
    gc.collect()
    assert w() is not None

    self.assertEqual(diag.sqlstate, '42P01')

    del diag
    gc.collect()
    gc.collect()
    assert w() is None
def runAnalysis(bam, fasta, blastresults, taxdump, modelOutput, output, tokeep, toremove, binary, target, level):
    taxdump, taxidDict = common.parseTaxdump(taxdump, False)
    gc.collect()
    click.echo("Taxdump parsed, %d taxIDs loaded" % len(taxdump))
    contigs = readFasta(fasta)
    gc.collect()
    click.echo("FASTA loaded, %d contigs returned" % len(contigs))
    contigs = readBAM(bam, contigs)
    gc.collect()
    click.echo("BAM loaded")
    contigs, classMap, classList = readBLAST(blastresults,
                                             taxdump, level.lower(), contigs)
    gc.collect()
    click.echo("BLAST results loaded")
    corpus, testdata, features = common.constructCorpus(list(contigs.values()), classMap, binary, target)
    gc.collect()
    click.echo("Corpus constructed, %d contigs in corpus and %d contigs in test data" % (len(corpus), len(testdata)))
    classifier = common.constructModel(corpus, classList, features, modelOutput)
    result = common.classifyData(classifier, testdata, classMap)
    common.generateOutput(tokeep, toremove, result, contigs.values(), target, output)
def test_gc(self):
    """test close&term by garbage collection alone"""
    if PYPY:
        raise SkipTest("GC doesn't work")

    # test credit @dln (GH #137):
    def gcf():
        def inner():
            ctx = self.Context()
            s = ctx.socket(zmq.PUSH)
        inner()
        gc.collect()
    t = Thread(target=gcf)
    t.start()
    t.join(timeout=1)
    self.assertFalse(t.is_alive(), "Garbage collection should have cleaned up context")
def _cleanUp(self, result):
    try:
        if self.forceGarbageCollection:
            gc.collect()
        util._Janitor().postCaseCleanup()
    except util.FailureError as e:
        result.addError(self, e.original)
        self._passed = False
    except:
        result.cleanupErrors(failure.Failure())
        self._passed = False
    for error in self._observer.getErrors():
        result.addError(self, error)
        self._passed = False
    self.flushLoggedErrors()
    self._removeObserver()
    if self._passed:
        result.addSuccess(self)
def pytest_runtest_item(self, item):
    lines1 = self.get_open_files()
    yield
    if hasattr(sys, "pypy_version_info"):
        gc.collect()
    lines2 = self.get_open_files()
    new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
    leaked_files = [t for t in lines2 if t[0] in new_fds]
    if leaked_files:
        error = []
        error.append("***** %s FD leakage detected" % len(leaked_files))
        error.extend([str(f) for f in leaked_files])
        error.append("*** Before:")
        error.extend([str(f) for f in lines1])
        error.append("*** After:")
        error.extend([str(f) for f in lines2])
        error.append(error[0])
        error.append("*** function %s:%s: %s " % item.location)
        pytest.fail("\n".join(error), pytrace=False)
# XXX copied from execnet's conftest.py - needs to be merged
def getmodulecol(self, source, configargs=(), withinit=False):
    """Return the module collection node for ``source``.

    This writes ``source`` to a file using :py:meth:`makepyfile`
    and then runs the pytest collection on it, returning the
    collection node for the test module.

    :param source: The source code of the module to collect.

    :param configargs: Any extra arguments to pass to
        :py:meth:`parseconfigure`.

    :param withinit: Whether to also write an ``__init__.py`` file
        to the temporary directory to ensure it is a package.
    """
    kw = {self.request.function.__name__: Source(source).strip()}
    path = self.makepyfile(**kw)
    if withinit:
        self.makepyfile(__init__="#")
    self.config = config = self.parseconfigure(path, *configargs)
    node = self.getnode(config, path)
    return node
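# A hedged usage sketch (not part of the original conftest): how the helper
# above could be called from a test.  The fixture name ``acceptance_testdir``
# and the collected module body are illustrative assumptions only.
def test_getmodulecol_usage_sketch(acceptance_testdir):
    modcol = acceptance_testdir.getmodulecol(
        """
        def test_one():
            pass
        """
    )
    items = modcol.collect()  # pytest Item nodes collected from the module
    assert [item.name for item in items] == ["test_one"]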
def test_from_buffer(self):
    a = array.array("i", range(16))
    x = (c_int * 16).from_buffer(a)

    y = X.from_buffer(a)
    self.assertEqual(y.c_int, a[0])
    self.assertFalse(y.init_called)

    self.assertEqual(x[:], a.tolist())

    a[0], a[-1] = 200, -200
    self.assertEqual(x[:], a.tolist())

    self.assertIn(a, x._objects.values())

    self.assertRaises(ValueError,
                      c_int.from_buffer, a, -1)

    expected = x[:]
    del a; gc.collect(); gc.collect(); gc.collect()
    self.assertEqual(x[:], expected)

    self.assertRaises(TypeError,
                      (c_char * 16).from_buffer, "a" * 16)
def test_from_buffer_copy(self):
    a = array.array("i", range(16))
    x = (c_int * 16).from_buffer_copy(a)

    y = X.from_buffer_copy(a)
    self.assertEqual(y.c_int, a[0])
    self.assertFalse(y.init_called)

    self.assertEqual(x[:], range(16))

    a[0], a[-1] = 200, -200
    self.assertEqual(x[:], range(16))

    self.assertEqual(x._objects, None)

    self.assertRaises(ValueError,
                      c_int.from_buffer, a, -1)

    del a; gc.collect(); gc.collect(); gc.collect()
    self.assertEqual(x[:], range(16))

    x = (c_char * 16).from_buffer_copy("a" * 16)
    self.assertEqual(x[:], "a" * 16)
def test_1(self):
    from sys import getrefcount as grc

    f = dll._testfunc_callback_i_if
    f.restype = ctypes.c_int
    f.argtypes = [ctypes.c_int, MyCallback]

    def callback(value):
        # print "called back with", value
        return value

    self.assertEqual(grc(callback), 2)
    cb = MyCallback(callback)

    self.assertGreater(grc(callback), 2)
    result = f(-10, cb)
    self.assertEqual(result, -18)
    cb = None

    gc.collect()

    self.assertEqual(grc(callback), 2)
def showImage(self):
    # Note: generating/updating these images off the main thread raised
    # "main thread is not in main loop"; UI updates have to stay on the
    # main loop.
    try:
        image = GenImage(os.getcwd() + '/resource/%s/' % (self.type))
        image.generateImage('position_for_image.csv', '1.png', 'bar')
        image.generateImage('salary_for_image.csv', '2.png', 'pie')
    except:
        self.networkError()
    PixMapSalary = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/1.png' % (self.type)).scaled(400, 600)
    self.SalaryImage.setPixmap(PixMapSalary)
    PixMapPosition = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/2.png' % (self.type)).scaled(500, 500)
    self.PositionImage.setPixmap(PixMapPosition)
    del image
    gc.collect()
    # show the staff list
    self.showStaff()
    # result links are opened with the webbrowser module
def test_main(verbose=None):
    test_classes = (
        TestPartial,
        TestPartialSubclass,
        TestPythonPartial,
        TestUpdateWrapper,
        TestTotalOrdering,
        TestCmpToKey,
        TestWraps,
        TestReduce,
        TestLRU,
        TestOrderedDict,
    )
    support.run_unittest(*test_classes)

    # verify reference counting
    if verbose and hasattr(sys, "gettotalrefcount"):
        import gc
        counts = [None] * 5
        for i in range(len(counts)):
            support.run_unittest(*test_classes)
            gc.collect()
            counts[i] = sys.gettotalrefcount()
        print(counts)
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can happen when reference cycles are involved.)  This means that
    __del__ methods may be called later than expected and weakrefs may remain
    alive for longer than expected.  This function tries its best to force all
    garbage objects to disappear.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()
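# A hedged usage sketch (not from the original module): a typical caller uses
# gc_collect() when a weakref must be cleared deterministically on CPython,
# PyPy and Jython alike.  The ``Widget`` class below is illustrative only.
def _example_weakref_cleared():
    import weakref

    class Widget(object):
        pass

    obj = Widget()
    w = weakref.ref(obj)
    del obj
    gc_collect()            # force collection on implementations without refcounting
    assert w() is None      # the weakref should now be dead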
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
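# A minimal sketch of such a decorator, reconstructed only from the comment
# above (the real helper in the original module may differ; ``with_locale`` is
# a hypothetical name).  It switches LC_ALL to the requested locale for the
# duration of the call and always restores the previous setting, even if the
# wrapped function raises.
import locale
from functools import wraps

def with_locale(loc):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            saved = locale.setlocale(locale.LC_ALL)      # remember the current locale
            locale.setlocale(locale.LC_ALL, loc)         # switch to the requested one
            try:
                return func(*args, **kwargs)
            finally:
                locale.setlocale(locale.LC_ALL, saved)   # correctly reset it afterwards
        return wrapper
    return decorator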
def delete(self, *args, **kwargs):
    directory = self.directory

    # Just doing a plain delete will collect all related objects in memory
    # before deleting: translation projects, stores, units, quality checks,
    # suggestions, and submissions.
    # This can easily take down a process. If we do one translation project
    # at a time and force garbage collection, things stay much more
    # manageable.
    import gc
    gc.collect()
    for tp in self.translationproject_set.iterator():
        tp.delete()
        gc.collect()

    super(Project, self).delete(*args, **kwargs)

    directory.delete()
def test_threaded_leak(self):
    gg = []
    def worker():
        # only main greenlet present
        gg.append(weakref.ref(greenlet.getcurrent()))
    for i in range(2):
        t = threading.Thread(target=worker)
        t.start()
        t.join()
    del t
    greenlet.getcurrent() # update ts_current
    self.recycle_threads()
    greenlet.getcurrent() # update ts_current
    gc.collect()
    greenlet.getcurrent() # update ts_current
    for g in gg:
        self.assertTrue(g() is None)
def test_threaded_adv_leak(self):
    gg = []
    def worker():
        # main and additional *finished* greenlets
        ll = greenlet.getcurrent().ll = []
        def additional():
            ll.append(greenlet.getcurrent())
        for i in range(2):
            greenlet.greenlet(additional).switch()
        gg.append(weakref.ref(greenlet.getcurrent()))
    for i in range(2):
        t = threading.Thread(target=worker)
        t.start()
        t.join()
    del t
    greenlet.getcurrent() # update ts_current
    self.recycle_threads()
    greenlet.getcurrent() # update ts_current
    gc.collect()
    greenlet.getcurrent() # update ts_current
    for g in gg:
        self.assertTrue(g() is None)
def __call__(self, result=None):
    # For the COM suite's sake, always ensure we don't leak
    # gateways/interfaces
    from pythoncom import _GetInterfaceCount, _GetGatewayCount
    gc.collect()
    ni = _GetInterfaceCount()
    ng = _GetGatewayCount()
    self.real_test(result)
    # Failed - no point checking anything else
    if result.shouldStop or not result.wasSuccessful():
        return
    self._do_leak_tests(result)
    gc.collect()
    lost_i = _GetInterfaceCount() - ni
    lost_g = _GetGatewayCount() - ng
    if lost_i or lost_g:
        msg = "%d interface objects and %d gateway objects leaked" \
              % (lost_i, lost_g)
        exc = AssertionError(msg)
        result.addFailure(self.real_test, (exc.__class__, exc, None))
def _do_leak_tests(self, result=None):
    try:
        gtrc = sys.gettotalrefcount
    except AttributeError:
        return # can't do leak tests in this build
    # Assume already called once, to prime any caches etc
    gc.collect()
    trc = gtrc()
    for i in range(self.num_leak_iters):
        self.real_test(result)
        if result.shouldStop:
            break
    del i # created after we remembered the refcount!
    # int division here means one or 2 stray references won't force
    # failure, but one per loop
    gc.collect()
    lost = (gtrc() - trc) // self.num_leak_iters
    if lost < 0:
        msg = "LeakTest: %s appeared to gain %d references!!" % (self.real_test, -lost)
        result.addFailure(self.real_test, (AssertionError, msg, None))
    if lost > 0:
        msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
        exc = AssertionError(msg)
        result.addFailure(self.real_test, (exc.__class__, exc, None))
def test_no_memory_leak():
    import gc
    import os

    def rss():
        gc.collect()
        out = os.popen("ps -o rss= -p %d" % os.getpid()).read()
        return int(out.strip())

    before = rss()
    for _ in range(100000):
        n = Name.parse("Reallyverylongfirstname Reallyverylonglastname")
        n.given_name
        n.surname
    after = rss()
    assert after < 1.25 * before
def test_ffi_type_not_immortal():
    import weakref, gc
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int **")
    t2 = ffi.typeof("int *")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    del t1, ffi
    gc.collect()
    assert w1() is None
    assert w2() is t2
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof(ffi.new("int **")[0]) is t2
    #
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int ***")
    t2 = ffi.typeof("int **")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    del t2, ffi
    gc.collect()
    assert w1() is t1
    assert w2() is not None    # kept alive by t1
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof("int * *") is t1.item