def poll_all_received_events(self):
""" This will be triggered once for each `echo_node_alarm_callback`.
It polls all channels for `EventTransferReceivedSuccess` events,
adds all new events to the `self.received_transfers` queue and
respawns `self.echo_node_worker`, if it died. """
locked = False
try:
with Timeout(10):
locked = self.lock.acquire(blocking=False)
if not locked:
return
else:
channels = self.api.get_channel_list(token_address=self.token_address)
received_transfers = list()
for channel in channels:
channel_events = self.api.get_channel_events(
channel.channel_address,
self.last_poll_block
)
received_transfers.extend([
event for event in channel_events
if event['_event_type'] == 'EventTransferReceivedSuccess'
])
for event in received_transfers:
transfer = event.copy()
transfer.pop('block_number')
self.received_transfers.put(transfer)
# set last_poll_block after events are enqueued (timeout safe)
if received_transfers:
self.last_poll_block = max(
event['block_number']
for event in received_transfers
)
# increase last_poll_block if the blockchain proceeded
delta_blocks = self.api.raiden.get_block_number() - self.last_poll_block
if delta_blocks > 1:
self.last_poll_block += 1
if not self.echo_worker_greenlet.started:
log.debug(
'restarting echo_worker_greenlet',
dead=self.echo_worker_greenlet.dead,
successful=self.echo_worker_greenlet.successful(),
exception=self.echo_worker_greenlet.exception
)
self.echo_worker_greenlet = gevent.spawn(self.echo_worker)
except Timeout:
log.info('timeout while polling for events')
finally:
if locked:
self.lock.release()
评论列表
文章目录