def listen(cls, event, func):
    """Connect `func` to the signal named `event`, using `cls` as sender.

    The signal object is obtained from `signal(event)` and `func` is passed
    to its `connect` method together with `sender=cls`. Nothing is returned.
    """
    event_signal = signal(event)
    event_signal.connect(func, sender=cls)
# Example source code for Python signal()
def stop_listening(cls, event, func):
    """Disconnect `func` from the signal named `event` for sender `cls`.

    The signal object is obtained from `signal(event)` and `func` is passed
    to its `disconnect` method together with `sender=cls`. Nothing is
    returned.
    """
    event_signal = signal(event)
    event_signal.disconnect(func, sender=cls)
# Misc.
def test_blinker(self):
    """Sanity checks that document how blinker behaves."""
    # The signal exposed as `signals.creation` must compare equal to the
    # signal blinker returns for the name 'creation'.
    creation_signal = signals.creation
    self.assertEqual(creation_signal, blinker.signal('creation'))

    # Subscribing: connect a receiver that does nothing.
    signals.creation.connect(lambda instance: None)
def send(self):
    """Send the 'msg_signal' signal, passing this object to `send`."""
    # Original author's note: the function registered on the socketio side
    # can read the message from the object passed here, via `self.msg`.
    signal('msg_signal').send(self)
def send(self):
    """Send the 'msg_signal' signal, passing this object to `send`."""
    # Original author's note: the function registered on the socketio side
    # can read the message from the object passed here, via `self.msg`.
    signal('msg_signal').send(self)
def __init__(self, srvapi, interval=1, autoconnect=True):
    """Reset the counters and create the update signal and both pollers.

    srvapi: object providing `rpc.session_stats`, `torrent.torrents` and
            `loop`
    interval: forwarded to both `RequestPoller` instances
    autoconnect: forwarded to both `RequestPoller` instances
    """
    self._session_stats_updated = False
    self._tcounts_updated = False
    self._reset_session_stats()
    self._reset_tcounts()
    self._on_update = blinker.Signal()

    # Poller for the session statistics request.
    self._poller_stats = stats_poller = RequestPoller(
        srvapi.rpc.session_stats,
        autoconnect=autoconnect,
        interval=interval,
        loop=srvapi.loop,
    )
    stats_poller.on_response(self._handle_session_stats)
    # Errors are only logged at debug level; the handler is registered
    # with autoremove=False.
    stats_poller.on_error(
        lambda e: log.debug('Ignoring exception: %r', e),
        autoremove=False,
    )

    # 'session-stats' provides some counters, but not enough, so a
    # minimalistic torrent list is requested as well.
    self._poller_tcount = tcount_poller = RequestPoller(
        srvapi.torrent.torrents,
        keys=('rate-down', 'rate-up', 'status'),
        autoconnect=autoconnect,
        interval=interval,
        loop=srvapi.loop,
    )
    tcount_poller.on_response(self._handle_tlist)
def on(self, signal, callback, autoremove=True):
    """Register `callback` for `signal`

    signal: 'connecting', 'connected', 'disconnected' or 'error'
    callback: a callable that receives the RPC URL and, for 'error', the
              exception
    autoremove: passed to the signal's `connect` as `weak`

    With the default `autoremove=True`, callbacks are automatically
    unsubscribed when they are garbage-collected.

    Raises ValueError if `signal` does not name a signal of this object.
    """
    # Attributes with '__' become '_Classname__attribute'
    attr_name = '_TransmissionRPC__on_' + signal
    try:
        sig = getattr(self, attr_name)
    except AttributeError:
        raise ValueError('Unknown signal: {!r}'.format(signal))

    # An attribute with a matching name that is not a Signal is rejected
    # with the same error as a missing attribute.
    if not isinstance(sig, Signal):
        raise ValueError('Unknown signal: {!r}'.format(signal))

    log.debug('Registering %r for %r event', callback, signal)
    sig.connect(callback, weak=autoremove)
