def _make_cached_stream_func(src_func, wrapper_func):
cache = WeakKeyDictionary()
def func():
stream = src_func()
try:
rv = cache.get(stream)
except Exception:
rv = None
if rv is not None:
return rv
rv = wrapper_func()
try:
cache[stream] = rv
except Exception:
pass
return rv
return func
python类WeakKeyDictionary()的实例源码
def _make_cached_stream_func(src_func, wrapper_func):
cache = WeakKeyDictionary()
def func():
stream = src_func()
try:
rv = cache.get(stream)
except Exception:
rv = None
if rv is not None:
return rv
rv = wrapper_func()
try:
cache[stream] = rv
except Exception:
pass
return rv
return func
def __init__(self, providing_args=None, use_caching=False):
"""
Create a new signal.
providing_args
A list of the arguments this signal can pass along in a send() call.
"""
self.receivers = []
if providing_args is None:
providing_args = []
self.providing_args = set(providing_args)
self.lock = threading.Lock()
self.use_caching = use_caching
# For convenience we create empty caches even if they are not used.
# A note about caching: if use_caching is defined, then for each
# distinct sender we cache the receivers that sender has in
# 'sender_receivers_cache'. The cache is cleaned when .connect() or
# .disconnect() is called and populated on send().
self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
self._dead_receivers = False
def __init__(self, parent=None):
self._children = WeakKeyDictionary()
self._parents = []
self._local_registry = {}
self._local_reserved_words = {}
self._local_group_words = {}
self._local_list_words = {}
self._registry = {}
self._reserved_words = {}
self._group_words = {}
self._list_words = {}
if parent:
self._parents.extend(parent._parents)
self._parents.append(parent)
parent._children[self] = True
self._update_cache()
def test_deepcopy_weakkeydict(self):
class C(object):
def __init__(self, i):
self.i = i
a, b, c, d = [C(i) for i in range(4)]
u = weakref.WeakKeyDictionary()
u[a] = b
u[c] = d
# Keys aren't copied, values are
v = copy.deepcopy(u)
self.assertNotEqual(v, u)
self.assertEqual(len(v), 2)
self.assertIsNot(v[a], b)
self.assertIsNot(v[c], d)
self.assertEqual(v[a].i, b.i)
self.assertEqual(v[c].i, d.i)
del c
self.assertEqual(len(v), 1)
def test_trashcan_16602(self):
# Issue #16602: when a weakref's target was part of a long
# deallocation chain, the trashcan mechanism could delay clearing
# of the weakref and make the target object visible from outside
# code even though its refcount had dropped to 0. A crash ensued.
class C:
def __init__(self, parent):
if not parent:
return
wself = weakref.ref(self)
def cb(wparent):
o = wself()
self.wparent = weakref.ref(parent, cb)
d = weakref.WeakKeyDictionary()
root = c = C(None)
for n in range(100):
d[c] = c = C(c)
del root
gc.collect()
def __init__(self, get):
self._get = get
self._cache = WeakKeyDictionary()
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
threading.Thread.__init__(self, group, target, name, args, kwargs)
self._pid = None
self._children = weakref.WeakKeyDictionary()
self._start_called = False
self._parent = current_process()
def dns_bulk_resolve(candidates, reverse=False, ip_version=None, threads=50):
"""
Resolve a list of host names to IPs or, if reverse is true, IPs to
host names. Return a map of each result keyed to its candidate.
WARNING: This function will create a pool of up to 'threads'
threads.
"""
# This is based loosely on http://stackoverflow.com/a/34377198
if reverse and ip_version is not None:
raise ValueError("Unable to force IP version when reverse-resolving")
if ip_version is None:
ip_version = 4
__check_ip_version__(ip_version)
result = {}
if len(candidates) == 0:
return result
# Work around a bug in 2.6
# TODO: Get rid of this when 2.6 is no longer in the picture.
if not hasattr(threading.current_thread(), "_children"):
threading.current_thread()._children = weakref.WeakKeyDictionary()
pool = multiprocessing.dummy.Pool(
processes=min(len(candidates), threads) )
candidate_args = [ (candidate, ip_version) for candidate in candidates ]
for ip, name in pool.imap(
__reverser__ if reverse else __forwarder__,
candidate_args,
chunksize=1):
result[ip] = name
pool.close()
return result
def __init__(self, area):
object.__init__(self)
self.area = area
self._container = None
self._stretch = (10, 10)
self.stretches = weakref.WeakKeyDictionary()
def __init__(self, widgetList=None):
"""Initialize WidgetGroup, adding specified widgets into this group.
widgetList can be:
- a list of widget specifications (widget, [name], [scale])
- a dict of name: widget pairs
- any QObject, and all compatible child widgets will be added recursively.
The 'scale' parameter for each widget allows QSpinBox to display a different value than the value recorded
in the group state (for example, the program may set a spin box value to 100e-6 and have it displayed as 100 to the user)
"""
QtCore.QObject.__init__(self)
self.widgetList = weakref.WeakKeyDictionary() # Make sure widgets don't stick around just because they are listed here
self.scales = weakref.WeakKeyDictionary()
self.cache = {} ## name:value pairs
self.uncachedWidgets = weakref.WeakKeyDictionary()
if isinstance(widgetList, QtCore.QObject):
self.autoAdd(widgetList)
elif isinstance(widgetList, list):
for w in widgetList:
self.addWidget(*w)
elif isinstance(widgetList, dict):
for name, w in widgetList.items():
self.addWidget(w, name)
elif widgetList is None:
return
else:
raise Exception("Wrong argument type %s" % type(widgetList))
def __init__(self, area):
object.__init__(self)
self.area = area
self._container = None
self._stretch = (10, 10)
self.stretches = weakref.WeakKeyDictionary()
def _type_memos(self):
return weakref.WeakKeyDictionary()
def _take_snapshot(self):
if not self._is_transaction_boundary:
self._new = self._parent._new
self._deleted = self._parent._deleted
self._dirty = self._parent._dirty
self._key_switches = self._parent._key_switches
return
if not self.session._flushing:
self.session.flush()
self._new = weakref.WeakKeyDictionary()
self._deleted = weakref.WeakKeyDictionary()
self._dirty = weakref.WeakKeyDictionary()
self._key_switches = weakref.WeakKeyDictionary()
def __init__(self, parent_dispatch_cls, fn):
self.name = fn.__name__
argspec = util.inspect_getargspec(fn)
self.arg_names = argspec.args[1:]
self.has_kw = bool(argspec.keywords)
self.legacy_signatures = list(reversed(
sorted(
getattr(fn, '_legacy_signatures', []),
key=lambda s: s[0]
)
))
fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)
self._clslevel = weakref.WeakKeyDictionary()
def _parents(self):
"""Dictionary of parent object->attribute name on the parent.
This attribute is a so-called "memoized" property. It initializes
itself with a new ``weakref.WeakKeyDictionary`` the first time
it is accessed, returning the same object upon subsequent access.
"""
return weakref.WeakKeyDictionary()
def __init__(self, *args, **kw):
init_dict = dict(*args, **kw)
self._d = weakref.WeakKeyDictionary(
(key, self._create_value(key, value))
for key, value in init_dict.iteritems())
def __init__(self, Base):
self.base = Base
self.dbs = WeakKeyDictionary()
def _async_clients(cls):
attr_name = '_async_client_dict_' + cls.__name__
if not hasattr(cls, attr_name):
setattr(cls, attr_name, weakref.WeakKeyDictionary())
return getattr(cls, attr_name)
def _async_clients(cls):
attr_name = '_async_client_dict_' + cls.__name__
if not hasattr(cls, attr_name):
setattr(cls, attr_name, weakref.WeakKeyDictionary())
return getattr(cls, attr_name)