python类get_objects()的实例源码

Noparallel.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testBenchmark():
    """Spawn three counting greenlets and report greenlet counts and timing.

    Prints the number of live greenlet objects before and after spawning
    three ``TestNoblock.count`` jobs, prints how long the spawning took, and
    then sleeps for five seconds (presumably to let the jobs run -- confirm
    against how ``time`` is patched in this project).
    """
    import time

    def printThreadNum():
        """Print how many greenlet objects the garbage collector tracks."""
        import gc
        from greenlet import greenlet
        objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
        # A parenthesised single-argument print behaves identically on
        # Python 2 and Python 3, unlike the bare print statement.
        print("Greenlets: %s" % len(objs))

    printThreadNum()
    test = TestNoblock()
    s = time.time()
    for i in range(3):
        gevent.spawn(test.count, i + 1)
    print("Created in %.3fs" % (time.time() - s))
    printThreadNum()
    time.sleep(5)
recipe-576523.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make_gc_snapShot(filename, name):
    """Append a snapshot of the gc-tracked objects to a pickle file.

    The snapshot is pickled as ``(name, contents)`` where ``contents`` is a
    list of signatures, one ``(object_id, type_name)`` pair for every object
    returned by ``gc.get_objects()``.

    :param filename: path of the file the snapshot is appended to.
    :param name: label stored alongside the signatures.
    """
    global first_time
    if first_time:
        # Only the very first snapshot is preceded by a full collection.
        gc.collect()
        first_time = False
    contents = []
    for o in gc.get_objects():
        try:
            tname = o.__class__.__name__
        except AttributeError:
            tname = str(type(o))
        contents.append((id(o), tname))
    # Pickle output is bytes, so the file must be opened in binary mode
    # (text mode raises TypeError on Python 3).  The ``with`` block closes
    # the handle even if pickling fails.
    with open(filename, 'ab') as f:
        pickle.dump((name, contents), f)
recipe-576523.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reach(self, ids):
    """Map each requested id to the gc-tracked object currently holding it.

    :param ids: iterable of object ids, as returned by ``x[0]`` with ``x``
        in the result of ``(snapshot2 - snapshot1)``.
    :return: dict ``id -> object`` for every id in ``ids`` that belongs to
        an object currently returned by ``gc.get_objects()``.

    The objects recorded with these ids might have been replaced by new
    ones, so we might end up seeing objects that don't correspond to the
    original ones.  This is especially true after a ``gc.collect()``.
    """
    # Materialise the ids once: set membership is O(1), and a one-shot
    # iterator would otherwise be exhausted by the first ``in`` test.
    wanted = set(ids)
    result = {}
    for obj in gc.get_objects():
        obj_id = id(obj)
        if obj_id in wanted:
            result[obj_id] = obj
    return result
general.py 文件源码 项目:coquery 作者: gkunter 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def memory_dump():
    """Print one summary line for every gc-tracked object above 50 KiB.

    Each line shows a running counter followed by a dict with the object's
    id, the string form of its class, its size as reported by
    ``sys.getsizeof`` and the number of distinct objects it refers to
    directly (``gc.get_referents``).
    """
    import gc
    threshold = 1024 * 50
    x = 0
    for obj in gc.get_objects():
        size = sys.getsizeof(obj, 0)
        if size <= threshold:
            # Small objects are never reported; skip the remaining work.
            continue
        try:
            cls = str(obj.__class__)
        except Exception:
            # Best effort for objects whose class cannot be rendered; a bare
            # ``except`` would also swallow KeyboardInterrupt/SystemExit.
            cls = "<no class>"
        referents = {id(o) for o in gc.get_referents(obj)}
        x += 1
        print(x, {'id': id(obj),
                  'class': cls,
                  'size': size,
                  "ref": len(referents)})
hdf5_viewer.py 文件源码 项目:nyroglancer 作者: funkey 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_file(self, filename):
    """Open ``filename`` with h5py in read mode and register it.

    Any ``h5py.File`` object currently tracked by the garbage collector
    whose ``filename`` equals the requested one is closed first.  The newly
    opened file is appended to ``self.files`` and handed to
    ``self.__traverse_add``.

    :param filename: path of the HDF5 file to open.
    """
    # make sure this file is closed before opening it again, h5py doesn't
    # handle this well otherwise
    for obj in gc.get_objects():
        if not isinstance(obj, h5py.File):
            continue
        try:
            if obj.filename == filename:
                print("Closing previously open %s"%filename)
                obj.close()
        except Exception:
            # Best effort: a handle that cannot be inspected or closed is
            # skipped.  ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt/SystemExit propagate.
            pass

    f = h5py.File(filename, 'r')
    self.files.append(f)
    self.__traverse_add(f, filename)
