def test_hashing(self):
# Alive WeakMethods are hashable if the underlying object is
# hashable.
x = Object(1)
y = Object(1)
a = weakref.WeakMethod(x.some_method)
b = weakref.WeakMethod(y.some_method)
c = weakref.WeakMethod(y.other_method)
# Since WeakMethod objects are equal, the hashes should be equal.
self.assertEqual(hash(a), hash(b))
ha = hash(a)
# Dead WeakMethods retain their old hash value
del x, y
gc.collect()
self.assertEqual(hash(a), ha)
self.assertEqual(hash(b), ha)
# If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
self.assertRaises(TypeError, hash, c)
python类WeakMethod()的实例源码
def test_hashing(self):
# Alive WeakMethods are hashable if the underlying object is
# hashable.
x = Object(1)
y = Object(1)
a = weakref.WeakMethod(x.some_method)
b = weakref.WeakMethod(y.some_method)
c = weakref.WeakMethod(y.other_method)
# Since WeakMethod objects are equal, the hashes should be equal.
self.assertEqual(hash(a), hash(b))
ha = hash(a)
# Dead WeakMethods retain their old hash value
del x, y
gc.collect()
self.assertEqual(hash(a), ha)
self.assertEqual(hash(b), ha)
# If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
self.assertRaises(TypeError, hash, c)
def register(self, receiver, weak=True, dispatch_uid=None):
"""
Connect receiver to sender for signal.
:param receiver: A function or an instance method which is to receive signals. Receivers must be hashable objects.
If weak is True, then receiver must be weak referenceable.Receivers must be able to accept keyword arguments.
If a receiver is connected with a dispatch_uid argument, it
will not be added if another receiver was already connected with that dispatch_uid.
:param weak: Whether to use weak references to the receiver. By default, the
module will attempt to use weak references to the receiver
objects. If this parameter is false, then strong references will
be used.
:param dispatch_uid: An identifier used to uniquely identify a particular instance of
a receiver. This will usually be a string, though it may be anything hashable.
"""
if dispatch_uid:
lookup_key = dispatch_uid
else:
lookup_key = _make_id(receiver)
if weak:
ref = weakref.ref
receiver_object = receiver
# Check for bound methods.
if hasattr(receiver, '__self__') and hasattr(receiver, '__func__'):
ref = weakref.WeakMethod
receiver_object = receiver.__self__
receiver = ref(receiver)
weakref.finalize(receiver_object, self._remove_receiver)
with self.lock:
self._clear_dead_receivers()
for rec_key in self.receivers:
if rec_key == lookup_key:
break
else:
self.receivers.append((lookup_key, receiver))
self.sender_receivers_cache.clear()
def register_user_callback(self, func):
"""
Func to be called when a subscription receives a new EventAdd command.
This function will be called by a Task in the main thread. If ``func``
needs to do CPU-intensive or I/O-related work, it should execute that
work in a separate thread of process.
"""
if inspect.ismethod(func):
self._callback = weakref.WeakMethod(func, self._callback_cleared)
else:
self._callback = weakref.ref(func, self._callback_cleared)
def test_alive(self):
o = Object(1)
r = weakref.WeakMethod(o.some_method)
self.assertIsInstance(r, weakref.ReferenceType)
self.assertIsInstance(r(), type(o.some_method))
self.assertIs(r().__self__, o)
self.assertIs(r().__func__, o.some_method.__func__)
self.assertEqual(r()(), 4)
def test_object_dead(self):
o = Object(1)
r = weakref.WeakMethod(o.some_method)
del o
gc.collect()
self.assertIs(r(), None)
def test_method_dead(self):
C = self._subclass()
o = C(1)
r = weakref.WeakMethod(o.some_method)
del C.some_method
gc.collect()
self.assertIs(r(), None)
def test_callback_when_object_dead(self):
# Test callback behaviour when object dies first.
C = self._subclass()
calls = []
def cb(arg):
calls.append(arg)
o = C(1)
r = weakref.WeakMethod(o.some_method, cb)
del o
gc.collect()
self.assertEqual(calls, [r])
# Callback is only called once.
C.some_method = Object.some_method
gc.collect()
self.assertEqual(calls, [r])
def test_callback_when_method_dead(self):
# Test callback behaviour when method dies first.
C = self._subclass()
calls = []
def cb(arg):
calls.append(arg)
o = C(1)
r = weakref.WeakMethod(o.some_method, cb)
del C.some_method
gc.collect()
self.assertEqual(calls, [r])
# Callback is only called once.
del o
gc.collect()
self.assertEqual(calls, [r])
def test_equality(self):
def _eq(a, b):
self.assertTrue(a == b)
self.assertFalse(a != b)
def _ne(a, b):
self.assertTrue(a != b)
self.assertFalse(a == b)
x = Object(1)
y = Object(1)
a = weakref.WeakMethod(x.some_method)
b = weakref.WeakMethod(y.some_method)
c = weakref.WeakMethod(x.other_method)
d = weakref.WeakMethod(y.other_method)
# Objects equal, same method
_eq(a, b)
_eq(c, d)
# Objects equal, different method
_ne(a, c)
_ne(a, d)
_ne(b, c)
_ne(b, d)
# Objects unequal, same or different method
z = Object(2)
e = weakref.WeakMethod(z.some_method)
f = weakref.WeakMethod(z.other_method)
_ne(a, e)
_ne(a, f)
_ne(b, e)
_ne(b, f)
del x, y, z
gc.collect()
# Dead WeakMethods compare by identity
refs = a, b, c, d, e, f
for q in refs:
for r in refs:
self.assertEqual(q == r, q is r)
self.assertEqual(q != r, q is not r)
def test_subscribe_method(self, pubpen):
"""Test that adding method callbacks succeed"""
foo = Foo()
first = pubpen.subscribe('test_event', foo.method)
# Test internals of saving worked
assert pubpen._subscriptions[first] == 'test_event'
assert len(pubpen._event_handlers['test_event']) == 1
event = pubpen._event_handlers['test_event']
assert list(event.keys()) == [first]
assert list(event.values()) == [weakref.WeakMethod(foo.method)]
def test_subscribe_diff_callback_same_event(self, pubpen):
first = pubpen.subscribe('test_event', function)
foo = Foo()
second = pubpen.subscribe('test_event', foo.method)
# Test internals of subscribing worked
assert pubpen._subscriptions[first] == 'test_event'
assert pubpen._subscriptions[second] == 'test_event'
assert len(pubpen._event_handlers['test_event']) == 2
events = pubpen._event_handlers['test_event']
assert events[first] != events[second]
assert events[first] in (weakref.ref(function), weakref.WeakMethod(foo.method))
assert events[second] in (weakref.ref(function), weakref.WeakMethod(foo.method))
def test_subscribe_diff_callback_diff_event(self, pubpen):
first = pubpen.subscribe('test_event1', function)
foo = Foo()
second = pubpen.subscribe('test_event2', foo.method)
# Test internals of subscribing worked
assert pubpen._subscriptions[first] == 'test_event1'
assert pubpen._subscriptions[second] == 'test_event2'
assert len(pubpen._event_handlers['test_event1']) == 1
assert len(pubpen._event_handlers['test_event2']) == 1
events = pubpen._event_handlers['test_event1']
assert events[first] == weakref.ref(function)
events = pubpen._event_handlers['test_event2']
assert events[second] == weakref.WeakMethod(foo.method)
def register(self, slot):
if inspect.ismethod(slot):
self._methods.add(weakref.WeakMethod(slot))
else:
self._functions.add(slot)
def __init__(self, slot, weak=False):
self._weak = weak or isinstance(slot, weakref.ref)
if weak and not isinstance(slot, weakref.ref):
if isinstance(slot, types.MethodType):
slot = WeakMethod(slot)
else:
slot = weakref.ref(slot)
self._slot = slot
def test_alive(self):
o = Object(1)
r = weakref.WeakMethod(o.some_method)
self.assertIsInstance(r, weakref.ReferenceType)
self.assertIsInstance(r(), type(o.some_method))
self.assertIs(r().__self__, o)
self.assertIs(r().__func__, o.some_method.__func__)
self.assertEqual(r()(), 4)
def test_object_dead(self):
o = Object(1)
r = weakref.WeakMethod(o.some_method)
del o
gc.collect()
self.assertIs(r(), None)
def test_method_dead(self):
C = self._subclass()
o = C(1)
r = weakref.WeakMethod(o.some_method)
del C.some_method
gc.collect()
self.assertIs(r(), None)
def test_callback_when_object_dead(self):
# Test callback behaviour when object dies first.
C = self._subclass()
calls = []
def cb(arg):
calls.append(arg)
o = C(1)
r = weakref.WeakMethod(o.some_method, cb)
del o
gc.collect()
self.assertEqual(calls, [r])
# Callback is only called once.
C.some_method = Object.some_method
gc.collect()
self.assertEqual(calls, [r])
def test_callback_when_method_dead(self):
# Test callback behaviour when method dies first.
C = self._subclass()
calls = []
def cb(arg):
calls.append(arg)
o = C(1)
r = weakref.WeakMethod(o.some_method, cb)
del C.some_method
gc.collect()
self.assertEqual(calls, [r])
# Callback is only called once.
del o
gc.collect()
self.assertEqual(calls, [r])
def test_equality(self):
def _eq(a, b):
self.assertTrue(a == b)
self.assertFalse(a != b)
def _ne(a, b):
self.assertTrue(a != b)
self.assertFalse(a == b)
x = Object(1)
y = Object(1)
a = weakref.WeakMethod(x.some_method)
b = weakref.WeakMethod(y.some_method)
c = weakref.WeakMethod(x.other_method)
d = weakref.WeakMethod(y.other_method)
# Objects equal, same method
_eq(a, b)
_eq(c, d)
# Objects equal, different method
_ne(a, c)
_ne(a, d)
_ne(b, c)
_ne(b, d)
# Objects unequal, same or different method
z = Object(2)
e = weakref.WeakMethod(z.some_method)
f = weakref.WeakMethod(z.other_method)
_ne(a, e)
_ne(a, f)
_ne(b, e)
_ne(b, f)
del x, y, z
gc.collect()
# Dead WeakMethods compare by identity
refs = a, b, c, d, e, f
for q in refs:
for r in refs:
self.assertEqual(q == r, q is r)
self.assertEqual(q != r, q is not r)
def subscribe(self, event, callback):
""" Subscribe a callback to an event
:arg event: String name of an event to subscribe to
:callback: The function to call when the event is published. This can
be any python callable.
Use :func:`functools.partial` to call the callback with any other
arguments.
.. note:: The callback is registered with the event each time this
method is called. The callback is called each time it has been
registered when the event is published. For example::
>>> import asyncio
>>> import pubmarine
>>> pubpen = pubmarine.PubPen(asyncio.get_event_loop)
>>> def message():
... print('message called')
>>> pubpen.subscribe('test', message)
>>> pubpen.subscribe('test', message)
>>> pubpen.publish('test')
message called
message called
If the caller wants the callback to only be called once, it is the
caller's responsibility to only subscribe the callback once.
"""
if self._event_list and event not in self._event_list:
raise EventNotFoundError('{} is not a registered event'
.format(event))
# Get an id for the subscription
sub_id = next(self._next_id)
self._subscriptions[sub_id] = event
try:
# Add a method
self._event_handlers[event][sub_id] = WeakMethod(callback)
except TypeError:
# Add a function
self._event_handlers[event][sub_id] = ref(callback)
return sub_id