python类collect()的实例源码

test_module.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_diagnostics_life(self):
        import gc
        from weakref import ref

        def tmp():
            cur = self.conn.cursor()
            try:
                cur.execute("select * from nonexist")
            except psycopg2.Error as exc:
                return cur, exc

        cur, e = tmp()
        diag = e.diag
        w = ref(cur)

        del e, cur
        gc.collect()
        assert(w() is not None)

        self.assertEqual(diag.sqlstate, '42P01')

        del diag
        gc.collect()
        gc.collect()
        assert(w() is None)
test_module.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_diagnostics_life(self):
        import gc
        from weakref import ref

        def tmp():
            cur = self.conn.cursor()
            try:
                cur.execute("select * from nonexist")
            except psycopg2.Error, exc:
                return cur, exc

        cur, e = tmp()
        diag = e.diag
        w = ref(cur)

        del e, cur
        gc.collect()
        assert(w() is not None)

        self.assertEqual(diag.sqlstate, '42P01')

        del diag
        gc.collect()
        gc.collect()
        assert(w() is None)
default.py 文件源码 项目:SIDR 作者: damurdock 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def runAnalysis(bam, fasta, blastresults, taxdump, modelOutput, output, tokeep, toremove, binary, target, level):
    taxdump, taxidDict = common.parseTaxdump(taxdump, False)
    gc.collect()
    click.echo("Taxdump parsed, %d taxIDs loaded" % len(taxdump))
    contigs = readFasta(fasta)
    gc.collect()
    click.echo("FASTA loaded, %d contigs returned" % len(contigs))
    contigs = readBAM(bam, contigs)
    gc.collect()
    click.echo("BAM loaded")
    contigs, classMap, classList = readBLAST(blastresults,
                                             taxdump, level.lower(), contigs)
    gc.collect()
    click.echo("BLAST results loaded")
    corpus, testdata, features = common.constructCorpus(list(contigs.values()), classMap, binary, target)
    gc.collect()
    click.echo("Corpus constucted, %d contigs in corpus and %d contigs in test data" % (len(corpus), len(testdata)))
    classifier = common.constructModel(corpus, classList, features, modelOutput)
    result = common.classifyData(classifier, testdata, classMap)
    common.generateOutput(tokeep, toremove, result, contigs.values(), target, output)
test_context.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_gc(self):
        """test close&term by garbage collection alone"""
        if PYPY:
            raise SkipTest("GC doesn't work ")

        # test credit @dln (GH #137):
        def gcf():
            def inner():
                ctx = self.Context()
                s = ctx.socket(zmq.PUSH)
            inner()
            gc.collect()
        t = Thread(target=gcf)
        t.start()
        t.join(timeout=1)
        self.assertFalse(t.is_alive(), "Garbage collection should have cleaned up context")
unittest.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _cleanUp(self, result):
        try:
            if self.forceGarbageCollection:
                gc.collect()
            util._Janitor().postCaseCleanup()
        except util.FailureError, e:
            result.addError(self, e.original)
            self._passed = False
        except:
            result.cleanupErrors(failure.Failure())
            self._passed = False
        for error in self._observer.getErrors():
            result.addError(self, error)
            self._passed = False
        self.flushLoggedErrors()
        self._removeObserver()
        if self._passed:
            result.addSuccess(self)
pytester.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pytest_runtest_item(self, item):
        lines1 = self.get_open_files()
        yield
        if hasattr(sys, "pypy_version_info"):
            gc.collect()
        lines2 = self.get_open_files()

        new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
        leaked_files = [t for t in lines2 if t[0] in new_fds]
        if leaked_files:
            error = []
            error.append("***** %s FD leakage detected" % len(leaked_files))
            error.extend([str(f) for f in leaked_files])
            error.append("*** Before:")
            error.extend([str(f) for f in lines1])
            error.append("*** After:")
            error.extend([str(f) for f in lines2])
            error.append(error[0])
            error.append("*** function %s:%s: %s " % item.location)
            pytest.fail("\n".join(error), pytrace=False)