patcher.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _green_existing_locks():
    """Swap locks that pre-date monkey-patching for green-safe ones.

    An RLock is built on top of a Lock, and on Python 2 an unpatched Lock
    that blocks will block the whole native thread, so such locks have to
    be replaced with green Locks.

    The problem was first spotted in the stdlib logging module.
    """
    import gc
    import threading
    import eventlet.green.thread

    py_major = sys.version_info[0]
    lock_type = type(threading.Lock())
    rlock_type = type(threading.RLock())
    if py_major >= 3:
        pyrlock_type = type(threading._PyRLock())
    # Monkey-patching is happening right now, so no greenlets can exist yet
    # and our own thread ID is the only owner a lock could have.
    tid = eventlet.green.thread.get_ident()
    for candidate in gc.get_objects():
        if not isinstance(candidate, rlock_type):
            continue
        if py_major == 2:
            if isinstance(candidate._RLock__block, lock_type):
                _fix_py2_rlock(candidate, tid)
        elif py_major >= 3 and not isinstance(candidate, pyrlock_type):
            _fix_py3_rlock(candidate)
main.py 文件源码 项目:PYDOJO 作者: sprintingkiwi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def getactors(tag=None):
    """Return the actors registered under ``tag``, or all actors.

    Without a tag the module-level ``ACTORS`` collection is returned.  With
    a tag, the matching entry of ``game_info.tagged_actors`` is returned; an
    unknown tag prints a debug message and yields an empty list.
    """
    if tag is None:
        return ACTORS
    if tag not in game_info.tagged_actors:
        print('DEBUG: tag not in dictionary')
        return []
    return game_info.tagged_actors[tag]
tpool_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_leakage_from_tracebacks(self):
    """Check that exceptions raised through ``tpool.execute`` do not leak.

    The gc object count is sampled before a loop of 10 failing calls, after
    it, and again after a loop of 100 failing calls; the growth during the
    second loop must stay within 10 objects of the growth during the first.
    """
    tpool.execute(noop)  # get it started
    gc.collect()
    initial_objs = len(gc.get_objects())
    for i in range(10):
        self.assertRaises(RuntimeError, tpool.execute, raise_exception)
    gc.collect()
    middle_objs = len(gc.get_objects())
    # some objects will inevitably be created by the previous loop
    # now we test to ensure that running the loop an order of
    # magnitude more doesn't generate additional objects
    for i in six.moves.range(100):
        self.assertRaises(RuntimeError, tpool.execute, raise_exception)
    first_created = middle_objs - initial_objs
    gc.collect()
    second_created = len(gc.get_objects()) - middle_objs
    # ``assert_`` is a deprecated alias that was removed in Python 3.12.
    self.assertTrue(second_created - first_created < 10,
                    "first loop: %s, second loop: %s" % (first_created,
                                                         second_created))
    tpool.killall()
patcher.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _green_existing_locks():
    """Swap locks that pre-date monkey-patching for green-safe ones.

    An RLock is built on top of a Lock, and on Python 2 an unpatched Lock
    that blocks will block the whole native thread, so such locks have to
    be replaced with green Locks.

    The problem was first spotted in the stdlib logging module.
    """
    import gc
    import threading
    import eventlet.green.thread

    py_major = sys.version_info[0]
    lock_type = type(threading.Lock())
    rlock_type = type(threading.RLock())
    if py_major >= 3:
        pyrlock_type = type(threading._PyRLock())
    # Monkey-patching is happening right now, so no greenlets can exist yet
    # and our own thread ID is the only owner a lock could have.
    tid = eventlet.green.thread.get_ident()
    for candidate in gc.get_objects():
        if not isinstance(candidate, rlock_type):
            continue
        if py_major == 2:
            if isinstance(candidate._RLock__block, lock_type):
                _fix_py2_rlock(candidate, tid)
        elif py_major >= 3 and not isinstance(candidate, pyrlock_type):
            _fix_py3_rlock(candidate)
