def send(self, source, raw=False, catch_exceptions=False, gather=True):
"""
Send signal with source.
If any receiver raises an error, the error propagates back through send,
terminating the dispatch loop. So it's possible that all receivers
won't be called if an error is raised.
:param source: The data to be send to the processor which produces data that will be send to the receivers.
:param raw: Optional bool parameter to just send the source to the receivers without any processing.
:param catch_exceptions: Catch and return the exceptions.
:param gather: Execute multiple receivers at the same time (parallel). On by default!
:return: Return a list of tuple pairs [(receiver, response), ... ].
"""
if raw is False:
try:
kwargs = await self.process_target(signal=self, source=source)
except SignalGlueStop:
# Stop calling the receivers when our glue says we should!
return []
else:
kwargs = dict(**source, signal=self)
if not self.receivers:
return []
# Prepare the responses from the calls.
responses = []
gather_list = []
for key, receiver in self._live_receivers():
# Dereference the weak reference.
slf = self.self_refs.get(key, None)
if slf and isinstance(slf, weakref.ReferenceType):
slf = slf()
args = [slf] if slf else []
# Execute the receiver.
coro = self.execute_receiver(receiver, args, kwargs, ignore_exceptions=catch_exceptions)
if gather:
gather_list.append(coro)
else:
responses.append(await coro)
# If gather, wait on the asyncio.gather operation and return the responses from there.
if gather:
return await asyncio.gather(*gather_list)
# Done, respond with all the results
return responses
评论列表
文章目录