# XXX copied from execnet's conftest.py - needs to be merged
pytester.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getmodulecol(self,  source, configargs=(), withinit=False):
        """Return the module collection node for ``source``.

        This writes ``source`` to a file using :py:meth:`makepyfile`
        and then runs the pytest collection on it, returning the
        collection node for the test module.

        :param source: The source code of the module to collect.

        :param configargs: Any extra arguments to pass to
           :py:meth:`parseconfigure`.

        :param withinit: Whether to also write a ``__init__.py`` file
           to the temporarly directory to ensure it is a package.

        """
        kw = {self.request.function.__name__: Source(source).strip()}
        path = self.makepyfile(**kw)
        if withinit:
            self.makepyfile(__init__ = "#")
        self.config = config = self.parseconfigure(path, *configargs)
        node = self.getnode(config, path)
        return node
test_frombuffer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_fom_buffer(self):
        a = array.array("i", range(16))
        x = (c_int * 16).from_buffer(a)

        y = X.from_buffer(a)
        self.assertEqual(y.c_int, a[0])
        self.assertFalse(y.init_called)

        self.assertEqual(x[:], a.tolist())

        a[0], a[-1] = 200, -200
        self.assertEqual(x[:], a.tolist())

        self.assertIn(a, x._objects.values())

        self.assertRaises(ValueError,
                          c_int.from_buffer, a, -1)

        expected = x[:]
        del a; gc.collect(); gc.collect(); gc.collect()
        self.assertEqual(x[:], expected)

        self.assertRaises(TypeError,
                          (c_char * 16).from_buffer, "a" * 16)
test_frombuffer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_from_buffer_copy(self):
        a = array.array("i", range(16))
        x = (c_int * 16).from_buffer_copy(a)

        y = X.from_buffer_copy(a)
        self.assertEqual(y.c_int, a[0])
        self.assertFalse(y.init_called)

        self.assertEqual(x[:], range(16))

        a[0], a[-1] = 200, -200
        self.assertEqual(x[:], range(16))

        self.assertEqual(x._objects, None)

        self.assertRaises(ValueError,
                          c_int.from_buffer, a, -1)

        del a; gc.collect(); gc.collect(); gc.collect()
        self.assertEqual(x[:], range(16))

        x = (c_char * 16).from_buffer_copy("a" * 16)
        self.assertEqual(x[:], "a" * 16)