plugin.py 文件源码 项目:dutch-boy 作者: Nextdoor 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def begin(self):
    """Create the initial summary and, if enabled, set up mock tracking.

    When ``detect_leaked_mocks`` is set, every ``mock.Mock`` currently
    tracked by the garbage collector is recorded as a known mock.  When
    ``patch_mock`` is also set, ``Base.__init__`` is wrapped so that each
    newly initialised mock is registered with this detector.
    """
    self.create_initial_summary()

    if not self.detect_leaked_mocks:
        return

    # Record pre-existing mocks
    gc.collect()
    self.known_mocks = list(
        KnownMock(weakref.ref(existing), None, None)
        for existing in gc.get_objects()
        if isinstance(existing, mock.Mock))
    self.previous_mock_refs = list(
        known.mock_ref for known in self.known_mocks)

    if not self.patch_mock:
        return

    detector = self

    def decorator(original_init):
        @functools.wraps(original_init)
        def registering_init(new_mock, *args, **kwargs):
            original_init(new_mock, *args, **kwargs)
            detector.register_mock(
                new_mock, detector.level_name.get(LEVEL_TEST))
        return registering_init

    Base.__init__ = decorator(Base.__init__)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def listObjs(regex='Q', typ=None):
    """Return the gc-tracked objects selected by type or by class name.

    If ``typ`` is given, the objects that are instances of ``typ`` are
    returned.  Otherwise the objects whose class name matches ``regex`` at
    its start are returned; the default finds 'Q...' classes.
    """
    tracked = gc.get_objects()
    if typ is None:
        pattern = re.compile(regex)
        return [obj for obj in tracked if pattern.match(type(obj).__name__)]
    return [obj for obj in tracked if isinstance(obj, typ)]
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_all_objects():
    """Return a dict ``id -> object`` of all live Python objects.

    The objects are gathered by ``_getr`` starting from
    ``gc.get_objects()`` (per the original description this excludes int
    and long).  The result dict itself, the gc list and the current frame
    are removed from the result.
    """
    gc.collect()
    gcl = gc.get_objects()
    olist = {}
    _getr(gcl, olist)

    # Drop this function's own bookkeeping objects.  ``pop`` with a default
    # is used instead of ``del`` so that an object that was never collected
    # (for example a frame object that did not exist yet when the gc list
    # was taken) does not raise KeyError.
    for own in (olist, gcl, sys._getframe()):
        olist.pop(id(own), None)
    return olist
__init__.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def cleanup():
    """Give every scene-less QGraphicsItem a scene before the app exits.

    Does nothing if it already ran to completion or if the 'exitCleanup'
    config option is off.  Otherwise ViewBox is told to quit and, provided
    a QApplication instance exists, every live QGraphicsItem without a
    scene is added to a temporary QGraphicsScene.
    """
    global _cleanupCalled
    if _cleanupCalled or not getConfigOption('exitCleanup'):
        return

    # Let ViewBox know it no longer has to deregister views.
    ViewBox.quit()

    # Workaround for a Qt crash on exit: ALL QGraphicsItems must belong to
    # a scene before they are deleted.  This can be very expensive, but it
    # beats crashing.
    # Note: PySide appears to have fixed this as of 2012.12, but the
    # workaround should stay in place for a while longer.
    if QtGui.QApplication.instance() is None:
        return
    import gc
    scene = QtGui.QGraphicsScene()
    for item in gc.get_objects():
        try:
            orphaned = (isinstance(item, QtGui.QGraphicsItem)
                        and isQObjectAlive(item)
                        and item.scene() is None)
            if not orphaned:
                continue
            if getConfigOption('crashWarning'):
                sys.stderr.write('Error: graphics item without scene. '
                    'Make sure ViewBox.close() and GraphicsView.close() '
                    'are properly called before app shutdown (%s)\n' % (item,))
            scene.addItem(item)
        except RuntimeError:
            # Raised when a Python wrapper has lost its underlying C++ object.
            continue
    _cleanupCalled = True
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def listObjs(regex='Q', typ=None):
    """Return the gc-tracked objects selected by type or by class name.

    If ``typ`` is given, the objects that are instances of ``typ`` are
    returned.  Otherwise the objects whose class name matches ``regex`` at
    its start are returned; the default finds 'Q...' classes.
    """
    tracked = gc.get_objects()
    if typ is None:
        pattern = re.compile(regex)
        return [obj for obj in tracked if pattern.match(type(obj).__name__)]
    return [obj for obj in tracked if isinstance(obj, typ)]
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_all_objects():
    """Return a dict ``id -> object`` of all live Python objects.

    The objects are gathered by ``_getr`` starting from
    ``gc.get_objects()`` (per the original description this excludes int
    and long).  The result dict itself, the gc list and the current frame
    are removed from the result.
    """
    gc.collect()
    gcl = gc.get_objects()
    olist = {}
    _getr(gcl, olist)

    # Drop this function's own bookkeeping objects.  ``pop`` with a default
    # is used instead of ``del`` so that an object that was never collected
    # (for example a frame object that did not exist yet when the gc list
    # was taken) does not raise KeyError.
    for own in (olist, gcl, sys._getframe()):
        olist.pop(id(own), None)
    return olist