def __init__(self, callback=None):
    """Set up empty action, callback and key-chain state.

    callback: stored as the default callback (may be None)
    """
    self._default_callback = callback

    # Actions are kept per context; only the default context exists at
    # first and it starts out empty.
    actions = {}
    actions[DEFAULT_CONTEXT] = {}
    self._actions = actions

    # Signals that hold the bind/unbind and key-chain callbacks.
    self._bindunbind_callbacks = blinker.Signal()
    self._keychain_callbacks = blinker.Signal()

    # No key chain is in progress at first.
    self._keychain_partial = []
    self._keychain_context = NO_CONTEXT
def __init__(self, name, *, default=None, description=None):
    """Initialize name, description, change signal and initial value.

    name: converted to `str` and stored as the name
    default: if not None, it is passed through `convert` and `validate`
             and then used as both the current and the default value;
             any exception raised by those methods propagates
    description: converted to `str`; if it is None or its string form is
                 empty, 'No description available' is used instead
    """
    self._name = str(name)

    # str(None) is the truthy string 'None', so None has to be handled
    # before converting; otherwise the fallback text could never apply to
    # a missing description.
    description_text = '' if description is None else str(description)
    self._description = description_text or 'No description available'

    self._on_change = Signal()
    self._default = self._value = self._prev_value = None
    if default is not None:
        # Convert first, then validate the converted value.
        initial_value = self.convert(default)
        self.validate(initial_value)
        self._value = initial_value
        self._default = self._value
    log.debug('Initialized ValueBase: %s=%r', self._name, self._value)
def __init__(self, *values):
    """Create empty storage and the change signal, then load `values`."""
    # The list and the dict are meant to hold the same instances: the list
    # preserves their order and the dict gives fast access via __getitem__.
    self._values_dict = dict()
    self._values = list()
    self._on_change = Signal()
    self.load(*values)
def __init__(self, srvapi, interval=1, autoconnect=True):
    """Set up the cache state and start polling 'session-get'.

    srvapi: object providing `rpc.session_get` and `loop`
    interval: forwarded to the parent class initializer
    autoconnect: forwarded to the parent class initializer
    """
    self._srvapi = srvapi
    # Raw dict from 'session-get' or None if not connected
    self._raw = None
    # Cached values converted by _AFTER_GET
    self._cache = {}
    self._get_timestamp = 0
    self._on_update = blinker.Signal()

    # autoconnect must be True so the CLI 'help' command can display
    # current values (e.g. 'help srv.limit.rate.down').
    request = self._srvapi.rpc.session_get
    super().__init__(
        request,
        autoconnect=autoconnect,
        interval=interval,
        loop=srvapi.loop,
    )

    self.on_response(self._handle_session_get)
    # Errors are only logged at debug level; the handler is registered
    # with autoremove=False.
    self.on_error(
        lambda error: log.debug('Ignoring %r', error),
        autoremove=False,
    )
def __init__(self, request, *args, interval=1, loop=None, **kwargs):
    """Prepare the polling state and register the initial request.

    request: passed to `set_request` together with `*args` and `**kwargs`
    interval: stored as the polling interval
    loop: event loop to use; if None, `asyncio.get_event_loop()` is used
    """
    if loop is not None:
        self.loop = loop
    else:
        self.loop = asyncio.get_event_loop()

    self._on_response = blinker.Signal()
    self._on_error = blinker.Signal()
    self._prev_error = None
    self._interval = interval

    # No tasks exist yet.
    self._poll_task = None
    self._poll_loop_task = None

    # NOTE(review): this receives the `loop` argument as given (possibly
    # None), not the resolved `self.loop` -- confirm that is intended.
    self._sleep = SleepUneasy(loop=loop)
    self._skip_ongoing_request = False

    debug_info = {
        'request': 'No request specified yet',
        'update_cbs': [],
        'error_cbs': [],
    }
    self._debug_info = debug_info

    self.set_request(request, *args, **kwargs)
def add_signal(self, name):
    """Register a new `Signal` under `name`.

    name: key under which the new signal is stored in `self._signals`

    Raises NameAlreadyExists if `name` is already registered.
    """
    # Test for the key itself, not the truthiness of the stored object, so
    # an existing entry is detected even if it evaluates as false.
    if name in self._signals:
        raise NameAlreadyExists()
    self._signals[name] = Signal()