def _drain_events(self, on_event):
if not hasattr(self, 'bus_queue'):
raise Exception('You must listen for events before consuming them')
with Connection(self._url) as conn:
with Consumer(conn, self.bus_queue, callbacks=[on_event]):
try:
while True:
conn.drain_events(timeout=0.5)
except TimeoutError:
pass
评论列表
文章目录