tests.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_markup_leaks(self):
    """Check that repeated ``escape`` calls keep the gc object count stable.

    The count is sampled after each of 20 rounds of 1000 iterations; every
    sample must be identical.
    """
    seen_totals = set()
    for _round in range(20):
        for _call in range(1000):
            escape("foo")
            escape("<foo>")
            escape(u"foo")
            escape(u"<foo>")
        total = len(gc.get_objects())
        seen_totals.add(total)
    assert len(seen_totals) == 1, 'ouch, c extension seems to leak objects'
regression.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __enter__(self):
    """Disable gc, take the gc lock and record the baseline object count.

    The count is stored in ``self.old_objects``.
    """
    gc.disable()
    _gc_lock.acquire()

    # Keep the request-context storage dict tracked by the collector at
    # all times: Python only starts tracking a dict once it contains a
    # mutable object.  A horrible hack, but it makes this kinda testable.
    flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

    gc.collect()
    self.old_objects = len(gc.get_objects())
regression.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tb):
    """Fail the test case if objects leaked, then restore the gc state.

    The current gc object count is compared with ``self.old_objects``; if
    it grew, ``self.testcase.fail`` is called.  The gc lock is released and
    garbage collection re-enabled in every case.
    """
    try:
        if not hasattr(sys, 'getrefcount'):
            gc.collect()
        new_objects = len(gc.get_objects())
        if new_objects > self.old_objects:
            self.testcase.fail('Example code leaked')
    finally:
        # If the failure call raises (as unittest's ``fail`` does), the
        # lock would otherwise stay held and gc would stay disabled for
        # everything that runs afterwards.
        _gc_lock.release()
        gc.enable()
recipe-576523.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_objects(self):
    """Return the list of objects currently tracked by the collector."""
    tracked = gc.get_objects()
    return tracked
tests.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_markup_leaks(self):
    """Check that repeated ``escape`` calls keep the gc object count stable.

    The count is sampled after each of 20 rounds of 1000 iterations; every
    sample must be identical.
    """
    seen_totals = set()
    for _round in range(20):
        for _call in range(1000):
            escape("foo")
            escape("<foo>")
            escape(u"foo")
            escape(u"<foo>")
        total = len(gc.get_objects())
        seen_totals.add(total)
    assert len(seen_totals) == 1, 'ouch, c extension seems to leak objects'
eventlet_backdoor.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _find_objects(t):
    """Return every gc-tracked object that is an instance of ``t``."""
    matches = []
    for candidate in gc.get_objects():
        if isinstance(candidate, t):
            matches.append(candidate)
    return matches
tests.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_markup_leaks(self):
    """Check that repeated ``escape`` calls keep the gc object count stable.

    The count is sampled after each of 20 rounds of 1000 iterations; every
    sample must be identical.
    """
    seen_totals = set()
    for _round in range(20):
        for _call in range(1000):
            escape("foo")
            escape("<foo>")
            escape(u"foo")
            escape(u"<foo>")
        total = len(gc.get_objects())
        seen_totals.add(total)
    assert len(seen_totals) == 1, 'ouch, c extension seems to leak objects'
regression.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __enter__(self):
    """Disable gc, take the gc lock and record the baseline object count.

    The count is stored in ``self.old_objects``.
    """
    gc.disable()
    _gc_lock.acquire()

    # Keep the request-context storage dict tracked by the collector at
    # all times: Python only starts tracking a dict once it contains a
    # mutable object.  A horrible hack, but it makes this kinda testable.
    flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

    gc.collect()
    self.old_objects = len(gc.get_objects())
regression.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tb):
    """Fail the test case if objects leaked, then restore the gc state.

    The current gc object count is compared with ``self.old_objects``; if
    it grew, ``self.testcase.fail`` is called.  The gc lock is released and
    garbage collection re-enabled in every case.
    """
    try:
        if not hasattr(sys, 'getrefcount'):
            gc.collect()
        new_objects = len(gc.get_objects())
        if new_objects > self.old_objects:
            self.testcase.fail('Example code leaked')
    finally:
        # If the failure call raises (as unittest's ``fail`` does), the
        # lock would otherwise stay held and gc would stay disabled for
        # everything that runs afterwards.
        _gc_lock.release()
        gc.enable()