test_refcounts.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_1(self):
        from sys import getrefcount as grc

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        self.assertGreater(grc(callback), 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        gc.collect()

        self.assertEqual(grc(callback), 2)
main.py 文件源码 项目:Recruit 作者: Weiyanyu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def showImage(self): 
        #????????????????? main thread is not in main loop ????????main loop???????????????????????????UI???????????
        try:
            image = GenImage(os.getcwd() + '/resource/%s/' % (self.type))
            image.generateImage('position_for_image.csv','1.png','bar')             
            image.generateImage('salary_for_image.csv','2.png','pie')
        except:
            self.networkError()

        PixMapSalary = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/1.png' % (self.type)).scaled(400,600)
        self.SalaryImage.setPixmap(PixMapSalary)
        PixMapPosition = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/2.png' % (self.type)).scaled(500,500)
        self.PositionImage.setPixmap(PixMapPosition)

        del image
        gc.collect()

        #??????
        self.showStaff()

    #????????????????????????????webbrowser???
test_functools32.py 文件源码 项目:deb-python-functools32 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_main(verbose=None):
    test_classes = (
        TestPartial,
        TestPartialSubclass,
        TestPythonPartial,
        TestUpdateWrapper,
        TestTotalOrdering,
        TestCmpToKey,
        TestWraps,
        TestReduce,
        TestLRU,
        TestOrderedDict,
    )
    support.run_unittest(*test_classes)

    # verify reference counting
    if verbose and hasattr(sys, "gettotalrefcount"):
        import gc
        counts = [None] * 5
        for i in range(len(counts)):
            support.run_unittest(*test_classes)
            gc.collect()
            counts[i] = sys.gettotalrefcount()
        print(counts)
test_support27.py 文件源码 项目:deb-python-functools32 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()


#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
models.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete(self, *args, **kwargs):
        directory = self.directory

        # Just doing a plain delete will collect all related objects in memory
        # before deleting: translation projects, stores, units, quality checks,
        # suggestions, and submissions.
        # This can easily take down a process. If we do a translation project
        # at a time and force garbage collection, things stay much more
        # managable.
        import gc
        gc.collect()
        for tp in self.translationproject_set.iterator():
            tp.delete()
            gc.collect()

        super(Project, self).delete(*args, **kwargs)

        directory.delete()
test_leaks.py 文件源码 项目:Leics 作者: LeicsFrameWork 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_threaded_leak(self):
            gg = []
            def worker():
                # only main greenlet present
                gg.append(weakref.ref(greenlet.getcurrent()))
            for i in range(2):
                t = threading.Thread(target=worker)
                t.start()
                t.join()
                del t
            greenlet.getcurrent() # update ts_current
            self.recycle_threads()
            greenlet.getcurrent() # update ts_current
            gc.collect()
            greenlet.getcurrent() # update ts_current
            for g in gg:
                self.assertTrue(g() is None)
test_leaks.py 文件源码 项目:Leics 作者: LeicsFrameWork 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_threaded_adv_leak(self):
            gg = []
            def worker():
                # main and additional *finished* greenlets
                ll = greenlet.getcurrent().ll = []
                def additional():
                    ll.append(greenlet.getcurrent())
                for i in range(2):
                    greenlet.greenlet(additional).switch()
                gg.append(weakref.ref(greenlet.getcurrent()))
            for i in range(2):
                t = threading.Thread(target=worker)
                t.start()
                t.join()
                del t
            greenlet.getcurrent() # update ts_current
            self.recycle_threads()
            greenlet.getcurrent() # update ts_current
            gc.collect()
            greenlet.getcurrent() # update ts_current
            for g in gg:
                self.assertTrue(g() is None)
pywin32_testutil.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __call__(self, result = None):
        # For the COM suite's sake, always ensure we don't leak
        # gateways/interfaces
        from pythoncom import _GetInterfaceCount, _GetGatewayCount
        gc.collect()
        ni = _GetInterfaceCount()
        ng = _GetGatewayCount()
        self.real_test(result)
        # Failed - no point checking anything else
        if result.shouldStop or not result.wasSuccessful():
            return
        self._do_leak_tests(result)
        gc.collect()
        lost_i = _GetInterfaceCount() - ni
        lost_g = _GetGatewayCount() - ng
        if lost_i or lost_g:
            msg = "%d interface objects and %d gateway objects leaked" \
                                                        % (lost_i, lost_g)
            exc = AssertionError(msg)
            result.addFailure(self.real_test, (exc.__class__, exc, None))
pywin32_testutil.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _do_leak_tests(self, result = None):
        try:
            gtrc = sys.gettotalrefcount
        except AttributeError:
            return # can't do leak tests in this build
        # Assume already called once, to prime any caches etc
        gc.collect()
        trc = gtrc()
        for i in range(self.num_leak_iters):
            self.real_test(result)
            if result.shouldStop:
                break
        del i # created after we remembered the refcount!
        # int division here means one or 2 stray references won't force 
        # failure, but one per loop
        gc.collect()
        lost = (gtrc() - trc) // self.num_leak_iters
        if lost < 0:
            msg = "LeakTest: %s appeared to gain %d references!!" % (self.real_test, -lost)
            result.addFailure(self.real_test, (AssertionError, msg, None))
        if lost > 0:
            msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
            exc = AssertionError(msg)
            result.addFailure(self.real_test, (exc.__class__, exc, None))
test_humanname.py 文件源码 项目:human-name-py 作者: djudd 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_no_memory_leak():
    import gc
    import os

    def rss():
        gc.collect()
        out = os.popen("ps -o rss= -p %d" % os.getpid()).read()
        return int(out.strip())

    before = rss()

    for _ in range(100000):
        n = Name.parse("Reallyverylongfirstname Reallyverylonglastname")
        n.given_name
        n.surname

    after = rss()

    assert after < 1.25 * before
test_ffi_obj.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_ffi_type_not_immortal():
    import weakref, gc
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int **")
    t2 = ffi.typeof("int *")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    del t1, ffi
    gc.collect()
    assert w1() is None
    assert w2() is t2
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof(ffi.new("int **")[0]) is t2
    #
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int ***")
    t2 = ffi.typeof("int **")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    del t2, ffi
    gc.collect()
    assert w1() is t1
    assert w2() is not None   # kept alive by t1
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof("int * *") is t1.item


问题


面经


文章

微信
公众号

扫码关注公众号