python类WeakKeyDictionary()的实例源码

dispatcher.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, code=None, namespace=None, process_target=None, use_caching=False):
        """
        Create a new signal.
        """
        if not process_target:
            process_target = self.process
        self.process_target = process_target

        self.receivers = list()
        self.self_refs = dict()
        self.lock = threading.Lock()

        if code:
            self.code = code
        else:
            self.code = self.Meta.code

        if namespace:
            self.namespace = namespace
        else:
            self.namespace = self.Meta.namespace

        self.use_caching = use_caching
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
GraphicsScene.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, clickRadius=2, moveDistance=5, parent=None):
        QtGui.QGraphicsScene.__init__(self, parent)
        self.setClickRadius(clickRadius)
        self.setMoveDistance(moveDistance)
        self.exportDirectory = None

        self.clickEvents = []
        self.dragButtons = []
        self.mouseGrabber = None
        self.dragItem = None
        self.lastDrag = None
        self.hoverItems = weakref.WeakKeyDictionary()
        self.lastHoverEvent = None
        self.minDragTime = 0.5  # drags shorter than 0.5 sec are interpreted as clicks

        self.contextMenu = [QtGui.QAction("Export...", self)]
        self.contextMenu[0].triggered.connect(self.showExportDialog)

        self.exportDialog = None
GraphicsScene.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, clickRadius=2, moveDistance=5, parent=None):
        QtGui.QGraphicsScene.__init__(self, parent)
        self.setClickRadius(clickRadius)
        self.setMoveDistance(moveDistance)
        self.exportDirectory = None

        self.clickEvents = []
        self.dragButtons = []
        self.mouseGrabber = None
        self.dragItem = None
        self.lastDrag = None
        self.hoverItems = weakref.WeakKeyDictionary()
        self.lastHoverEvent = None
        self.minDragTime = 0.5  # drags shorter than 0.5 sec are interpreted as clicks

        self.contextMenu = [QtGui.QAction("Export...", self)]
        self.contextMenu[0].triggered.connect(self.showExportDialog)

        self.exportDialog = None
_compat.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
dispatcher.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
attr.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, parent_dispatch_cls, fn):
        self.__name__ = fn.__name__
        argspec = util.inspect_getargspec(fn)
        self.arg_names = argspec.args[1:]
        self.has_kw = bool(argspec.keywords)
        self.legacy_signatures = list(reversed(
            sorted(
                getattr(fn, '_legacy_signatures', []),
                key=lambda s: s[0]
            )
        ))
        self.__doc__ = fn.__doc__ = legacy._augment_fn_docs(
            self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()
        self._empty_listeners = weakref.WeakKeyDictionary()
attr.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, parent_dispatch_cls, fn):
        self.__name__ = fn.__name__
        argspec = util.inspect_getargspec(fn)
        self.arg_names = argspec.args[1:]
        self.has_kw = bool(argspec.keywords)
        self.legacy_signatures = list(reversed(
            sorted(
                getattr(fn, '_legacy_signatures', []),
                key=lambda s: s[0]
            )
        ))
        self.__doc__ = fn.__doc__ = legacy._augment_fn_docs(
            self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()
        self._empty_listeners = weakref.WeakKeyDictionary()
_compat.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
cache.py 文件源码 项目:adaptivemd 作者: markovmodel 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, size_limit=100, weak_type='value'):
        """
        Parameters
        ----------
        size_limit : int
            integer that defines the size of the LRU cache. Default is 100.
        """

        super(WeakLRUCache, self).__init__()
        self._size_limit = size_limit
        self.weak_type = weak_type

        if weak_type == 'value':
            self._weak_cache = weakref.WeakValueDictionary()
        elif weak_type == 'key':
            self._weak_cache = weakref.WeakKeyDictionary()
        else:
            raise ValueError("weak_type must be either 'key' or 'value'")

        self._cache = OrderedDict()
_compat.py 文件源码 项目:Indushell 作者: SecarmaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:flask_system 作者: prashasy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
test_copy.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in range(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertFalse(v[a] is b)
        self.assertFalse(v[c] is d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        support.gc_collect()
        self.assertEqual(len(v), 1)
_compat.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:jltools 作者: ownport 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:FileStoreGAE 作者: liantian-cn 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
dispatcher.py 文件源码 项目:DjangoBlog 作者: 0daybug 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
_compat.py 文件源码 项目:bawk 作者: jttwnsnd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:infinite-lorem-ipsum 作者: patjm1992 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
_compat.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
test_copy.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in xrange(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertFalse(v[a] is b)
        self.assertFalse(v[c] is d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        self.assertEqual(len(v), 1)
test_weakref.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C(object):
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()
test_copy.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in xrange(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertFalse(v[a] is b)
        self.assertFalse(v[c] is d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        self.assertEqual(len(v), 1)
test_weakref.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C(object):
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()
data_runner.py 文件源码 项目:hart 作者: akosiorek 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, queue, enqueue_ops):
        """Create a PyQueueRunner.

        When you later call the `create_threads()` method, the `QueueRunner` will
        create one thread for each op in `enqueue_ops`.  Each thread will run its
        enqueue op in parallel with the other threads.  The enqueue ops do not have
        to all be the same op, but it is expected that they all enqueue tensors in
        `queue`.

        Args:
          qnqueue_handler: a python function that transforms
          queue: A `Queue`.
          enqueue_ops: List of enqueue ops to run in threads later.
        """
        self._queue = queue
        self._enqueue_ops = enqueue_ops

        self._lock = threading.Lock()
        # A map from a session object to the number of outstanding queue runner
        # threads for that session.
        self._runs_per_session = weakref.WeakKeyDictionary()
dispatcher.py 文件源码 项目:trydjango18 作者: lucifer-yqh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
_compat.py 文件源码 项目:python-group-proj 作者: Sharcee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func


问题


面经


文章

微信
公众号

扫码关注公众号