test_callbacks.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_issue_7959(self):
    """Check that callback prototypes do not keep their owners alive.

    32 instances of a class that stores a callback wrapping its own bound
    method are created and dropped; after a collection none may remain
    among the gc-tracked objects.
    """
    # ``__func__`` is available on Python 2.6+ and Python 3, whereas
    # ``im_func`` only exists on Python 2.
    proto = self.functype.__func__(None)

    class X(object):
        def func(self): pass
        def __init__(self):
            self.v = proto(self.func)

    import gc
    for i in range(32):
        X()
    gc.collect()
    live = [x for x in gc.get_objects()
            if isinstance(x, X)]
    self.assertEqual(len(live), 0)
StatsPlugin.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def actionDumpobj(self):
        """Yield an HTML dump of old-style instances of the requested class.

        Sends the response header first.  When ``config.debug`` is off, only
        the text "Not in debug mode" is yielded.  Otherwise, for every
        gc-tracked object whose type renders as ``<type 'instance'>`` and
        whose class name equals the "class" request parameter, the object's
        size, its escaped string form and all of its attributes are yielded.
        """

        import gc
        import sys

        self.sendHeader()

        # No more if not in debug mode
        if not config.debug:
            yield "Not in debug mode"
            # A plain ``return`` ends the generator.  Raising StopIteration
            # inside a generator is turned into RuntimeError by PEP 479.
            return

        class_filter = self.get.get("class")

        yield """
        <style>
         * { font-family: monospace; white-space: pre }
         table * { text-align: right; padding: 0px 10px }
        </style>
        """

        objs = gc.get_objects()
        for obj in objs:
            obj_type = str(type(obj))
            if obj_type != "<type 'instance'>" or obj.__class__.__name__ != class_filter:
                continue
            yield "%.1fkb %s... " % (float(sys.getsizeof(obj)) / 1024, cgi.escape(str(obj)))
            for attr in dir(obj):
                yield "- %s: %s<br>" % (attr, cgi.escape(str(getattr(obj, attr))))
            yield "<br>"

        gc.collect()  # Explicit garbage collection once the dump is complete
test_regression.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_errobj_reference_leak(self, level=rlevel):
    """Regression check for ticket #955: the power op must not leak errobj.

    The gc object count after raising a Python int to an ``np.int32`` power
    must not exceed the count taken before.
    """
    # Ticket #955
    with np.errstate(all="ignore"):
        base = int(0)
        exponent = np.int32(-1)

        gc.collect()
        count_before = len(gc.get_objects())
        base**exponent  # this shouldn't leak a reference to errobj
        gc.collect()
        count_after = len(gc.get_objects())
        assert_(count_before >= count_after, (count_before, count_after))
tests.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_markup_leaks(self):
    """Check that repeated ``escape`` calls keep the gc object count stable.

    The count is sampled after each of 20 rounds of 1000 iterations; every
    sample must be identical.
    """
    seen_totals = set()
    for _round in range(20):
        for _call in range(1000):
            escape("foo")
            escape("<foo>")
            escape(u"foo")
            escape(u"<foo>")
        total = len(gc.get_objects())
        seen_totals.add(total)
    assert len(seen_totals) == 1, 'ouch, c extension seems to leak objects'
regression.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __enter__(self):
    """Disable gc, take the gc lock and record the baseline object count.

    The count is stored in ``self.old_objects``.
    """
    gc.disable()
    _gc_lock.acquire()

    # Keep the request-context storage dict tracked by the collector at
    # all times: Python only starts tracking a dict once it contains a
    # mutable object.  A horrible hack, but it makes this kinda testable.
    flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

    gc.collect()
    self.old_objects = len(gc.get_objects())
regression.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tb):
    """Fail the test case if objects leaked, then restore the gc state.

    The current gc object count is compared with ``self.old_objects``; if
    it grew, ``self.testcase.fail`` is called.  The gc lock is released and
    garbage collection re-enabled in every case.
    """
    try:
        if not hasattr(sys, 'getrefcount'):
            gc.collect()
        new_objects = len(gc.get_objects())
        if new_objects > self.old_objects:
            self.testcase.fail('Example code leaked')
    finally:
        # If the failure call raises (as unittest's ``fail`` does), the
        # lock would otherwise stay held and gc would stay disabled for
        # everything that runs afterwards.
        _gc_lock.release()
        gc.enable()


问题


面经


文章

微信
公众号

扫码关注公众号