async def producer(self):
    """Read events from every source in a round-robin loop and enqueue them.

    After waiting for the client to become ready and running ``self.init()``,
    this coroutine loops forever. On each pass it calls ``self.read()`` for
    every ``(source, last_id)`` pair in ``self.progress`` and puts a
    ``(source, events, last_id)`` tuple on ``self.queue``. When ``read()``
    returns ``None`` the source is treated as exhausted: a
    ``(source, None, self.current)`` tuple is queued instead, and the source
    is skipped on later passes unless ``self.continuous`` is truthy.

    Between passes it sleeps for the configured ``yield`` delay, or for the
    ``empty-source`` delay when every known source is exhausted.

    The loop has no exit condition; it ends only when an exception other
    than ``discord.DiscordException`` propagates out of it.
    """
    self.logger.info(f"{self.name}: producer coroutine started!")

    # Setup
    await self.client.wait_until_ready()
    await self.init()

    delays = self.config['crawler']['delays']
    yield_delay = delays['yield']
    long_delay = delays['empty-source']

    # Per-source "exhausted" flag, seeded from the sources known at startup.
    done = dict.fromkeys(self.progress, False)

    while True:
        self._update_current()

        # Round-robin between all sources.
        # Snapshot as a tuple because the underlying dictionary may change
        # size while we are suspended in an await.
        for source, last_id in tuple(self.progress.items()):
            # setdefault (not done[source]) so a source that was added to
            # self.progress after startup is registered instead of raising
            # KeyError.
            if done.setdefault(source, False) and not self.continuous:
                continue

            try:
                events = await self.read(source, last_id)
                if events is None:
                    # This source is exhausted
                    done[source] = True
                    await self.queue.put((source, None, self.current))
                    self.progress[source] = self.current
                else:
                    # This source still has more
                    done[source] = False
                    last_id = self.get_last_id(events)
                    await self.queue.put((source, events, last_id))
                    self.progress[source] = last_id
            except discord.DiscordException:
                # Log and move on to the next source; progress for this
                # source is left unchanged so it is retried next pass.
                self.logger.error(f"{self.name}: error during event read", exc_info=True)

        if all(done.values()):
            self.logger.info(f"{self.name}: all sources are exhausted, sleeping for a while...")
            delay = long_delay
        else:
            delay = yield_delay
        await asyncio.sleep(delay)
评论列表
文章目录